Concurrency Utilitiesを使った並列処理・マルチスレッドのおさらい
この記事は、個人的なおさらいのための、Java Concurrency Utilitiesの一部を使ったサンプルとメモです。
目新しいものは特にありません。
記事内のサンプルとAPIドキュメント参照はJava7(Java SE 7)を基準にしていますが、Java Concurrency Utilities自体は一部を除いてJava5(Java SE 5.0)から使えるようになっています。
あと、いつものことですが、画像がありません。
追記(2014-01-05): この機能の呼称は (Java) Concurrency Utilities が公式で、この記事内の"Utility"というのは正確ではない+混在していますのでご注意ください。ごめんなさい。
追記(2014-08-15): この記事は、キーワード"Java Concurrency Utilities"でGoogle検索した時に2番目に来ていたり、ブログ内でのアクセスが多かったりと、読んでいただく機会が多いみたいなので、読みやすくなるように改訂しました。"Utility"も"Utilities"に直しました。
ありがとうございます。
追記(2015-03-21):Concurrency Utilitiesを中心にしたまとめを書いてみました。→Concurrency Utilitiesの「再」まとめ - Java8対応版
並列処理を比較的手軽に実現できるConcurrency Utilities
マルチスレッドによる並列処理システムを構築するには、Javaの標準機能だけでもできないことはありませんが、安定した実現には多くの煩雑さと難解さが伴います。特に、シーケンシャルな処理では発生しないマルチスレッド特有の問題を考慮する必要があります。有名なところでは、スレッドアンセーフ(非スレッドセーフ)なオブジェクトの共有、デッドロックなどがあります。
そういった問題を回避しつつ、比較的容易にマルチスレッド関連機能を扱えるようにしてくれるのが、Concurrency Utilitiesです。
Concurrency Utilitiesは、JSR166で提案され、Java5から標準機能として使えるようになりました。
Concurrency UtilitiesのAPIは、java.util.concurrent
パッケージとそのサブパッケージに格納されています。
今回は、Concurrency Utilitiesのうち、いくつかをピックアップしておさらいします。
サンプルのための前提と準備
サンプルの実行結果は、JDK、JRE共に本家のJava7で、Windows7(32bit版)でコンパイル・実行したものです。他の環境でも動作するとは思いますが、細かな挙動が異なる可能性があります。
はじめに、それぞれのサンプルが小さくなるように、ユーティリティーなどを準備しておきます。また、同様の理由で、サンプルの一部ではmain
メソッドとクラスは省略している箇所があります。
- ユーティリティーメソッドクラス
concurrent.Util
package concurrent; final class Util { private Util() { } static void log(Object o) { log("%s", o); } static void log(String fmt, Object... args) { Thread currentThread = Thread.currentThread(); String datetime = String.format("%1$tT.%1$tL", System.currentTimeMillis()); datetime = "--:--:" + datetime.substring(6); String threadName = String.format("%s", currentThread.getName()); System.out.printf("%s [%s] %s%n", datetime, threadName, String.format(fmt, args)); } static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } static void waitForSeveralSeconds() { final long millis = (long)(1000L * (Math.random() * 9 + 1)); log("waiting ... (it takes %d milliseconds)", millis); sleep(millis); } }
SleepTask
(Runnable
の実装クラス)- 指定した時間スリープする
package concurrent; import static concurrent.Util.*; final class SleepTask implements Runnable { private final long sleepMillis; SleepTask(long time) { this.sleepMillis = time; } @Override public void run() { log("begin"); sleep(sleepMillis); log("end"); } }
SlowDownCallableTask
(Callable<Integer>
の実装クラス)waitForSeveralSeconds
してから、引数の3倍の値を結果として返す
package concurrent; import static concurrent.Util.*; import java.util.concurrent.*; final class SlowDownCallableTask implements Callable<Integer> { private final int input; SlowDownCallableTask(int input) { this.input = input; } @Override public Integer call() throws Exception { log("input=%d", input); waitForSeveralSeconds(); return input * 3; } }
スレッドプールを作ってタスクを処理させる - Executors
, ExecutorService
Executors
,ExecutorService
は、スレッドプールに関する機能を集約したものです。
スレッドを生成するのはそれなりに処理コストがかかるので、スレッドをあらかじめ作っておいたり、キャッシュして使いまわしたりします。それがスレッドプールです。
スレッドプールを使う場面では、キャッシュの管理だけではなく、タスクを空きスレッドに割り振って実行させたり、余ったスレッドを破棄したり、足りないスレッドを補充したり、同時実行数を制御したり、処理結果の受け渡しのための待ち合わせをしたりと、さまざまな機能が要求されます。これらを簡単に実現してくれるのが、スレッドプールのさまざまな操作を提供するExecutorService
インターフェイスと、ExecutorService
を生成するExecutors
ユーティリティークラスです。
まずは、ごく簡単な使い方を見てください。
ExecutorsSample1
//import static concurrent.Util.*; //import java.util.concurrent.*; ExecutorService pool; // pool = Executors.newCachedThreadPool(); pool = Executors.newFixedThreadPool(2); // pool = Executors.newSingleThreadExecutor(); pool.execute(new SleepTask(3000L)); pool.execute(new SleepTask(2000L)); pool.execute(new SleepTask(1000L)); pool.shutdown(); // pool.shutdownNow(); log("end");
ExecutorService#execute
(Executor#execute
から継承されたメソッド)は、Runnable
のタスクとして実行するメソッドです。Thread
を使った時のnew Thread(task).start()
とするのと近いです。ExecutorService#submit
というメソッドもありますが、これについては後述します。
Executors.newXXXThreadXXX
という名前のメソッドは、スレッドプール(型はExecutorService
)を生成するファクトリーメソッドです。いくつかありますが、ここでは分かりやすいところで3種類を挙げています。
Executors.newCachedThreadPool
は、足りない場合は自動的にスレッドを補充するスレッドプールを作ります。
Executors.newFixedThreadPool
は、スレッド数が指定した数になるスレッドプールを生成します。タスクが投入された際にスレッドが足りない場合は、空きスレッドが出るまでタスクの実行は保留されます。
Executors.newSingleThreadExecutor
は、スレッド数が1のスレッドプールを生成します。これはExecutors.newFixedThreadPool(1)
とほぼ同じです。
それでは、実行した結果を見てみましょう。
- 実行結果(
ExecutorsSample1
) -newFixedThreadPool(2)
の場合
--:--:16.841 [pool-1-thread-2] begin --:--:16.841 [pool-1-thread-1] begin --:--:16.841 [main] end --:--:18.875 [pool-1-thread-2] end --:--:18.876 [pool-1-thread-2] begin --:--:19.875 [pool-1-thread-1] end --:--:19.876 [pool-1-thread-2] end
2つのスレッド、pool-1-thread-1
とpool-1-thread-2
は同時に開始していますが、pool-1-thread-2
は2番目に投入した2秒スリープするタスクを実行して先に終了するため、1番目のタスクで3秒スリープするpool-1-thread-1
より早く空きスレッドになります。その直後、pool-1-thread-2
が保留となっていた3番目のタスクを処理しています。
main
スレッドの処理は早々に終了しています。ExecutorService#shutdown
は、既に投入済みのタスクは処理が終わるまで待機してからスレッドを終了させます。
また、shutdown
を実行しないと、main
スレッドが終了してもJVMは終了しません。
shutdown
の代わりにExecutorService#shutdownNow
(サンプルではコメントアウトになっている)を使うと、スレッドが実行中でも割り込み(InterruptException
)を発生させてタスクを強制終了します。但し、確実にタスクを終了する保証はされない点に注意が必要です。
もうひとつ、デーモンスレッドだけになると、JVMは終了します。main
スレッドは「非デーモンスレッド」です。ということは、スレッドプールのスレッドがすべてデーモンスレッドになっていれば、main
スレッドが終了した時点で非デーモンスレッドがゼロになり、JVMは終了するということですね。(Thread#setDaemon
参照。)
スレッドプールのスレッドをデーモンスレッドにするには、デーモンスレッドを生成するThreadFactory
を実装し、スレッドプール生成ファクトリーメソッドを実行する際に渡します。
ExecutorsSample2
//import static concurrent.Util.*; //import java.util.concurrent.*; ThreadFactory daemonThreadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = Executors.defaultThreadFactory().newThread(r); thread.setDaemon(true); return thread; } }; ExecutorService pool = Executors.newFixedThreadPool(2, daemonThreadFactory); pool.execute(new SleepTask(3000L)); pool.execute(new SleepTask(2000L)); sleep(2500L); log("end");
- 実行結果(
ExecutorsSample2
)
--:--:38.864 [pool-2-thread-1] begin --:--:38.864 [pool-1-thread-1] begin --:--:40.896 [pool-2-thread-1] end --:--:41.364 [main] end
main
スレッド以外はデーモンスレッドなので、main
スレッドが終了した直後にJVMが終了し、pool-1-thread-1
の処理は完了しませんでした。
処理結果を待ち合わせる - Future
並列計算をするには、前項のExecutorService
を使えばできそうです。
ところが、Runnable
のタスクのRunnable#run
メソッドは値を返すようになっていません。それに、計算が終わっていないのに結果を返すこともできません。計算が終わるまで待っていたら並列計算の意味がありません。どうやって値を返すようにすれば良いのでしょうか。
これを解決するには、Future
インターフェイスに関連した機能を使います。デザインパターンでいうところのFutureパターンです。
FutureTaskSample
//import static concurrent.Util.log; //import java.util.concurrent.*; ExecutorService pool = Executors.newCachedThreadPool(); log("start"); // Callable + Future Future<Integer> future = pool.submit(new SlowDownCallableTask(7)); Integer futureResult = future.get(); // throws InterruptedException, ExecutionException log("result(Future): %d", futureResult); // FutureTask FutureTask<Integer> task = new FutureTask<>(new SlowDownCallableTask(5)); pool.submit(task); Integer futureTaskResult = task.get(); // throws InterruptedException, ExecutionException log("result(FutureTask): %d", futureTaskResult); pool.shutdown(); log("end");
最初の例は、Callable
(java.util.concurrent.Callable
インタフェース)の結果をFuture
(java.util.concurrent.Future
インターフェイス)として受け取って、Future#get
で結果が返されるまで待機します。
2番目の例は、FutureTask
(java.util.concurrent.FutureTask
クラス)を使ったものです。FutureTask
は、Runnable
とFuture
の役割を1つのクラスで賄うことができます。
Callable
のタスクを実行するには、ExecutorService#submit
を使用します。
- 実行結果(
FutureTaskSample
)
--:--:04.220 [main] start --:--:04.270 [pool-1-thread-1] input=7 --:--:04.307 [pool-1-thread-1] waiting ... (it takes 3521 milliseconds) --:--:07.829 [main] result(Future): 21 --:--:07.830 [pool-1-thread-2] input=5 --:--:07.831 [pool-1-thread-2] waiting ... (it takes 7715 milliseconds) --:--:15.546 [main] result(FutureTask): 15 --:--:15.547 [main] end
同じグループのすべてスレッドを待ち合わせる(バリアー) - CyclicBarrier
java.util.concurrent.CyclicBarrier
クラスについて。
並列処理における「バリアー」という言葉は、「ロック」に比べると格段に馴染みの薄い用語ではないでしょうか。
Wikipediaでは「バリア (計算機科学)」*1という項目があります。
再利用できるので、CyclicBarrier
という名前になっているようです。
※Java7以降であれば、Phaser
(java.util.concurrent.Phaser
クラス)も検討してみてください。
APIドキュメントに使用例がありますが、そのままでは動かせないコードなので、以下のサンプルはそのまま(と言ってもmain
メソッドを用意+インポート編成が必要...)で動かせるようにしています。細かい処理は削っています。
CyclicBarrierSample
int threadCount = 5; ExecutorService pool = Executors.newFixedThreadPool(threadCount); final int[] results = new int[threadCount]; Runnable mergeTask = new Runnable() { @Override public void run() { log("merge begin"); log("array: %s", Arrays.toString(results)); int total = 0; for (int result : results) total += result; log("total: %d", total); log("merge end"); } }; final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount + 1, mergeTask); for (int i = 0; i < threadCount; i++) { final int index = i; Runnable calcTask = new Runnable() { @Override public void run() { log("index=%d", index); results[index] = (index + 1) * 3; waitForSeveralSeconds(); log("await"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException ex) { log(ex); } } }; pool.execute(calcTask); } log("await"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException ex) { log(ex); } log("end"); pool.shutdown();
並列計算を簡略化した処理になっています。各ワーカースレッド*2がcalcTask
を実行し、結果をmergeTask
で集計します。
CyclicBarrier#await
を実行すると、そのスレッドは「バリアーポイント」で待機状態に入ります。待機スレッドが待ち合わせるスレッドの数に達すると、バリアーポイントを通過(「トリップ」する)して、後続の処理を行います。
CyclicBarrier
のコンストラクタの第1引数は、待ち合わせるスレッドの数を指定します。このサンプルでは、ワーカースレッドに加えてmain
スレッドも待ち合わせに参加するので、ワーカースレッド数+1をセットしています。第2引数には、トリップした時に実行されるアクション(バリアーアクション)を指定します。(無くても良い。)このアクションは、最後にバリアーに入ったスレッドにより実行されます。(CyclicBarrier
コンストラクタの説明参照。)
CyclicBarrier#await
には、タイムアウトを設定するバージョン(CyclicBarrier#await(long, java.util.concurrent.TimeUnit)
)もあります。
それにしても、こういう時のtry-catch
は邪魔ですね...
- 実行結果(
CyclicBarrierSample
)
--:--:27.424 [main] await --:--:27.424 [pool-1-thread-1] index=0 --:--:27.424 [pool-1-thread-2] index=1 --:--:27.424 [pool-1-thread-3] index=2 --:--:27.424 [pool-1-thread-4] index=3 --:--:27.424 [pool-1-thread-5] index=4 --:--:27.483 [pool-1-thread-2] waiting ... (it takes 8605 milliseconds) --:--:27.483 [pool-1-thread-3] waiting ... (it takes 9917 milliseconds) --:--:27.483 [pool-1-thread-4] waiting ... (it takes 3176 milliseconds) --:--:27.484 [pool-1-thread-1] waiting ... (it takes 8469 milliseconds) --:--:27.484 [pool-1-thread-5] waiting ... (it takes 2514 milliseconds) --:--:29.998 [pool-1-thread-5] await --:--:30.661 [pool-1-thread-4] await --:--:35.954 [pool-1-thread-1] await --:--:36.088 [pool-1-thread-2] await --:--:37.401 [pool-1-thread-3] await --:--:37.401 [pool-1-thread-3] merge begin --:--:37.402 [pool-1-thread-3] array: [3, 6, 9, 12, 15] --:--:37.402 [pool-1-thread-3] total: 45 --:--:37.403 [main] end --:--:37.403 [pool-1-thread-3] merge end
待ち合わせるスレッドの数は6です。この例では、pool-1-thread-3
が最後にバリアーポイントに到達、「トリップ」し、mergeTask
はpool-1-thread-3
によって実行されていることが分かります。
一定回数ポイントを通過したら待機解除 - CountDownLatch
java.util.concurrent.CountDownLatch
クラスについて。
並列処理における「ラッチ」の意味を調べてみたんですが、はっきりとした定義が分かりませんでした。ラッチ回路が近いのでしょうか。
一般的な言葉でのラッチは閂、掛け金のことを指します。Wikipediaには「(鉄道駅などの)改札」とも。
ここでの定義は、見出しの通り、「一定回数ポイントを通過したら待機解除する仕掛け」(のようなもの)と仮置きしておきましょうか。
CyclicBarrier
と少し似たところはありますが、CyclicBarrier
と異なるのは、待機解除の条件が、スレッド同士の待ち合わせとは関係ないところです。
CountDownLatch
は、何度かカウントダウン(CountDownLatch#countDown
)を呼び出し、カウントがゼロになった時点で待機解除するという仕掛けです。待機はCountDownLatch#await
メソッドで行いますが、カウントダウンする役目は待機している以外のどのスレッドが担っても良いし、同じスレッドが何度カウントダウンしても良いのです。
※Java7以降であれば、Phaser
(java.util.concurrent.Phaser
クラス)も検討してみてください。
次のサンプルは、先着3スレッドがカウントダウンを行った時点でカウントがゼロになりmain
スレッドが待機解除する、という処理です。
CountDownLatchSample
//import static concurrent.Util.*; //import java.util.concurrent.*; int threadCount = 5; ExecutorService pool = Executors.newFixedThreadPool(threadCount); final CountDownLatch countDownLatch = new CountDownLatch(3); Runnable task = new Runnable() { @Override public void run() { waitForSeveralSeconds(); log("count down"); countDownLatch.countDown(); } }; for (int i = 0; i < threadCount; i++) pool.execute(task); log("await"); boolean zeroCount = countDownLatch.await(8L, TimeUnit.SECONDS); // throws InterruptedException // countDownLatch.await(); // throws InterruptedException if (zeroCount) log("*** zero"); else log("*** not zero, count = %d", countDownLatch.getCount()); pool.shutdown();
ここではタイムアウト付きで待機している(CountDownLatch#await(long, java.util.concurrent.TimeUnit)
)ので、8秒経過した時点でカウントダウンがゼロになるのを待たずに待機解除します。
- 実行結果(
CountDownLatchSample
)
--:--:50.788 [main] await --:--:50.788 [pool-1-thread-1] waiting ... (it takes 7597 milliseconds) --:--:50.788 [pool-1-thread-2] waiting ... (it takes 9696 milliseconds) --:--:50.788 [pool-1-thread-3] waiting ... (it takes 8587 milliseconds) --:--:50.788 [pool-1-thread-4] waiting ... (it takes 9837 milliseconds) --:--:50.788 [pool-1-thread-5] waiting ... (it takes 3313 milliseconds) --:--:54.137 [pool-1-thread-5] count down --:--:58.420 [pool-1-thread-1] count down --:--:58.824 [main] *** not zero, count = 1 --:--:59.409 [pool-1-thread-3] count down --:--:00.518 [pool-1-thread-2] count down --:--:00.661 [pool-1-thread-4] count down
このケースでは、5スレッド中3スレッドが終了に8秒以上かかっているので、カウントダウンがゼロになる前にmain
スレッドが待機解除しています。
共有オブジェクトの値をアトミックに更新 - java.util.concurrent.atomic.*
java.util.concurrent.atomic
パッケージについて。
並列処理におけるアトミックな操作とは、あるスレッドがスレッド間で共有するオブジェクトに対して値の取得・更新などを行う際に、それらの一連の処理が他のスレッドに割り込みされずに一気に行えることが保証されているもののことです。
ところで、Javaの並列処理では、一見、アトミックに操作できているように見えるのに、そうでない場合があります。
次のサンプルをご覧ください。
AtomicValueSample1
(not atomic)
import java.util.concurrent.*; final class NotAtomicValueSample1 { int value; public static void main(String[] args) throws InterruptedException { final NotAtomicValueSample1 o = new NotAtomicValueSample1(); int threadCount = 100; ExecutorService pool = Executors.newFixedThreadPool(threadCount); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); Runnable task = new Runnable() { @Override public void run() { for (int i = 0; i < 100; i++) ++o.value; countDownLatch.countDown(); } }; for (int i = 0; i < threadCount; i++) pool.execute(task); countDownLatch.await(); // throws InterruptedException log("value=%d", o.value); pool.shutdown(); } }
共有フィールドvalue
に対して、100スレッドが100回ずつ加算を行っています。よって、結果は常に10000になるように見えます。環境にもよりますが、実際、ほとんどが10000になるでしょう。
試したところ、7回目までは10000になりましたが、8回目で次の結果が出ました。*3
- 実行結果(
NotAtomicValueSample1
)
--:--:31.087 [main] value=9987
マイナス13。13回、加算に失敗したのでしょうか。それとも、処理が実行されなかったのでしょうか。
これはJavaの並列処理についてある程度知っていれば回避できる有名な問題で、Java言語仕様の「17 スレッドとロック - 17.4 メモリモデル」に関連する話になります。
1つ目は、命令の順序変更(リオーダー)に関するもので、大雑把に言ってしまうと、コンパイラの最適化により実際の実行順序が思っていたのと変わってしまう場合があるということです。この問題は、volatile
修飾子を付けることで、リオーダーを禁止することができます。
ただし、このケースの場合は、これでは解決しません。
本命は2つ目で、アトミックでない操作がされている場合です。
実はフィールドのインクリメント(++o.value
)は、バイトコードレベルでは、1回の操作ではありません。
// ++o.value getfield #26; //Field AtomicValueSample1.value:I iconst_1 iadd putfield #26; //Field AtomicValueSample1.value:I
別のスレッドが書き戻す(putfield
)前にフィールドの値を取得して、加算して、書き戻した場合には、前回のputfield
の値は上書きされることになります。
Javaのマルチスレッドのメモリモデルについてもっと知りたい方には、下記のj5ik2oさんのエントリーが詳しいです。(長めです。)
この問題を解消する方法のひとつに、java.util.concurrent.atomic
パッケージのクラス群を使うものがあります。
このサンプルでは、フィールドはint
型なので、同パッケージのAtomicInteger
を使います。
AtomicValueSample2
//import java.util.concurrent.*; //import java.util.concurrent.atomic.*; // AtomicInteger final class AtomicValueSample2 { AtomicInteger value = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { final AtomicValueSample2 o = new AtomicValueSample2(); int threadCount = 100; ExecutorService pool = Executors.newFixedThreadPool(threadCount); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); Runnable task = new Runnable() { @Override public void run() { for (int i = 0; i < 100; i++) o.value.incrementAndGet(); countDownLatch.countDown(); } }; for (int i = 0; i < threadCount; i++) pool.execute(task); countDownLatch.await(); // throws InterruptedException log("value=%d", o.value.get()); pool.shutdown(); } }
AtomicInteger#incrementAndGet
は、アトミックに加算と加算後の値の取得を行うことができる操作です。
他にも、getAndIncrement
, getAndDecrement
, addAndGet
, getAndAdd
メソッドなどがあります。
- 実行結果(
AtomicValueSample2
)
--:--:12.200 [main] value=10000 # 5000回くらい
問題は解消したようです。
ただし、当然ながら、プリミティブ型の値とは異なり、裏では同期処理やオブジェクトへのアクセスが発生することになりますので、シビアな性能が求められる処理での使用は、慎重に検討すべきです。
java.util.concurrent.atomic
パッケージ以外にも、アトミックな操作が提供されたクラスがあります。
例えば、ConcurrentMap
インターフェイスの実装クラスがあります。
次のサンプルは、HashMap
クラスと、ConcurrentMap
の実装クラスであるConcurrentHashMap
クラスとで同様の処理を書いてみたものです。
AtomicValueSample3
//import java.util.*; //import java.util.concurrent.*; Map<String, String> m1 = new HashMap<>(); ConcurrentMap<String, String> m2 = new ConcurrentHashMap<>(); String k = "keyString"; String v = "valueString"; // 存在しなければ登録 if (!m1.containsKey(k)) m1.put(k, v); // アトミックでない操作 m2.putIfAbsent(k, v); // アトミックな操作 // 存在したら置き換え if (m1.containsKey(k)) m1.put(k, v); // アトミックでない操作 m2.replace(k, v); // アトミックな操作 // 存在したら削除 if (m1.containsKey(k)) m1.remove(k); // アトミックでない操作 m2.remove(k); // アトミックな操作
(実行結果無し)
追記(2014-08-15): アトミックでない操作も含めて、並列処理やマルチスレッドによって惹き起こされるバグは、テストでは発見が困難になりますので、まずは安易に共有オブジェクトを持つことを避けるようにすべきです。どうしても必要な場合は、アトミックでない操作を行わないように注意しながら使いましょう。
その他と、まとめ
他にも同期キューや、セマフォ(java.util.concurrent.Semaphore
クラス)、ロック関連(java.util.concurrent.locks
パッケージ)などが用意されています。
Java7で追加されたものでは、Fork/Join関連(java.util.concurrent.ForkJoinPool
クラス他)や、バリアー実装のPhaser
(java.util.concurrent.Phaser
)というのもあります。
この辺は、次回があったら触れたいと思います。