エンジニア的なネタを毎週書くブログ

東京でWebサービスの開発をしています 【英語版やってみました】http://taichiw-e.hatenablog.com/

ParallelStreamで外部接続しちゃいかんの? → 独自スレッドプールでParallelStreamを使ってみた

ParallelStreamでI/O待ち状態を作るとプール内のスレッドを食いつぶす?

ParallelStreamはアプリ全体で共通で持っているスレッドプールを使う上、CPUのコア数分しかスレッドが無いのでI/O待ちが発生するようなものには使ってはいけない、という話を聞きました。

そうなの?ということで検証してみました。

List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
	list.add(Integer.valueOf(i));
}

list.stream()
  .parallel()
  .forEach(i -> {
    //1秒かかる外部サービス
    restTemplate.getForObject("http://localhost:8081/sleep/", String.class);
    System.out.println(i);
});

f:id:taichiw:20180930225024p:plain

ForkJoinPool.commonPool-worker- ...
という、アプリ内での共通スレッドプールが、ParallelStreamで使われています。
4コアCPUで実験しているため、プール内のスレッドは3つ。(+呼び出し元のスレッドで可動しているスレッドは4つ)。

I/O待ちの間はスレッドは専有状態になるため、
4個通信 → (1秒経ってレスポンス) → また4個通信

というように、外部通信が4件ずつしか行われません。

また、この間に同じアプリ内の他の処理でもParallelStreamを使おうとすると、そちらとの、Common Pool内のスレッド取得合戦が起こってしまうことも確認できました。

オレオレスレッドを使ってみる …でもこれ大丈夫?

Common Poolのスレッド数を設定で増やすことも可能なようですが、「スレッド取得合戦」を避けるため、独自のスレッドを作ってParallelStreamを回すことを検討してみます。

ForkJoinPool pool = new ForkJoinPool(10);
pool.submit(() ->
  list.stream()
  .parallel()
  .forEach(i -> {
    //1秒かかる外部サービス
    restTemplate.getForObject("http://localhost:8081/sleep/", String.class);
    System.out.println(i);
  })
).get();
ForkJoinPool pool = new ForkJoinPool(10);

で、スレッドプールを作成し、このプールを使ってparallel streamを回します。

すると

  1. スレッド数の上限が10になったので、10並列で通信を実行。一気に10回分のサービス呼び出しが終了。
  2. 同時に、アプリ内の他の箇所でParallelStream(デフォルトのCommon Poolを使用)を実行してもスレッド取得が競合しない

という結果が得られました。

ただ… メソッド実行のたびにスレッドが増えていきます。大丈夫なんでしょうか。
f:id:taichiw:20180930222344p:plain

これ、本番可動させてたら、一定期間経つとアラートが飛んできて、再起動させられるんじゃないでしょうか。
悪夢が蘇ってきます。


…ということで軽くロングランテストを実施。
3時間位流し続けてみた限りでは、
GCのタイミングで古いスレッドは消えていっており、特にスレッドが溢れたりメモリリークしたりしている様子はありませんでした。
f:id:taichiw:20180930225134p:plain
49652番目のスレッドプール。これより以前に作成されたものは消えている。

ただ、都度スレッドを作成して破棄するのは、CPUとメモリの無駄遣いっぽい雰囲気がします。

専用の「スレッドプール」を作る

ということで、以下のように(シングルトンの)インスタンス変数としてスレッドプールを定義してみました。

@RestController
public class TestController {

  final ForkJoinPool pool = new ForkJoinPool(100);

  @GetMapping(value = "test")
  public String test() throws ExecutionException, InterruptedException {

    RestTemplate restTemplate = new RestTemplate();

    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      list.add(Integer.valueOf(i));
    }

    pool.submit(() ->
        list.stream().parallel()
            .forEach(i -> {
              //1秒かかる外部サービス
              restTemplate.getForObject("http://localhost:8081/sleep/", String.class);
              System.out.println(i);
            })
        ).get();

