Concurrency Utilitiesの「再」まとめ - Java8対応版
以前、このような記事を書きました。
- Concurrency Utilitiesを使った並列処理・マルチスレッドのおさらい (2013-12-26)
前回の内容は、Concurrency UtilitiesだけでなくJavaのマルチスレッドの話も一部含んでいましたが、今回は、Concurrency Utilitiesだけにフォーカスして、全体が分かるように整理してまとめ直しました。
目次
概要
今回は、Concurrency UtilitiesのAPIをいくつかのグループに分類し、それぞれのグループの主要な機能を「広く浅く」紹介する、という形式でまとめています。
パッケージ単位で分け、それからjava.util.concurrent
をさらに機能ごとに分類しています。java.util.concurrent
パッケージの「説明」のところに記載されているものが分かりやすいので、それを参考に分けてみました。
java.util.concurrent
パッケージ- タスク・フレームワーク (Executor)
- 同期キュー
- シンクロナイザー
- 並行処理コレクション
- タイミング
java.util.concurrent.atomic
パッケージ- アトミック値型とアトミック操作
java.util.concurrent.locks
パッケージ- ロック・フレームワーク
内容のほとんどが、APIドキュメントを根気よく読めば分かる情報です。残りはソースコードを読んだり実際に使ってみて得た情報です。
なお、今回はJava8対応版なので、サンプルコードではラムダ式やストリームを使っています。
準備
実行環境は、Java8である以外は特に指定しません。今回の内容では、プラットフォームに依存するものは扱っていない認識です。
APIの動作を実際に動かして確認したいとき、APIドキュメントに記載されているサンプルコードの多くはクラス側の実装しか書いてないので、それを使って動かすコードを自前で書く必要があります。
この記事では、簡単に動作をお試しいただけるように、記事中のサンプルコードはそのままmain
メソッドに貼りつけてimport
を構成すれば実行できるようにしています。コードを簡潔にするために、いくつかユーティリティーメソッドを定義していますので、下記の雛形をお使いください。IDEをお使いでない方は、各コードの先頭にコメントアウトしたimport文を置いていますのでこれをご利用ください。
App.java
(ユーティリティー&mainクラス)
package local; public final class App { public static void main(String[] args) { // ここにサンプルコードをコピペしてください } static void log(String fmt, Object... args) { String now = String.format("%1$tT.%1$tL", System.currentTimeMillis()); now = "--:" + now.substring(3); System.out.printf("%s [%s] %s%n", now, Thread.currentThread().getName(), String.format(fmt, args)); } static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } }
タスク・フレームワーク (Executor)
ここで言うタスク・フレームワークとは、タスク(Runnable/Callable/FutureTask
)、スレッドプール(ThreadPool/ExecutorService
)、非同期入出力(Future
)、などで構成される、このAPIの中核の機能のことです。
基本的なタスク・フレームワークの使い方は、Executors
クラスのファクトリーメソッドを使ってスレッドプールを生成し、Runnable
,Callable<V>
,FutureTask<V>
などの実装クラスのタスクを実行させる、というものです。この場合、スレッドプールはExecutorService
インターフェイス,ScheduledExecutorService
インターフェイスを介して操作します。
前回の記事ではもう少し詳しく説明していますので、こちらも合わせてご覧ください。
今回は、Executors.newScheduledThreadPool(int)
を使った例を示します。
- 例:
Executors.newScheduledThreadPool(int)
で生成したスレッドプールでタスクを実行し結果を取得
// import java.util.concurrent.*; log("start"); ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1); Callable<String> task = () -> { try { log("start"); sleep(3_000L); // 時間のかかる処理の代わり return "OK"; } finally { log("end"); } }; ScheduledFuture<String> result = threadPool.schedule(task, 5, TimeUnit.SECONDS); try { log("result=%s", result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } threadPool.shutdown(); log("end");
5秒後にタスクを実行するようにスケジューリングし、さらに3秒後にタスクが完了してFuture
の結果が返されます。
- 実行結果(
Executors.newScheduledThreadPool(int)
で生成したスレッドプールでタスクを実行し結果を取得)
--:51:43.608 [main] start --:51:48.882 [pool-1-thread-1] start --:51:51.894 [pool-1-thread-1] end --:51:51.894 [main] result=OK --:51:51.894 [main] end
このように、別スレッドで遅延実行したタスクの結果を取得する処理を比較的簡単に書くことができます。
もしScheduledExecutorService#shutdown()
(ScheduledExecutorService#shutdownNow()
, ExecutorService
も同様)を呼び出さないと、非デーモンスレッドが残ってしまいプロセスが終了しないので注意が必要です。
Executors.newCachedThreadPool()
の場合は例外で、このスレッドプールは、60秒間使用されないスレッドを終了させ、キャッシュから削除するようになっていますので、shutdown()
を呼び出さなくても60秒後にアプリケーションが終了します。
下記はExecutors.newCachedThreadPool()
の説明の引用です。
60秒間使用されなかったスレッドは、終了して、キャッシュから削除されます。
java.util.concurrent.Executors.newCachedThreadPool()
- Java SE 8 API ドキュメント
以下の表は、Executors
クラスのメソッドが実際にどんなクラスのインスタンスを返しているのかを調べたものです。
- 表:
Executors
の全メソッドの戻り値
メソッド | 戻り値の型 | 戻り値の実装クラス |
---|---|---|
callable(PrivilegedAction<?>) |
Callable<Object> |
Callable(匿名クラス) |
callable(PrivilegedExceptionAction<?>) |
Callable<Object> |
Callable(匿名クラス) |
callable(Runnable) |
Callable<Object> |
Executors.RunnableAdapter |
callable(Runnable, T) |
Callable<T> |
Executors.RunnableAdapter |
privilegedCallable(Callable<T>) |
Callable<T> |
Executors.PrivilegedCallable |
privilegedCallableUsing |
Callable<T> |
Executors.PrivilegedCallableUsingCurrentClassLoader |
defaultThreadFactory() |
ThreadFactory |
Executors.DefaultThreadFactory |
privilegedThreadFactory() |
ThreadFactory |
Executors.PrivilegedThreadFactory |
newCachedThreadPool() |
ExecutorService |
ThreadPoolExecutor |
newCachedThreadPool(ThreadFactory) |
ExecutorService |
ThreadPoolExecutor |
newFixedThreadPool(int) |
ExecutorService |
ThreadPoolExecutor |
newFixedThreadPool(int, ThreadFactory) |
ExecutorService |
ThreadPoolExecutor |
newSingleThreadExecutor() |
ExecutorService |
Executors.FinalizableDelegatedExecutorService |
newSingleThreadExecutor(ThreadFactory) |
ExecutorService |
Executors.FinalizableDelegatedExecutorService |
newWorkStealingPool() |
ExecutorService |
ForkJoinPool |
newWorkStealingPool(int) |
ExecutorService |
ForkJoinPool |
unconfigurableExecutorService(ExecutorService) |
ExecutorService |
Executors.DelegatedExecutorService |
newScheduledThreadPool(int) |
ScheduledExecutorService |
ScheduledThreadPoolExecutor |
newScheduledThreadPool(int, ThreadFactory) |
ScheduledExecutorService |
ScheduledThreadPoolExecutor |
newSingleThreadScheduledExecutor() |
ScheduledExecutorService |
Executors.DelegatedScheduledExecutorService |
newSingleThreadScheduledExecutor(ThreadFactory) |
ScheduledExecutorService |
Executors.DelegatedScheduledExecutorService |
unconfigurableScheduledExecutorService |
ScheduledExecutorService |
Executors.DelegatedScheduledExecutorService |
特にこだわりが無ければ、これらのメソッドで生成したスレッドプールを使用します。
スレッドプールの初期設定をもっと細かく設定したいのであれば、ThreadPoolExecutor
クラスやScheduledThreadPoolExecutor
クラス、そして次に紹介するForkJoinPool
クラスのコンストラクターを使って構築することもできます。
ThreadPoolExecutor
のコンストラクターを直接使うことで、コアプールサイズ、keep-alive時間、ワークキュー、スレッドファクトリー、RejectedExecutionHandler
(後述)が設定できます。外部キュー以外は、セッターで設定することもできます。サブクラスであるScheduledThreadPoolExecutor
では、最大プールサイズ、keep-alive時間をコンストラクターで設定することができないので、セッターで設定します。ScheduledThreadPoolExecutor
のワークキューは、常に内部クラスのDelayedWorkQueue
が使用されます。
RejectedExecutionHandler
インターフェイスは、名前の通り、タスク実行がリジェクトされた時のハンドラーです。タスク実行(execute
, submit
など)は、キューの容量オーバーなどでリジェクトされることがあります。
どのような場面で使用されるかは、下記ページを見ればある程度分かると思います。
ThreadPoolExecutor
のサブクラスにいくつかの実装クラスがあります。Executors
メソッドで生成したスレッドプールのデフォルトのRejectedExecutionHandler
には、ThreadPoolExecutor.AbortPolicy
が設定されています。これにより、リジェクトの際にはRejectedExecutionException
がスローされます。
次は、Fork/Joinフレームワークについてです。
Java7では、Fork/Joinフレームワーク(以下Fork/Joinと表記)が追加されました。
従来のタスク・フレームワークでは、開始したタスクは1つのスレッドがタスクが終了するまで1つのスレッドで処理するようになっていました。
Fork/Joinでは、投入されたタスクだけでなく、処理中のタスク内で分割(fork)された新しいタスクを処理しようとします。この仕組みはWork stealing(Work stealing - en:wikipedia)と呼ばれます。*1
Fork/Joinのためのスレッドプールは、ForkJoinPool
クラスを使用します。タスクは、ForkJoinTask<V>
もしくはそのサブクラスを使用します。
具体的な使用例は、下記の記事などを参照してください。
ForkJoinPool
はスレッドプールで使用するプロセッサー数を制御することができます。使用できるプロセッサーの数は、Runtime.availableProcessors()
で取得することができます。Executors.newWorkStealingPool()
は、new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)
と同じです。
ForkJoinPool
は共通プールのインスタンス(以下、Fork/Join共通プールと表記)があり、ForkJoinPool.commonPool()
で取得できます。
このプールは静的に構築されます。shutdown()またはshutdownNow()を試行しても、その実行状態には影響しません。ただし、このプールおよび実行中の処理は、System.exit(int)プログラムで自動的に終了します。
ForkJoinPool.commonPool()
- Java SE 8 API ドキュメント
Fork/Join共通プールは、Java8からシステムプロパティーで初期設定値を設定することが可能になりました。プロパティーは、ForkJoinPool
クラスの説明に書かれています。
また、Fork/Join自体ではありませんが、Java8では、ご存じStreamのパラレル(並列処理)と、Arrays
クラスのparallelSort
メソッドを始めとするパラレル処理が追加されています。これらのパラレル実行には、ForkJoinPool
のFork/Join共通プールが使用されています。
最後に、Java8で追加されたCompletableFuture
について。
CompletableFuture<T>
はFuture
ではありますがタスクではありません。それ自体が従来のExecutorファミリーには属さない独自のタスク・フレームワークと言えます。ただし、タスクを実行する際に使用されるスレッドプールは(ThreadPool)Executor
やForkJoinPool
*2です。
CompletableFuture
クラスは、CompletionStage
インターフェイスの実装クラスです。
CompletableFuture
の完了状態には、成功、失敗、キャンセルがあり、成功の場合は処理結果が取得でき、失敗の場合は例外がスローされます。
また、複数のCompletableFuture
を合成してひとつの新しいCompletableFuture
を生成することもできます。
CompletableFuture
の例:成功、失敗、キャンセル、合成
// import java.util.concurrent.*; // 成功 log("success-1"); CompletableFuture<String> successCF = CompletableFuture.supplyAsync(() -> { sleep(3_000L); return "success"; }); try { log("success-2"); log("success-3: result=%s", successCF.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } // 失敗 log("failure-1"); CompletableFuture<Void> failureCF = CompletableFuture.runAsync(() -> { sleep(3_000L); throw new RuntimeException("failure"); }); try { log("failure-2"); failureCF.get(); } catch (InterruptedException | ExecutionException e) { log("failure-3: exception=%s", e); } // キャンセル log("cancel-1"); CompletableFuture<Void> cancelCF = CompletableFuture.runAsync(() -> { sleep(3_000L); }); log("cancel-2"); sleep(1_500L); log("cancel-3: %s", cancelCF.cancel(true)); // 合成 (Either, async) CompletableFuture<Long> leftCF = CompletableFuture.supplyAsync(() -> { log("either-left: start"); sleep(2_000L); long t = System.currentTimeMillis(); log("either-left: time=%d", t); return t; }); CompletableFuture<Long> rightCF = CompletableFuture.supplyAsync(() -> { log("either-right: start"); sleep(3_000L); long v = 123L; log("either-right: value=%d", v); return v; }); CompletableFuture<Void> eitherCF = leftCF.acceptEitherAsync(rightCF, x -> { log("either-result: x=%d", x); }); try { eitherCF.get(); log("either-end"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
- 実行結果(
CompletableFuture
の例:成功、失敗、キャンセル、合成)
--:18:44.485 [main] success-1 --:18:44.719 [main] success-2 --:18:47.729 [main] success-3: result=success --:18:47.729 [main] failure-1 --:18:47.729 [main] failure-2 --:18:50.741 [main] failure-3: exception=java.util.concurrent.ExecutionException: java.lang.RuntimeException: failure --:18:50.741 [main] cancel-1 --:18:50.741 [main] cancel-2 --:18:52.255 [main] cancel-3: true --:18:52.255 [ForkJoinPool.commonPool-worker-2] either-left: start --:18:52.255 [ForkJoinPool.commonPool-worker-3] either-right: start --:18:54.269 [ForkJoinPool.commonPool-worker-2] either-left: time=1426256334269 --:18:54.269 [ForkJoinPool.commonPool-worker-2] either-result: x=1426256334269 --:18:54.269 [main] either-end
同期キュー
キュー(Queue)は、Concurrency Utilitiesと同時に*3導入されました。キューというデータ構造自体はもちろん、並列処理に限定されるデータ構造では無いのですが、Javaではそれ以前にキューという型がありませんでした。ちなみに、両端キューすなわちデック(Deque)は、さらに遅れてJava6になってから登場します。
並列処理で使用される同期キューは、主にBlockingQueue
インターフェイス (java.util.concurrent.BlockingQueue<E>
)の実装クラスになります。これは、データを格納する際にキューに空きが無かったり、データを取り出す際にデータが無かった場合に処理をブロックするタイプのキューです。もちろん、BlockingQueue
のほとんどの実装は「スレッドセーフ」なコレクションです。
BlockingQueue
の使い道といえば、Producer-Consumerパターンが有名です。これは、データのやりとりで必要な同期処理をすべて同期キューに任せることができ、生産者(Producer)と消費者(Consumer)は同期を意識しないで済むというパターンです。
下記のサンプルコードは、複数の生産者と複数の消費者でキューを共有する例です。生産者はランダムな数を生産し、消費者はその数(秒)だけスリープする、特に意味のない処理です。
// import java.util.Random; // import java.util.concurrent.*; // import java.util.stream.IntStream; log("start"); // 共有キュー BlockingQueue<Integer> q = new LinkedBlockingQueue<>(); // 生産者(producer) int nProducers = 2; ExecutorService producerThreads = Executors.newCachedThreadPool(); IntStream.range(0, nProducers).forEach(i -> producerThreads.execute(() -> { Random rnd = new Random(System.currentTimeMillis()); try { while (true) { int n = rnd.nextInt(6) + 2; log("produce: %d", n); q.add(n); sleep((rnd.nextInt(5) + 3) * 1_000L); } } catch (Exception e) { log("this thread was interrupted and exited task. (%s)", e); } })); // 消費者(consumer) int nConsumers = 3; ExecutorService consumerThreads = Executors.newCachedThreadPool(); IntStream.range(0, nConsumers).forEach(i -> consumerThreads.execute(() -> { try { while (true) { int n = q.take(); log("sleep %d", n); sleep(n * 1_000L); } } catch (Exception e) { log("this thread was interrupted and exited task. (%s)", e); } })); log("wait 30 seconds"); sleep(30_000L); log("shutdown"); producerThreads.shutdownNow(); // 強制終了 consumerThreads.shutdownNow(); // 強制終了
--:15:24.545 [main] start --:15:24.685 [pool-1-thread-1] produce: 4 --:15:24.685 [pool-1-thread-2] produce: 4 --:15:24.685 [main] wait 30 seconds --:15:24.685 [pool-2-thread-1] sleep 4 --:15:24.685 [pool-2-thread-2] sleep 4 --:15:31.690 [pool-1-thread-2] produce: 2 --:15:31.690 [pool-2-thread-3] sleep 2 --:15:31.690 [pool-1-thread-1] produce: 2 --:15:31.690 [pool-2-thread-2] sleep 2 --:15:34.701 [pool-1-thread-2] produce: 7 --:15:34.701 [pool-2-thread-1] sleep 7 --:15:34.701 [pool-1-thread-1] produce: 7 --:15:34.701 [pool-2-thread-3] sleep 7 --:15:39.710 [pool-1-thread-1] produce: 3 --:15:39.710 [pool-2-thread-2] sleep 3 --:15:39.710 [pool-1-thread-2] produce: 3 --:15:41.708 [pool-2-thread-1] sleep 3 --:15:42.723 [pool-1-thread-2] produce: 3 --:15:42.723 [pool-1-thread-1] produce: 3 --:15:42.723 [pool-2-thread-3] sleep 3 --:15:42.723 [pool-2-thread-2] sleep 3 --:15:45.736 [pool-1-thread-2] produce: 5 --:15:45.736 [pool-2-thread-2] sleep 5 --:15:45.736 [pool-1-thread-1] produce: 5 --:15:45.736 [pool-2-thread-1] sleep 5 --:15:51.743 [pool-1-thread-2] produce: 6 --:15:51.743 [pool-2-thread-3] sleep 6 --:15:51.743 [pool-1-thread-1] produce: 6 --:15:51.743 [pool-2-thread-2] sleep 6 --:15:54.692 [main] shutdown --:15:54.692 [pool-1-thread-1] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted) --:15:54.692 [pool-1-thread-2] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted) --:15:54.692 [pool-2-thread-3] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted) --:15:54.692 [pool-2-thread-1] this thread was interrupted and exited task. (java.lang.InterruptedException) --:15:54.692 [pool-2-thread-2] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted)
pool-1
が生産者のスレッドプール、pool-2
が消費者のスレッドプールです。
ログだけでは動きが掴みにくいと思いますが、生産者と消費者がお互いの処理のタイミングを意識することなく動作しているのが分かるでしょうか。
この項の内容とは関係ありませんが、ログの最後のところは、スレッドプールはmainスレッドが30秒後に強制的にシャットダウンさせているのでこうなっています。
シンクロナイザー
シンクロナイザーとは、複数のスレッド間での待ち合わせ(同期)を支援する機能のことです。Java8の時点では、5つのシンクロナイザーの実装があります。それぞれがある特定の同期方法を支援するための機能を持っています。
Semaphore
クラスCyclicBarrier
クラスCountDownLatch
クラスExchanger<V>
クラスPhaser
クラス ※Java7で追加
Semaphore
は、いわゆるカウンティングセマフォーです。
あらかじめ許可の数(カウント)を決めておき、スレッドが必要な数の許可を獲得できれば処理を行い、許可は獲得した分だけ利用不可の状態にします。処理が終わったら許可を返却すると、それらの許可は再度利用可能になります。必要な数の許可を獲得できなければ、待機させたり処理をスキップさせたりすることができます。
同時にタスクを実行できるスレッド数を制限する場合などに使用します。
下記のサンプルコードは、入場制限のシミュレーションです。
Semaphore
を使った入場制限のシミュレーション
/** * 来客グループ。 */ class Party { String name; int count; Party(String name, int count) { this.name = name; this.count = count; } /** * 時間をつぶす。 * ランダムミリ秒待機。 * @param x 倍率 */ void doSomething(int x) { sleep((long)((Math.random() * 10) + 3) * 100L * x); } } // 入場証 - 同時に入場できるのは20人まで Semaphore admissionPlates = new Semaphore(20); // 来場者グループの列 List<Party> parties = Stream.of("A", "B", "C", "D", "E").map(x -> { int count = (int)((Math.random() * 13) + 3); // 3~15人のグループ return new Party("グループ" + x, count); }).collect(Collectors.toList()); // 全グループの退場をチェックする CountDownLatch exitChecker = new CountDownLatch(parties.size()); // 非同期実行のためのスレッドプール ExecutorService threadPool = Executors.newCachedThreadPool(); log("開場"); parties.forEach(party -> { threadPool.execute(() -> { int count = party.count; String partyText = String.format("%s (%2d人) ", party.name, count); try { log("%sが来場しました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits()); // 入場証の取得、グループは必ず全員同時に入退場 // ただし3秒待っても入場証が足りない場合は入場を諦める if (admissionPlates.tryAcquire(count, 3L, TimeUnit.SECONDS)) { log("%sが入場しました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits()); // 場内での滞在時間 party.doSomething(5); // 入場証を返却 admissionPlates.release(count); log("%sが退場しました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits()); } else { log("%sが入場を諦めました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits()); } // 退場チェック(入場をあきらめたグループも含む) exitChecker.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); party.doSomething(2); // 各グループ間隔の調整 }); try { exitChecker.await(); log("全グループが退場しました"); } catch (InterruptedException e) { e.printStackTrace(); } log("閉場"); threadPool.shutdown();
来客グループの人数と来場タイミングはランダムです。同時に20人しか入れないので、入場証を持っている人だけを入場させるようにします。退場時に入場証は返却します。
残りの入場証が来客グループの人数に満たない場合は、3秒間だけ空くのを待ち、空かなければ入場を諦めます。
- 実行結果(
Semaphore
を使った入場制限のシミュレーション)
--:21:40.052 [main] 開場 --:21:40.093 [pool-1-thread-1] グループA ( 7人) が来場しました 入場証の残り数=20 --:21:40.093 [pool-1-thread-1] グループA ( 7人) が入場しました 入場証の残り数=13 --:21:41.092 [pool-1-thread-2] グループB ( 5人) が来場しました 入場証の残り数=13 --:21:41.093 [pool-1-thread-2] グループB ( 5人) が入場しました 入場証の残り数= 8 --:21:43.092 [pool-1-thread-3] グループC (13人) が来場しました 入場証の残り数= 8 --:21:44.093 [pool-1-thread-1] グループA ( 7人) が退場しました 入場証の残り数=15 --:21:44.093 [pool-1-thread-3] グループC (13人) が入場しました 入場証の残り数= 2 --:21:45.292 [pool-1-thread-1] グループD ( 5人) が来場しました 入場証の残り数= 2 --:21:45.593 [pool-1-thread-1] グループD ( 5人) が入場しました 入場証の残り数= 2 --:21:45.593 [pool-1-thread-2] グループB ( 5人) が退場しました 入場証の残り数= 7 --:21:46.292 [pool-1-thread-2] グループE (11人) が来場しました 入場証の残り数= 2 --:21:49.293 [pool-1-thread-2] グループE (11人) が入場を諦めました 入場証の残り数= 2 --:21:50.093 [pool-1-thread-1] グループD ( 5人) が退場しました 入場証の残り数= 7 --:21:50.094 [pool-1-thread-3] グループC (13人) が退場しました 入場証の残り数=20 --:21:50.094 [main] 全グループが退場しました --:21:50.095 [main] 閉場
この実行結果では、11人のグループEが来場したタイミングで残りの入場証が2個しかないため、待機していますが、3秒待っても入場証が返却されないので、諦めて帰ってしまいました。
なお、この例では退場者チェックを行う役目をCountDownLatch
が担っています。main
スレッドを待機させるために、入館したかどうかにかかわらず、帰った人をカウントして、カウントダウンします。
次は、Exchanger
を見てみましょう。これは、2つのスレッド間で値を交換するためのシンクロナイザーです。
Exchangerは、遺伝アルゴリズムやパイプライン設計などのアプリケーションで便利です。
Exchanger
- Java SE 8 API ドキュメント
次のサンプルコードの挙動を見れば、Exchanger
がどのような動作をするのかはご理解いただけると思います。
簡単に特徴を説明できる例が思い浮かばないので、また特に意味のない動作の例です。ごめんなさい。
Exchanger
の例:2つのスレッド間で値を交換する
Exchanger<String> exchanger = new Exchanger<>(); ExecutorService threadPool = Executors.newCachedThreadPool(); String[] sample1 = { "AB", "CDE", "FG" }; String[] sample2 = { "123", "456", "7890" }; for (int i = 0; i < sample1.length; i++) { String s1 = sample1[i]; Future<String> result1 = threadPool.submit(new Callable<String>() { String name = "task1"; @Override public String call() throws Exception { log("%s: value = [%s]", name, s1); sleep(100L); // 何か処理 String exchanged = exchanger.exchange(s1); // throws InterruptedException log("%s: exchanged = [%s]", name, exchanged); return exchanged; } }); String s2 = sample2[i]; Future<String> result2 = threadPool.submit(new Callable<String>() { String name = "task2"; @Override public String call() throws Exception { log("%s: value = [%s]", name, s2); sleep(3000L); // 何か処理 task1より時間が掛かる String exchanged = exchanger.exchange(s2); // throws InterruptedException log("%s: exchanged = [%s]", name, exchanged); return exchanged; } }); try { log("task1結果=%s", result1.get()); log("task2結果=%s", result2.get()); } catch (InterruptedException | ExecutionException e) { log("task実行中に例外が発生しました: 例外=%s", e); } } threadPool.shutdown();
task1とtask2でそれぞれの配列の値をexchange
メソッドによって交換し合います。交換の際、時間差がありますので、早い方のスレッドが遅い方のスレッドをexchange
メソッドで待機します。
- 実行結果(
Exchanger
の例:2つのスレッド間で値を交換する)
--:00:29.641 [pool-1-thread-1] task1: value = [AB] --:00:29.672 [pool-1-thread-2] task2: value = [123] --:00:32.808 [pool-1-thread-1] task1: exchanged = [123] --:00:32.808 [pool-1-thread-2] task2: exchanged = [AB] --:00:32.808 [main] task1結果=123 --:00:32.808 [main] task2結果=AB --:00:32.808 [pool-1-thread-1] task2: value = [456] --:00:32.808 [pool-1-thread-2] task1: value = [CDE] --:00:35.820 [pool-1-thread-1] task2: exchanged = [CDE] --:00:35.820 [pool-1-thread-2] task1: exchanged = [456] --:00:35.820 [main] task1結果=456 --:00:35.820 [main] task2結果=CDE --:00:35.820 [pool-1-thread-1] task2: value = [7890] --:00:35.820 [pool-1-thread-2] task1: value = [FG] --:00:38.832 [pool-1-thread-1] task2: exchanged = [FG] --:00:38.832 [pool-1-thread-2] task1: exchanged = [7890] --:00:38.832 [main] task1結果=7890 --:00:38.832 [main] task2結果=FG
次はPhaser
をみてみましょう。
Phaser
は、「バリアー」を実現するためのクラスです。バリアーとは、ウィキペディアによれば、
並列コンピューティングにおけるバリア(英: Barrier)とは、同期方法の一つであり、ソースコード中でスレッドやプロセスがある箇所で停止し、他の全てのスレッドプロセスがバリアに到達するまで進行しないようなものを示す。
バリア (計算機科学) - Wikipedia
というものを指します。残りのシンクロナイザー、CyclicBarrier
, CountDownLatch
もバリアーとされています。
ちなみに、Phaser
は、Java7で追加されました。
Phaser
の特徴は、名前の通りフェイズに関係しています。各フェイズの開始をバリアーで待機させ、同時に開始させることができる、再利用可能なバリアーです。
他の特徴としては、フェイズに参加するメンバー(スレッド)の数が開始後に変えられること、(ThreadGroup
のような)親子関係のツリーを構築できること、監視はメンバー以外でも可能であること、などがあります。
次のサンプルコードを見てください。
Phaser
の例:フェイズ開始の準備待機と途中参加
// import java.util.Arrays; // import java.util.Random; // import java.util.concurrent.*; Phaser phaser = new Phaser(); ExecutorService threadPool = Executors.newCachedThreadPool(); Random r = new Random(System.currentTimeMillis()); log("開始"); Arrays.asList("A", "B", "C").forEach(x -> { phaser.register(); threadPool.execute(() -> { try { Arrays.asList(1, 2).forEach(i -> { String prefix = String.format("%s: [フェイズ #%d]", x, i); long millis = (r.nextInt(10) * 500) + 1_000L; log(prefix + "の処理中 (sleep %d millis)", millis); sleep(millis); log(prefix + "の完了を待機"); int phaseNumber = phaser.arriveAndAwaitAdvance(); log("到着したフェイズ番号=" + phaseNumber); }); } finally { phaser.arriveAndDeregister(); } }); }); sleep(6_000L); log("フェイズに途中参加"); phaser.register(); sleep(15_000L); log("途中参加したフェイズの開始を待機"); phaser.arriveAndAwaitAdvance(); log("終了"); threadPool.shutdown();
フェイズ1の開始をA,B,Cのスレッドが待機します。ランダム時間でスリープさせているのは、それぞれのスレッドで準備に要する時間が異なる様子を表しています。全てのスレッドがarriveAndAwaitAdvance()
に到達すると、待機が解除されます。
フェイズ2では、mainスレッドが途中参加し、mainスレッドを含めた4スレッドがarriveAndAwaitAdvance()
に到達すると、待機が解除されます。
- 実行結果(フェイズ開始の準備待機と途中参加)
--:28:04.426 [main] 開始 --:28:04.504 [pool-1-thread-3] C: [フェイズ #1]の処理中 (sleep 5000 millis) --:28:04.504 [pool-1-thread-2] B: [フェイズ #1]の処理中 (sleep 3000 millis) --:28:04.504 [pool-1-thread-1] A: [フェイズ #1]の処理中 (sleep 2000 millis) --:28:06.516 [pool-1-thread-1] A: [フェイズ #1]の完了を待機 --:28:07.516 [pool-1-thread-2] B: [フェイズ #1]の完了を待機 --:28:09.514 [pool-1-thread-3] C: [フェイズ #1]の完了を待機 --:28:09.514 [pool-1-thread-2] 到着したフェイズ番号=1 --:28:09.514 [pool-1-thread-2] B: [フェイズ #2]の処理中 (sleep 3500 millis) --:28:09.514 [pool-1-thread-1] 到着したフェイズ番号=1 --:28:09.514 [pool-1-thread-3] 到着したフェイズ番号=1 --:28:09.514 [pool-1-thread-1] A: [フェイズ #2]の処理中 (sleep 2500 millis) --:28:09.514 [pool-1-thread-3] C: [フェイズ #2]の処理中 (sleep 4000 millis) --:28:10.514 [main] フェイズに途中参加 --:28:12.028 [pool-1-thread-1] A: [フェイズ #2]の完了を待機 --:28:13.028 [pool-1-thread-2] B: [フェイズ #2]の完了を待機 --:28:13.528 [pool-1-thread-3] C: [フェイズ #2]の完了を待機 --:28:25.528 [main] 途中参加したフェイズの開始を待機 --:28:25.528 [pool-1-thread-2] 到着したフェイズ番号=2 --:28:25.528 [main] 終了 --:28:25.528 [pool-1-thread-3] 到着したフェイズ番号=2 --:28:25.528 [pool-1-thread-1] 到着したフェイズ番号=2
CyclicBarrier
は、Phaser
をシンプルにしたような*4バリアー実装です。
CountDownLatch
は、Semaphore
で少し触れましたが、パーティーを特定しないタイプのバリアーです。あらかじめ設定したカウントをカウントダウンしていきカウントがゼロになった時に待機が解除される仕組みになっています。私はこのCountDownLatch
がけっこうお気に入りです。
CyclicBarrier
とCountDownLatch
の例については、前回の記事を御覧ください。
もし、どうしてもこれらの既存のシンクロナイザーでは要件を充たせないのであれば、java.util.concurrent.locks
パッケージのAPIを使用して、独自のシンクロナイザーを実装することもできます。
実際、5つの既存のシンクロナイザーのうち、Semaphore
とCountDownLatch
はjava.util.concurrent.locks.AbstractQueuedSynchronizer
クラスを利用して実装されていますし、残りもjava.util.concurrent.locks.LockSupport
クラスやjava.util.concurrent.locks.Lock
インターフェイスの実装クラスを使用して実装されています。
並行処理コレクション
BlockingQueue
以外にも、並列処理向きのコレクションが用意されています。
代表的なものが、java.util.concurrent.ConcurrentHashMap
クラスです。
特徴のひとつとして、アトミックな操作があります。例えば、ConcurrentHashMap#putIfAbsent(K, V)
は、意味的には、「ConcurrentHashMap#containsKey(K)
がtrue
ならConcurrentHashMap#put(K, V)
を実行する」のような操作(実際には、もっと複雑な実装になっています)ですが、これを他のスレッドに割り込まれること無く一貫して(=アトミックに)行うことができます。
また、Java8では、ラムダ式に対応したメソッドが追加されています。例えば、ConcurrentHashMap#computeIfPresent(K, BiFunction)
は、putIfAbsent
と似ていますが、こちらはキーの値を入力にしてラムダ式を評価した結果をputします。
ConcurrentHashMap
のアトミックな操作の詳細については、前回の記事も参照してください。
時間単位
Concurrency Utilitiesでは、時間の単位はTimeUnit
列挙型(java.util.concurrent.TimeUnit
)が使われます。TimeUnit
は主に、スケジューリングや、待ち合わせのタイムアウトの時間を設定する「タイミング・パラメーター」として使用されます。例えば、下記のメソッドで使用されます。
ScheduledThreadPoolExecutor#schedule(Callable<V>, long, TimeUnit)
Future#get(long, TimeUnit)
Semaphore#tryAcquire(int, long, TimeUnit)
使い方の例は、タスク・フレームワークの項を参照してください。
もちろん、TimeUnit
はConcurrency Utilities以外でも使用できます。標準APIで使用されている例としては、Process#waitFor(long, TimeUnit)
やjava.nio.file.attribute.FileTime.from(long, TimeUnit)
などがあります。
アトミック値型とアトミック操作
java.util.concurrent.atomic
パッケージは、並列処理でのアトミックな値の更新・取得を行うためのAPI群です。さらに細かく分けると、AtomicXXX
、FieldUpdater
、Adder/Accumulator
のグループ*5に分類できます。順番に見てみましょう。
AtomicXXX
(XXX
にはInteger
などが入る)はアトミック値型のクラスで、文字通り「アトミック」に更新ができる値です。「アトミック」とは、ひと言で言うと、複数のスレッドから値にアクセスした場合でも1スレッドの操作が一貫して行われることが保証される動作のことです。例えば、AtomicLong#getAndIncrement()
は、値の取得と加算が他のスレッドに割り込まれること無く行われます。前回の記事ではこの辺りをもう少し詳しく書いていますので、併せてご参照ください。
Java8では、アトミック値型にラムダ式に対応したメソッドがいくつか追加されています。例えば、AtomicLong#getAndUpdate(LongUnaryOperator)
があります。
アトミック値型と内包される型の関係は下表の通り。最後の2つは、参照型とプリミティブ型のペアーになっていて、マーキングができるようになっています。注:ペアーの表記は、Javaのものではなく、擬似コードです。
- 表:アトミック値型と内包される型の関係
クラス | 対応する型 |
---|---|
AtomicBoolean |
boolean |
AtomicInteger |
int |
AtomicLong |
long |
AtomicReference<V> |
V |
AtomicIntegerArray |
int[] |
AtomicLongArray |
long[] |
AtomicReferenceArray<E> |
E[] |
AtomicMarkableReference<V> |
(V, boolean) |
AtomicStampedReference<V> |
(V, int) |
FieldUpdater
には、AtomicIntegerFieldUpdater
, AtomicLongFieldUpdater
, AtomicReferenceFieldUpdater
があります。
FieldUpdater
を使うと、アトミックを使用していない任意のクラスのvolatile
フィールドに「外付け」アトミック機能を組み込むことができます。リフレクションを使っているのでprivateフィールドでもOKですが、volatile
は必須です。
もちろん、FieldUpdater
以外からのvolatile
フィールドへのアクセスをブロックすることはできませんので、FieldUpdater
からのみのアクセスになるような使い方に限定しないといけません。
Adder/Accumulator
は、Java8の"jsr166e: Concurrency Update"として導入された機能のひとつ(Scalable updatable variables)です。それぞれlong
とdouble
のための計4クラスあります。
Adder/Accumulator
もAtomicInteger
などと同じくNumber
のサブクラスなので、値型の一種と言えるかもしれません。ただし、下記の制約がありますのでご注意ください。
このクラスはNumberを拡張しますが、equals、hashCode、compareToなどのメソッドを定義しません。インスタンスでは変更が想定されているため、コレクション・キーとしては役立たないからです。
LongAdder
- Java SE 8 API ドキュメント
また、LongAdder
,LongAccumulator
とAtomicLong
には似たところがあります。ドキュメントにはこう書かれています。
きめ細かい同期制御のためではなく統計収集などの目的に使用される共通の値が、複数のスレッドによって更新される場合、通常はAtomicLongよりもこのクラスをお薦めします。更新の競合が少ないときは、2つのクラスの特徴は似ています。競合が多いときは、期待されるスループットはこのクラスの方がかなり高くなります。ただし、容量消費も多くなります。
LongAccumulator
- Java SE 8 API ドキュメント
並列ストリームを使ってlong
値をカウントアップする処理を、LongAdder
とAtomicLong
で比べてみます。「容量消費」は度外視します。
LongAdder
とAtomicLong
のパフォーマンス比較
// import java.util.concurrent.atomic.*; // import java.util.stream.IntStream; int nThread = 1_000; LongAdder longAdder = new LongAdder(); AtomicLong atomicLong = new AtomicLong(); long t; int sum; log("開始(LongAdder)"); t = System.currentTimeMillis(); sum = IntStream.range(0, nThread).parallel().map(i -> { IntStream.range(0, 1_000_000).forEach(j -> longAdder.increment()); return 1; }).sum(); log("終了(LongAdder): 所要時間=%dミリ秒,", System.currentTimeMillis() - t); log(" 終了スレッド数=%d, カウント=%s", sum, longAdder); log("開始(AtomicLong)"); t = System.currentTimeMillis(); sum = IntStream.range(0, nThread).parallel().map(i -> { IntStream.range(0, 1_000_000).forEach(j -> atomicLong.incrementAndGet()); return 1; }).sum(); log("終了(AtomicLong): 所要時間=%dミリ秒,", System.currentTimeMillis() - t); log(" 終了スレッド数=%d, カウント=%s", sum, atomicLong);
1000のスレッドがカウントアップを1,000,000回行い、合計が1,000,000,000になる処理です。並列処理はパラレルストリームを利用しています。
- 実行結果(
LongAdder
とAtomicLong
のパフォーマンス比較)
--:14:54.799 [main] 開始(LongAdder) --:15:09.012 [main] 終了(LongAdder): 所要時間=14175ミリ秒, --:15:09.013 [main] 終了スレッド数=1000, カウント=1000000000 --:15:09.013 [main] 開始(AtomicLong) --:15:46.161 [main] 終了(AtomicLong): 所要時間=37148ミリ秒, --:15:46.161 [main] 終了スレッド数=1000, カウント=1000000000
このように、膨大な件数の集計処理を行う場合の速度に差が付きます。
ロック・フレームワーク
java.util.concurrent.locks
パッケージには、ロックに関するAPIがまとめられています。
パッケージの説明の日本語訳にちょっと違和感があります。パッケージ全体を見たところでは、言語機能組み込みのsynchronized
およびObject#wait
, Object#notify
とは別のロッキング・フレームワーク、という意味だと思うのですが...
ざっと全体を見てみましょう。
ReentrantLock
クラスは、基本機能ではsynchronized
とほぼ同じことができ*6、かつ拡張可能なロック実装です。
LockSupport
クラスは、synchronized
やReentrantLock
とは異なるロック操作(park/unpark
)を提供します。
この2つの詳細は後述します。
ReentrantReadWriteLock
は、Read-Writeロックの(ReadWriteLock
インターフェイスの)デフォルト実装です。
詳しくは、下記の記事をご覧ください。
AbstractOwnableSynchronizer
, AbstractQueuedLongSynchronizer
, AbstractQueuedSynchronizer
クラスは、新たにシンクロナイザーを実装するような場合に用意されたクラスです。シンクロナイザーの項でも触れましたが、Semaphore
とCountDownLatch
は、AbstractQueuedSynchronizer
を使用して実装されています。
ブロック・ロック、および先入れ先出し(FIFO)待機キューに依存する関連シンクロナイザ(セマフォ、イベントなど)を実装するフレームワークを提供します。このクラスは、状態表現を単一の原子int値に依存する大半の種類のシンクロナイザの有用な基盤として設計されています。
AbstractQueuedSynchronizer
- Java SE 8 API ドキュメント
StampedLock
クラスは、Java8で追加された新しい独立したロック実装です。
long
値で表現される「スタンプ」を使って、ロックなしでの読み取り(楽観的読み取り:optimistic-read)が可能な状態を管理できます。また、このクラスはLock
インターフェイスを実装していません。
下記の記事が参考になります。
さて、冒頭で少し触れたReentrantLock
とLockSupport
について、もう少し詳しく見ていきます。
ReentrantLock
クラスの説明には、次のことが書かれています。
synchronizedメソッドおよび文を使用してアクセスする暗黙の監視ロックと同じ基本動作およびセマンティックスを使用し、かつ拡張機能を持つ、再入可能な相互排他Lockです。
ReentrantLock
- Java SE 8 API ドキュメント
これはつまり、言語機能であるsynchronized
とObject#wait
, Object#notify
によるロックの基本動作とほぼ同じことが、APIであるReentrantLock
を使って実現できる*7ということです。その上、synchronized
では実現できない機能もいくつか持っています。
ReentrantLock
は、例えばCondition
インターフェイスと組み合わせて、このように使うことができます。
ReentrantLock
とCondition#await
,Condition#signal
// import java.util.concurrent.locks.*; Lock lock = new ReentrantLock(); Condition c = lock.newCondition(); new Thread(() -> { log("task 1"); lock.lock(); try { log("await"); try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } log("end"); }).start(); sleep(500L); new Thread(() -> { log("task 2"); lock.lock(); try { log("sleep"); sleep(3_000L); c.signal(); } finally { lock.unlock(); } log("end"); }).start();
これは、下記のコードとほぼ同じ意味合いになります。
synchronized
とObject#wait
,Object#notify
の例
Object lock = new Object(); new Thread(() -> { log("task 1"); synchronized (lock) { log("await"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log("end"); }).start(); sleep(500L); new Thread(() -> { log("task 2"); synchronized (lock) { log("sleep"); sleep(3_000L); lock.notify(); } log("end"); }).start();
最後に、LockSupport
について。
LockSupport
クラスは、すべてのメソッドがstatic
です。主に、park
,unpark
メソッドと、その亜種で構成されています。
park/unpark
というのは耳慣れない言葉ですね。クラスの説明にはこう書かれています。
parkメソッドとunparkメソッドは、非推奨メソッドThread.suspendおよびThread.resumeをこのような目的に使用できなくなるような問題が発生しないスレッドをブロックおよびブロック解除するための効率的な手段を提供します。parkを呼び出しているあるスレッドと、そのスレッドのunparkを試みている別のスレッドの間の競合は、パーミットのために活発な状態を保持します。さらに、呼出し側のスレッドで割込みが発生し、かつタイムアウト・バージョンがサポートされている場合は、parkが復帰します。parkメソッドはまた「理由なしで」いつでも復帰する可能性があるため、一般には、復帰時に状態を再チェックするループ内で呼び出す必要があります。この意味で、parkはスピンにそれほどの時間を浪費しない「ビジー待機」の最適化として機能しますが、効果を発揮させるにはunparkとペアにする必要があります。
LockSupport
- Java SE 8 API ドキュメント
基本的な使い方は、park
で待機しているスレッドをunpark
で復帰させる、というものですが、説明にもあるとおり、park
メソッドは「理由なしで」いつでも復帰する可能性があるため、ループの中に閉じ込めておく必要があります。
また、park
状態で待機中のスレッドに割り込みをかけても復帰しますが、その際にInterruptedException
がスローされません。割り込まれたかどうかはThread#isInterrupted()
を使って判断できます。
park
/unpark
の例
// import java.util.concurrent.atomic.AtomicBoolean; // import java.util.concurrent.locks.LockSupport; Thread mainThread = Thread.currentThread(); AtomicBoolean canProceed = new AtomicBoolean(); new Thread(() -> { sleep(5_000L); log("unpark(1回目)前"); LockSupport.unpark(mainThread); log("unpark(1回目)後"); sleep(5_000L); canProceed.set(true); log("unpark(2回目)前"); LockSupport.unpark(mainThread); log("unpark(2回目)後"); }).start(); // parkの呼び出し前にunparkでパーミットをとっておくことができる LockSupport.unpark(mainThread); // パーミットは同時に2つ以上取れないのでこれは無視される LockSupport.unpark(mainThread); while (!canProceed.get()) { log("park前"); LockSupport.park(); // 1回目は取得済みパーミットを消費して通過 log("park後"); } log("ループを抜けて終了");
- 実行結果(
park
/unpark
の例)
--:20:59.586 [main] park前 --:20:59.980 [main] park後 --:20:59.981 [main] park前 --:21:04.586 [Thread-0] unpark(1回目)前 --:21:04.586 [Thread-0] unpark(1回目)後 --:21:04.586 [main] park後 --:21:04.587 [main] park前 --:21:09.587 [Thread-0] unpark(2回目)前 --:21:09.587 [Thread-0] unpark(2回目)後 --:21:09.587 [main] park後 --:21:09.588 [main] ループを抜けて終了
park
側のスレッドは、AtomicBoolean
を使ってwhile
ループに閉じ込めています。
コードコメントにあるとおり、1回目のpark
は取得済みパーミットを消費してそのまま通過できていますが、2回目は無視されています。
いかがでしたか。
私もまだ理解できていないところが多いですが、広く浅く網羅することはできたと思います。
しかしさすがに時間がかかってしまいました。少なくとも1/30の記事の時点には半分ほど書けていて、残り半分がしんどかった。
ともあれ、参考になれば幸いです。
(おわり)