エンジニア的なネタを毎週書くブログ

東京でWebサービスの開発をしています 【英語版やってみました】http://taichiw-e.hatenablog.com/

ParallelStreamで外部接続しちゃいかんの? → 独自スレッドプールでParallelStreamを使ってみた

ParallelStreamでI/O待ち状態を作るとプール内のスレッドを食いつぶす?

ParallelStreamはアプリ全体で共通で持っているスレッドプールを使う上、CPUのコア数分しかスレッドが無いのでI/O待ちが発生するようなものには使ってはいけない、という話を聞きました。

そうなの?ということで検証してみました。

List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
	list.add(Integer.valueOf(i));
}

list.stream()
  .parallel()
  .forEach(i -> {
    //1秒かかる外部サービス
    restTemplate.getForObject("http://localhost:8081/sleep/", String.class);
    System.out.println(i);
});

f:id:taichiw:20180930225024p:plain
手元のPC上で動かした結果を、Visual VMでみてみたところ。

ForkJoinPool.commonPool-worker- ...
という、アプリ内での共通スレッドプールが、ParallelStreamで使われています。
4コアCPUで実験しているため、プール内のスレッドは3つ。(それに加えて呼び出し元のスレッドがあるので、JVM全体で可動しているスレッドは4つ)。

I/O待ちの間はスレッドは専有状態になるため、
4個通信 → (1秒経ってレスポンス) → また4個通信

というように、外部通信が4件ずつしか行われません。

また、この間に同じアプリ内の他の処理でもParallelStreamを使おうとすると、そちらとの、Common Pool内のスレッド取得合戦が起こってしまうことも確認できました。

オレオレスレッドを使ってみる …でもこれ大丈夫?

Common Poolのスレッド数を設定で増やすことも可能なようですが、「スレッド取得合戦」を避けるため、独自のスレッドを作ってParallelStreamを回すことを検討してみます。

ForkJoinPool pool = new ForkJoinPool(10);
pool.submit(() ->
  list.stream()
  .parallel()
  .forEach(i -> {
    //1秒かかる外部サービス
    restTemplate.getForObject("http://localhost:8081/sleep/", String.class);
    System.out.println(i);
  })
).get();
ForkJoinPool pool = new ForkJoinPool(10);

で、スレッドプールを作成し、このプールを使ってparallel streamを回します。

すると

  1. スレッド数の上限が10になったので、10並列で通信を実行。一気に10回分のサービス呼び出しが終了。
  2. 同時に、アプリ内の他の箇所でParallelStream(デフォルトのCommon Poolを使用)を実行してもスレッド取得が競合しない

という結果が得られました。

ただ… メソッド実行のたびにスレッドが増えていきます。大丈夫なんでしょうか。
f:id:taichiw:20180930222344p:plain

これ、本番可動させてたら無尽蔵にスレッドが増え、一定期間経つとアラートが飛んできて、再起動を余儀なくさせられるんじゃないでしょうか。
過去の悪夢が蘇ってきます。


…ということで軽くロングランテストを実施。
3時間位流し続けてみた限りでは、
GCのタイミングで古いスレッドは消えていっており、特にスレッドが溢れたりメモリリークしたりしている様子はありませんでした。
f:id:taichiw:20180930225134p:plain
49652番目のスレッドプール。これより以前に作成されたものは消えている。

ただ、都度スレッドを作成して破棄するのは、CPUとメモリの無駄遣いっぽい雰囲気がします。

専用の「スレッドプール」を作る

ということで、以下のように(シングルトンの)インスタンス変数としてスレッドプールを定義してみました。

@RestController
public class TestController {

  final ForkJoinPool pool = new ForkJoinPool(100);

  @GetMapping(value = "test")
  public String test() throws ExecutionException, InterruptedException {

    RestTemplate restTemplate = new RestTemplate();

    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      list.add(Integer.valueOf(i));
    }

    pool.submit(() ->
        list.stream().parallel()
            .forEach(i -> {
              //1秒かかる外部サービス
              restTemplate.getForObject("http://localhost:8081/sleep/", String.class);
              System.out.println(i);
            })
        ).get();

    System.out.println("OK");
    return "OK";
  }
}

f:id:taichiw:20180930224809p:plain

いい感じでスレッドの使い回しができているようです。