    System.out.println("OK");
    return "OK";
  }
}

f:id:taichiw:20180930224809p:plain

いい感じでスレッドの使い回しができているようです。

SpringBootのアプリで、JMHを使ってみた

メソッドのちょっとした書き方の違いによる速度の計測をしたくて(Micro Benchmarkというらしい。対義語はMacro Benchmark)、JMHで計測をしてみました。

JMHのオフィシャルページを見ると、mvn archetype:generate なるものが出てきて「なんじゃこりゃ?」だったり
適当にググったページでは、そもそも古くて今は動かないサンプルだったりと、どうにもわかりにくかったのですが、こちらのページのとおりに試してみたところ、うまくいきました。
Microbenchmarking with Java | Baeldung

それでもハマったところ
1. SpringBootのmain methodがあるクラスに、Benchmark用のmain methodも書いたところ、以下のようなエラーが出て何故か起動できず。

# JMH version: 1.21
# VM version: JDK 1.8.0_121, Java HotSpot(TM) 64-Bit Server VM, 25.121-b13
# VM invoker: C:\Program Files\Java\jdk1.8.0_121\jre\bin\java.exe
# VM options: -Xmx3000m -Ddebug -XX:TieredStopAtLevel=1 -Xverify:none -Dspring.output.ansi.enabled=always -Dzookeeper.address=b -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=51723 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true -javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.1\lib\idea_rt.jar=51724:C:\Program Files\JetBrains\IntelliJ IDEA 2018.1\bin -Dfile.encoding=UTF-8
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: rakuten.travel.availability.calculation.api.BenchmarkRunner.benchMarkKick3

# Run progress: 0.00% complete, ETA 00:10:00
# Warmup Fork: 1 of 1
Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 51723; nested exception is: 
	java.net.BindException: Address already in use: JVM_Bind
<forked VM failed with exit code 1>
<stdout last='20 lines'>
</stdout>
<stderr last='20 lines'>
Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 51723; nested exception is: 
	java.net.BindException: Address already in use: JVM_Bind
</stderr>

→ 上記ページのように、mainメソッドを書くための専用のクラスを別に作ったところ、問題なく動作。

2. テスト対象のメソッドにパラメータを渡す方法がわからなかった
→ これも上記ページに書いてある


おまけ
とあるメソッドのリファクタ結果

Benchmark                        Mode  Cnt       Score        Error  Units
BenchmarkRunner.benchMarkKick1  thrpt    5   29271.881 ±   1088.831  ops/s
BenchmarkRunner.benchMarkKick2  thrpt    5   74336.732 ±   4597.243  ops/s
BenchmarkRunner.benchMarkKick3  thrpt    5  280990.396 ± 185017.629  ops/s

1はもとのまま。2はちょっとリファクタリング。3はもっとリファクタリング
もとより桁一つ速くなった。うひょー!

Mavenで実行しているTestに割り当てるヒープを増やす方法

Maven Surefire Pluginを使う。
https://maven.apache.org/surefire/maven-surefire-plugin/

  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <configuration>
        <argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
    </configuration>
  </plugin>

ぐぐると、MAVEN_OPTSに書けばいい とか、 Jenkinsの場合はGlobal MAVEN_OPTSに書けばいい とか出てくるんだけど、
これは、MavenJavaプロセスに割り当てるメモリ。
その中で実行されるテスト用のメモリは別に確保される。

実際、テストコード内で以下のように割り当てメモリを吐いてみると

		int mb = 1024 * 1024;
		Runtime runtime = Runtime.getRuntime();
		System.out.println("Max Memory:" + runtime.maxMemory() / mb);

MAVEN_OPTSに何を書いても増えないし減らない。

Chrome Tech Talk Night

events.withgoogle.com
Google さんで行われた、Chrome Tech Talk Night に参加してきました。

弊社の新人研修依頼、10年以上ぶりに森タワーのオフィスに入ったなぁ とか しょうもないことを思いながら。


