ParallelStreamで外部接続しちゃいかんの? → 独自スレッドプールでParallelStreamを使ってみた
ParallelStreamでI/O待ち状態を作るとプール内のスレッドを食いつぶす?
ParallelStreamはアプリ全体で共通で持っているスレッドプールを使う上、CPUのコア数分しかスレッドが無いのでI/O待ちが発生するようなものには使ってはいけない、という話を聞きました。
そうなの?ということで検証してみました。
List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { list.add(Integer.valueOf(i)); } list.stream() .parallel() .forEach(i -> { //1秒かかる外部サービス restTemplate.getForObject("http://localhost:8081/sleep/", String.class); System.out.println(i); });
手元のPC上で動かした結果を、Visual VMでみてみたところ。
ForkJoinPool.commonPool-worker- ...
という、アプリ内での共通スレッドプールが、ParallelStreamで使われています。
4コアCPUで実験しているため、プール内のスレッドは3つ。(それに加えて呼び出し元のスレッドがあるので、JVM全体で可動しているスレッドは4つ)。
I/O待ちの間はスレッドは専有状態になるため、
4個通信 → (1秒経ってレスポンス) → また4個通信
というように、外部通信が4件ずつしか行われません。
また、この間に同じアプリ内の他の処理でもParallelStreamを使おうとすると、そちらとの、Common Pool内のスレッド取得合戦が起こってしまうことも確認できました。
オレオレスレッドを使ってみる …でもこれ大丈夫?
Common Poolのスレッド数を設定で増やすことも可能なようですが、「スレッド取得合戦」を避けるため、独自のスレッドを作ってParallelStreamを回すことを検討してみます。
ForkJoinPool pool = new ForkJoinPool(10); pool.submit(() -> list.stream() .parallel() .forEach(i -> { //1秒かかる外部サービス restTemplate.getForObject("http://localhost:8081/sleep/", String.class); System.out.println(i); }) ).get();
ForkJoinPool pool = new ForkJoinPool(10);
で、スレッドプールを作成し、このプールを使ってparallel streamを回します。
すると
- スレッド数の上限が10になったので、10並列で通信を実行。一気に10回分のサービス呼び出しが終了。
- 同時に、アプリ内の他の箇所でParallelStream(デフォルトのCommon Poolを使用)を実行してもスレッド取得が競合しない
という結果が得られました。
ただ… メソッド実行のたびにスレッドが増えていきます。大丈夫なんでしょうか。
これ、本番可動させてたら無尽蔵にスレッドが増え、一定期間経つとアラートが飛んできて、再起動を余儀なくさせられるんじゃないでしょうか。
過去の悪夢が蘇ってきます。
…ということで軽くロングランテストを実施。
3時間位流し続けてみた限りでは、
GCのタイミングで古いスレッドは消えていっており、特にスレッドが溢れたりメモリリークしたりしている様子はありませんでした。
49652番目のスレッドプール。これより以前に作成されたものは消えている。
ただ、都度スレッドを作成して破棄するのは、CPUとメモリの無駄遣いっぽい雰囲気がします。
専用の「スレッドプール」を作る
ということで、以下のように(シングルトンの)インスタンス変数としてスレッドプールを定義してみました。
@RestController public class TestController { final ForkJoinPool pool = new ForkJoinPool(100); @GetMapping(value = "test") public String test() throws ExecutionException, InterruptedException { RestTemplate restTemplate = new RestTemplate(); List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { list.add(Integer.valueOf(i)); } pool.submit(() -> list.stream().parallel() .forEach(i -> { //1秒かかる外部サービス restTemplate.getForObject("http://localhost:8081/sleep/", String.class); System.out.println(i); }) ).get(); System.out.println("OK"); return "OK"; } }
いい感じでスレッドの使い回しができているようです。