そもそも私、フロントエンジニアではないので、正直細かいところはよくわからんというか的はずれな理解だった感も否めないのですが…


発表する人、発表する人、

  • これは標準化される(予定である/されている)
  • まだ絶賛仕様決めてるような段階だけどGithubに最新のリポジトリがある
  • オンラインでオープンにディスカッションされている


くっ ここはそういう世界か…!
と。


テクノロジー会社としてのあるべき姿をまざまざと見せつけられた気がしました。

なので忘れないようにブログ。

"Fiber" で 100万スレッドが実行可能に!? - Project Valhalla とProject Loom : JavaDayTokyo 2018 参加レポート2 #JavaDayTokyo

Project XXX系の話について、Project ValhallaとProject Loomについて聞きました。
どちらも講演はDavid Buckさん*1

残念ながら、どちらもJava11には入らないので、LTSのリリースサイクルを考えるとまともにサービスで使えるのは早くて2021年…
とはいえ、どちらもとても夢のある話でした。

Project Valhalla

Value Types (値型)

class Point {
   int x;
   int y;
}

のような、プリミティブ型をフィールドとして持つクラスがある時に

  Point[] points;

配列を作ると、各Pointインスタンス毎にヘッダを持ってしまい、大きなメモリのオーバーヘッドが発生します。
メモリの最適化だけ考えると

  int[] xList;
  int[] yList;

とプリミティブ変数の配列を宣言するのが最も良いのですが、これではせっかくのオブジェクト指向が台無し。

そこで、「例えば*2

value class Point {
   int x;
   int y;
}

このようにクラス宣言した際に、さも intの配列が2つあるかのようなメモリ確保がされる… というのが概要です。
これによって、省メモリ化に加えて、参照の高速化も図ることができるようになります。

ここまでは昨年のJavaDayTokyoでも既に紹介されていました。

Generic Specialization (ジェネリックの特殊化)

上記を実現する過程で、このようなことができる様になる… と紹介されました。

ArrayList<int> xList;

要はプリミティブ型がほとんど普通のオブジェクトのように使えると。
さらに、intStreamのような(普通のstreamと別にメソッドを用意しなくてはならないような)「ダサい」対応も不要になるとのことです。

とは言えいろいろ悩んでるらしい

とは言え、この「ジェネリックの特殊化」をいざ実装しようとすると悩ましい課題が多いらしく… まだまだ実用化には時間がかかりそうでした。

実際の所、どのくらい使えるんだろう

現実的な利用シーンを考えると、フィールドにプリミティブしか持たないクラスというのはあんまりない気も。
せめて、StringやLocalDate系が混ざっていてもうまいこと扱っていただけるようになると良さそうなのですが。

Project Loom

こちらがタイトルの、”100万スレッド”です。
現在のJVMでは、JVM上のスレッドは、OSが管理するスレッドとマッピングされています。それぞれのスレッドは、用途が異なるかもしれません。あるスレッドは複雑な暗号化に使われるかもしれないし、またあるスレッドは単に変数を一つ書き換えるだけの簡単な処理に使われるかもしれません。

しかしながらOSは、アプリケーションのコードを知らないため、各スレッドの用途もわかりません。そのため、どのスレッドも一律、「何にでも使えるスレッド」を用意することになります。

ここで、「Fiber」という考え方が登場します。Fiberは、OSではなく、Javaのruntimeやユーザコードによって支配されるスレッド。アプリケーションはスレッドの用途を知っているため、無駄のないスレッドを効率的に作ることが可能です。その結果、OS管理のスレッドに比べてなんと1000倍ほど数のスレッドが作れるようになるとのこと。
それだけOSが作るスレッドには「無駄」があるそうです。

ブロッキングと非同期

ちょうど一年前のJavaDayTokyoで聞いた話ですが、
taichiw.hatenablog.com
ブロッキングな処理はスケールしにくい、という問題点があります。
その対策として、非同期(ノンブロッキング)に処理を制御する、という方法があるものの、プログラミングやデバッギングが難しい、という問題があります。

この双方の問題を解決するのがFiber.
スレッドモデル(=ブロッキングな処理)であるため取扱が簡単であるにもかかわらず、スケーラビリティに優れるという夢のようなお話です。

言ってることが未来

Fiberはスレッドなので、「どんな処理が」「どこまで進んだか」が管理されています。
他方、JVM上で管理されているオブジェクトなので、Serializeが可能とのこと。
その結果何ができるようになるかと言うと…

  • 実行中のスレッドをSerializeして保存。JVM停止。→JVMを起動してスレッドの続きを実行
  • 分散データベースに於いて、「処理を他のサーバに転送」

と言ったことができるようになるとのこと。

… よくわかりません。

まだ検討段階

実際の所、まだまだ開発が始まったばかりでCommitもないとか。
早くて今年中にアーリーアクセスが出る…… かも。

とのことでした。

しかしとっても夢のある話。実現が楽しみです。

*1:基調講演でFlight Recorderのデモもされて… 来週末の JJUG CCCでもセッションがあるそうで… 寝不足で頭が回ってないといってましたがこれだけ準備してたら、そらそうでしょうなぁ…

*2:まだ決まったわけではない… と強調されてました

Java11について : JavaDayTokyo 2018 参加レポート1 #JavaDayTokyo

今年もJavaDayTokyoに参加させていただきました。

今年の自分なりのテーマは、以下についての情報収集でした。

  • 半年後にリリースされるLTSバージョンであるJava11の新機能
  • その他今後追加される機能について
    • 特に、一昨年くらいから聞いているProject Valhalla。結構な数のコレクションをゴリゴリ処理するのが最近の自分のお仕事の大事なところなので、どうなったか興味アリ

このエントリでは主にJava 11についてまとめます。

Java 10とJava 11 の差

Java 10 からJava 11への追加機能として、現在8件のJPEが予定されているそうです。
http://openjdk.java.net/projects/jdk/11/

この内、コードの書き方に影響がありそうな、文法面での変更は、Project Amberの一つでもあるこれだけかなぁ…と。
JEP 323: Local-Variable Syntax for Lambda Parameters

ということで、Java8 → Java11の差は雑に言うと、
Project Jigsaw + "var"
と言っていいかなぁ… という気がしました。

櫻庭 祐一 さんのセッション、「Java SE 10、そしてJava SE 11への移行ガイド」でも、8つのチェックポイントが紹介されていましたが、ハマりどころになるのは概ねJigsaw絡みと感じました。

Ask the Experts! でもそのような回答だったようです。

11 の次の LTS バージョンは 17 !

Java 11の次のLTSバージョンは、2021年9月にリリースされるJava17とのこと。
私、1年半に一回LTSが出るものだと思っていたんですが、3年後なんですね。オリンピック終わっちゃってるのかぁ…

Mission Control と Flight RecorderがJDK11からオープンソース

これまで有償ツールだったMission ControlとFlight Recorderが、JDK11から無償で使えるようになるとのこと。
基調講演内でデモがあったのですが、パフォーマンスの問題を見つけるのにずいぶん使えそうに見えました。
気になる…!

一つのドキュメントに、いろいろ一緒に書くのは難しかった話

・ビジネス上の要件
・自分のところのAPIはどう振る舞うべきか
・どうしてそう決めたか(ちょっともとの要件からねじれてる)
・中はどう作るべきか

を、全部一箇所に書くのは結局無理でした。

なので
・ビジネス上の要件
・自分のところのAPIはどう振る舞うべきか(の、一段階抽象化したやつ)
だけを書いて

・どうしてそう決めたか → こうすれば「想定されてるケース全部うまくいくから亅で逃げる
・本当のAPIの振る舞い → 別に書いたテストで表現
・どう作るか → 上のテストが通れば何でもいいよ

という体裁に。
俺が伝えるべきことは伝えられるドキュメントになったと思うけど、
俺は一体何屋なんだ(笑)