読者です 読者をやめる 読者になる 読者になる

argius note

プログラミング関連

開発しています



Concurrency Utilitiesの「再」まとめ - Java8対応版

Java

以前、このような記事を書きました。

前回の内容は、Concurrency UtilitiesだけでなくJavaのマルチスレッドの話も一部含んでいましたが、今回は、Concurrency Utilitiesだけにフォーカスして、全体が分かるように整理してまとめ直しました。


目次

  • 概要
  • 準備
  • タスク・フレームワーク (Executor)
  • 同期キュー
  • シンクロナイザー
  • 並行処理コレクション
  • 時間単位
  • アトミック値型とアトミック操作
  • ロック・フレームワーク

概要

今回は、Concurrency UtilitiesのAPIをいくつかのグループに分類し、それぞれのグループの主要な機能を「広く浅く」紹介する、という形式でまとめています。


パッケージ単位で分け、それからjava.util.concurrentをさらに機能ごとに分類しています。java.util.concurrentパッケージの「説明」のところに記載されているものが分かりやすいので、それを参考に分けてみました。


内容のほとんどが、APIドキュメントを根気よく読めば分かる情報です。残りはソースコードを読んだり実際に使ってみて得た情報です。


なお、今回はJava8対応版なので、サンプルコードではラムダ式やストリームを使っています。




準備

実行環境は、Java8である以外は特に指定しません。今回の内容では、プラットフォームに依存するものは扱っていない認識です。


APIの動作を実際に動かして確認したいとき、APIドキュメントに記載されているサンプルコードの多くはクラス側の実装しか書いてないので、それを使って動かすコードを自前で書く必要があります。
この記事では、簡単に動作をお試しいただけるように、記事中のサンプルコードはそのままmainメソッドに貼りつけてimportを構成すれば実行できるようにしています。コードを簡潔にするために、いくつかユーティリティーメソッドを定義していますので、下記の雛形をお使いください。IDEをお使いでない方は、各コードの先頭にコメントアウトしたimport文を置いていますのでこれをご利用ください。

  • App.java (ユーティリティー&mainクラス)
package local;

public final class App {

    public static void main(String[] args) {
        // ここにサンプルコードをコピペしてください
    }

    static void log(String fmt, Object... args) {
        String now = String.format("%1$tT.%1$tL", System.currentTimeMillis());
        now = "--:" + now.substring(3);
        System.out.printf("%s [%s] %s%n", now, Thread.currentThread().getName(), String.format(fmt, args));
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

}









タスク・フレームワーク (Executor)

ここで言うタスク・フレームワークとは、タスク(Runnable/Callable/FutureTask)、スレッドプール(ThreadPool/ExecutorService)、非同期入出力(Future)、などで構成される、このAPIの中核の機能のことです。


基本的なタスク・フレームワークの使い方は、Executorsクラスのファクトリーメソッドを使ってスレッドプールを生成し、Runnable,Callable<V>,FutureTask<V>などの実装クラスのタスクを実行させる、というものです。この場合、スレッドプールはExecutorServiceインターフェイス,ScheduledExecutorServiceインターフェイスを介して操作します。
前回の記事ではもう少し詳しく説明していますので、こちらも合わせてご覧ください。


今回は、Executors.newScheduledThreadPool(int)を使った例を示します。

  • 例:Executors.newScheduledThreadPool(int)で生成したスレッドプールでタスクを実行し結果を取得
// import java.util.concurrent.*;

log("start");

ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
Callable<String> task = () -> {
    try {
        log("start");
        sleep(3_000L); // 時間のかかる処理の代わり
        return "OK";
    } finally {
        log("end");
    }
};
ScheduledFuture<String> result = threadPool.schedule(task, 5, TimeUnit.SECONDS);
try {
    log("result=%s", result.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
threadPool.shutdown();

log("end");


5秒後にタスクを実行するようにスケジューリングし、さらに3秒後にタスクが完了してFutureの結果が返されます。

  • 実行結果(Executors.newScheduledThreadPool(int)で生成したスレッドプールでタスクを実行し結果を取得)
--:51:43.608 [main] start
--:51:48.882 [pool-1-thread-1] start
--:51:51.894 [pool-1-thread-1] end
--:51:51.894 [main] result=OK
--:51:51.894 [main] end

このように、別スレッドで遅延実行したタスクの結果を取得する処理を比較的簡単に書くことができます。

もしScheduledExecutorService#shutdown()ScheduledExecutorService#shutdownNow(), ExecutorServiceも同様)を呼び出さないと、非デーモンスレッドが残ってしまいプロセスが終了しないので注意が必要です。
Executors.newCachedThreadPool()の場合は例外で、このスレッドプールは、60秒間使用されないスレッドを終了させ、キャッシュから削除するようになっていますので、shutdown()を呼び出さなくても60秒後にアプリケーションが終了します。
下記はExecutors.newCachedThreadPool()の説明の引用です。

60秒間使用されなかったスレッドは、終了して、キャッシュから削除されます。

java.util.concurrent.Executors.newCachedThreadPool() - Java SE 8 API ドキュメント

以下の表は、Executorsクラスメソッドが実際にどんなクラスのインスタンスを返しているのかを調べたものです。

メソッド 戻り値の型 戻り値の実装クラス
callable(PrivilegedAction<?>) Callable<Object> Callable(匿名クラス)
callable(PrivilegedExceptionAction<?>) Callable<Object> Callable(匿名クラス)
callable(Runnable) Callable<Object> Executors.RunnableAdapter
callable(Runnable, T) Callable<T> Executors.RunnableAdapter
privilegedCallable(Callable<T>) Callable<T> Executors.PrivilegedCallable
privilegedCallableUsing
CurrentClassLoader(Callable<T>)
Callable<T> Executors.PrivilegedCallableUsingCurrentClassLoader
defaultThreadFactory() ThreadFactory Executors.DefaultThreadFactory
privilegedThreadFactory() ThreadFactory Executors.PrivilegedThreadFactory
newCachedThreadPool() ExecutorService ThreadPoolExecutor
newCachedThreadPool(ThreadFactory) ExecutorService ThreadPoolExecutor
newFixedThreadPool(int) ExecutorService ThreadPoolExecutor
newFixedThreadPool(int, ThreadFactory) ExecutorService ThreadPoolExecutor
newSingleThreadExecutor() ExecutorService Executors.FinalizableDelegatedExecutorService
newSingleThreadExecutor(ThreadFactory) ExecutorService Executors.FinalizableDelegatedExecutorService
newWorkStealingPool() ExecutorService ForkJoinPool
newWorkStealingPool(int) ExecutorService ForkJoinPool
unconfigurableExecutorService(ExecutorService) ExecutorService Executors.DelegatedExecutorService
newScheduledThreadPool(int) ScheduledExecutorService ScheduledThreadPoolExecutor
newScheduledThreadPool(int, ThreadFactory) ScheduledExecutorService ScheduledThreadPoolExecutor
newSingleThreadScheduledExecutor() ScheduledExecutorService Executors.DelegatedScheduledExecutorService
newSingleThreadScheduledExecutor(ThreadFactory) ScheduledExecutorService Executors.DelegatedScheduledExecutorService
unconfigurableScheduledExecutorService
(ScheduledExecutorService)
ScheduledExecutorService Executors.DelegatedScheduledExecutorService

特にこだわりが無ければ、これらのメソッドで生成したスレッドプールを使用します。
スレッドプールの初期設定をもっと細かく設定したいのであれば、ThreadPoolExecutorクラスScheduledThreadPoolExecutorクラス、そして次に紹介するForkJoinPoolクラスのコンストラクターを使って構築することもできます。
ThreadPoolExecutorコンストラクターを直接使うことで、コアプールサイズ、keep-alive時間、ワークキュー、スレッドファクトリー、RejectedExecutionHandler(後述)が設定できます。外部キュー以外は、セッターで設定することもできます。サブクラスであるScheduledThreadPoolExecutorでは、最大プールサイズ、keep-alive時間をコンストラクターで設定することができないので、セッターで設定します。ScheduledThreadPoolExecutorのワークキューは、常に内部クラスのDelayedWorkQueueが使用されます。


RejectedExecutionHandlerインターフェイスは、名前の通り、タスク実行がリジェクトされた時のハンドラーです。タスク実行(execute, submitなど)は、キューの容量オーバーなどでリジェクトされることがあります。
どのような場面で使用されるかは、下記ページを見ればある程度分かると思います。

ThreadPoolExecutorのサブクラスにいくつかの実装クラスがあります。Executorsメソッドで生成したスレッドプールのデフォルトのRejectedExecutionHandlerには、ThreadPoolExecutor.AbortPolicyが設定されています。これにより、リジェクトの際にはRejectedExecutionExceptionがスローされます。





次は、Fork/Joinフレームワークについてです。

Java7では、Fork/Joinフレームワーク(以下Fork/Joinと表記)が追加されました。
従来のタスク・フレームワークでは、開始したタスクは1つのスレッドがタスクが終了するまで1つのスレッドで処理するようになっていました。
Fork/Joinでは、投入されたタスクだけでなく、処理中のタスク内で分割(fork)された新しいタスクを処理しようとします。この仕組みはWork stealing(Work stealing - en:wikipedia)と呼ばれます。*1


Fork/Joinのためのスレッドプールは、ForkJoinPoolクラスを使用します。タスクは、ForkJoinTask<V>もしくはそのサブクラスを使用します。

具体的な使用例は、下記の記事などを参照してください。


ForkJoinPoolはスレッドプールで使用するプロセッサー数を制御することができます。使用できるプロセッサーの数は、Runtime.availableProcessors()で取得することができます。Executors.newWorkStealingPool()は、new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)と同じです。


ForkJoinPoolは共通プールのインスタンス(以下、Fork/Join共通プールと表記)があり、ForkJoinPool.commonPool()で取得できます。

このプールは静的に構築されます。shutdown()またはshutdownNow()を試行しても、その実行状態には影響しません。ただし、このプールおよび実行中の処理は、System.exit(int)プログラムで自動的に終了します。

ForkJoinPool.commonPool() - Java SE 8 API ドキュメント


Fork/Join共通プールは、Java8からシステムプロパティーで初期設定値を設定することが可能になりました。プロパティーは、ForkJoinPoolクラスの説明に書かれています。


また、Fork/Join自体ではありませんが、Java8では、ご存じStreamのパラレル(並列処理)と、ArraysクラスのparallelSortメソッドを始めとするパラレル処理が追加されています。これらのパラレル実行には、ForkJoinPoolのFork/Join共通プールが使用されています。





最後に、Java8で追加されたCompletableFutureについて。

CompletableFuture<T>Futureではありますがタスクではありません。それ自体が従来のExecutorファミリーには属さない独自のタスク・フレームワークと言えます。ただし、タスクを実行する際に使用されるスレッドプールは(ThreadPool)ExecutorForkJoinPool*2です。

CompletableFutureクラスは、CompletionStageインターフェイスの実装クラスです。


CompletableFutureの完了状態には、成功、失敗、キャンセルがあり、成功の場合は処理結果が取得でき、失敗の場合は例外がスローされます。

また、複数CompletableFutureを合成してひとつの新しいCompletableFutureを生成することもできます。

  • CompletableFutureの例:成功、失敗、キャンセル、合成
// import java.util.concurrent.*;

// 成功
log("success-1");
CompletableFuture<String> successCF = CompletableFuture.supplyAsync(() -> {
    sleep(3_000L);
    return "success";
});
try {
    log("success-2");
    log("success-3: result=%s", successCF.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
// 失敗
log("failure-1");
CompletableFuture<Void> failureCF = CompletableFuture.runAsync(() -> {
    sleep(3_000L);
    throw new RuntimeException("failure");
});
try {
    log("failure-2");
    failureCF.get();
} catch (InterruptedException | ExecutionException e) {
    log("failure-3: exception=%s", e);
}
// キャンセル
log("cancel-1");
CompletableFuture<Void> cancelCF = CompletableFuture.runAsync(() -> {
    sleep(3_000L);
});
log("cancel-2");
sleep(1_500L);
log("cancel-3: %s", cancelCF.cancel(true));
// 合成 (Either, async)
CompletableFuture<Long> leftCF = CompletableFuture.supplyAsync(() -> {
    log("either-left: start");
    sleep(2_000L);
    long t = System.currentTimeMillis();
    log("either-left: time=%d", t);
    return t;
});
CompletableFuture<Long> rightCF = CompletableFuture.supplyAsync(() -> {
    log("either-right: start");
    sleep(3_000L);
    long v = 123L;
    log("either-right: value=%d", v);
    return v;
});
CompletableFuture<Void> eitherCF = leftCF.acceptEitherAsync(rightCF, x -> {
    log("either-result: x=%d", x);
});
try {
    eitherCF.get();
    log("either-end");
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

  • 実行結果(CompletableFutureの例:成功、失敗、キャンセル、合成)
--:18:44.485 [main] success-1
--:18:44.719 [main] success-2
--:18:47.729 [main] success-3: result=success
--:18:47.729 [main] failure-1
--:18:47.729 [main] failure-2
--:18:50.741 [main] failure-3: exception=java.util.concurrent.ExecutionException: java.lang.RuntimeException: failure
--:18:50.741 [main] cancel-1
--:18:50.741 [main] cancel-2
--:18:52.255 [main] cancel-3: true
--:18:52.255 [ForkJoinPool.commonPool-worker-2] either-left: start
--:18:52.255 [ForkJoinPool.commonPool-worker-3] either-right: start
--:18:54.269 [ForkJoinPool.commonPool-worker-2] either-left: time=1426256334269
--:18:54.269 [ForkJoinPool.commonPool-worker-2] either-result: x=1426256334269
--:18:54.269 [main] either-end









同期キュー

キュー(Queue)は、Concurrency Utilitiesと同時に*3導入されました。キューというデータ構造自体はもちろん、並列処理に限定されるデータ構造では無いのですが、Javaではそれ以前にキューという型がありませんでした。ちなみに、両端キューすなわちデック(Deque)は、さらに遅れてJava6になってから登場します。


並列処理で使用される同期キューは、主にBlockingQueueインターフェイス (java.util.concurrent.BlockingQueue<E>)の実装クラスになります。これは、データを格納する際にキューに空きが無かったり、データを取り出す際にデータが無かった場合に処理をブロックするタイプのキューです。もちろん、BlockingQueueのほとんどの実装は「スレッドセーフ」なコレクションです。


BlockingQueueの使い道といえば、Producer-Consumerパターンが有名です。これは、データのやりとりで必要な同期処理をすべて同期キューに任せることができ、生産者(Producer)と消費者(Consumer)は同期を意識しないで済むというパターンです。


下記のサンプルコードは、複数の生産者と複数の消費者でキューを共有する例です。生産者はランダムな数を生産し、消費者はその数(秒)だけスリープする、特に意味のない処理です。

  • 例:複数の生産者と複数の消費者でキューを共有
// import java.util.Random;
// import java.util.concurrent.*;
// import java.util.stream.IntStream;

log("start");

// 共有キュー
BlockingQueue<Integer> q = new LinkedBlockingQueue<>();

// 生産者(producer)
int nProducers = 2;
ExecutorService producerThreads = Executors.newCachedThreadPool();
IntStream.range(0, nProducers).forEach(i -> producerThreads.execute(() -> {
    Random rnd = new Random(System.currentTimeMillis());
    try {
        while (true) {
            int n = rnd.nextInt(6) + 2;
            log("produce: %d", n);
            q.add(n);
            sleep((rnd.nextInt(5) + 3) * 1_000L);
        }
    } catch (Exception e) {
        log("this thread was interrupted and exited task. (%s)", e);
    }
}));

// 消費者(consumer)
int nConsumers = 3;
ExecutorService consumerThreads = Executors.newCachedThreadPool();
IntStream.range(0, nConsumers).forEach(i -> consumerThreads.execute(() -> {
    try {
        while (true) {
            int n = q.take();
            log("sleep %d", n);
            sleep(n * 1_000L);
        }
    } catch (Exception e) {
        log("this thread was interrupted and exited task. (%s)", e);
    }
}));

log("wait 30 seconds");
sleep(30_000L);
log("shutdown");
producerThreads.shutdownNow(); // 強制終了
consumerThreads.shutdownNow(); // 強制終了


  • 実行結果(複数の生産者と複数の消費者でキューを共有)
--:15:24.545 [main] start
--:15:24.685 [pool-1-thread-1] produce: 4
--:15:24.685 [pool-1-thread-2] produce: 4
--:15:24.685 [main] wait 30 seconds
--:15:24.685 [pool-2-thread-1] sleep 4
--:15:24.685 [pool-2-thread-2] sleep 4
--:15:31.690 [pool-1-thread-2] produce: 2
--:15:31.690 [pool-2-thread-3] sleep 2
--:15:31.690 [pool-1-thread-1] produce: 2
--:15:31.690 [pool-2-thread-2] sleep 2
--:15:34.701 [pool-1-thread-2] produce: 7
--:15:34.701 [pool-2-thread-1] sleep 7
--:15:34.701 [pool-1-thread-1] produce: 7
--:15:34.701 [pool-2-thread-3] sleep 7
--:15:39.710 [pool-1-thread-1] produce: 3
--:15:39.710 [pool-2-thread-2] sleep 3
--:15:39.710 [pool-1-thread-2] produce: 3
--:15:41.708 [pool-2-thread-1] sleep 3
--:15:42.723 [pool-1-thread-2] produce: 3
--:15:42.723 [pool-1-thread-1] produce: 3
--:15:42.723 [pool-2-thread-3] sleep 3
--:15:42.723 [pool-2-thread-2] sleep 3
--:15:45.736 [pool-1-thread-2] produce: 5
--:15:45.736 [pool-2-thread-2] sleep 5
--:15:45.736 [pool-1-thread-1] produce: 5
--:15:45.736 [pool-2-thread-1] sleep 5
--:15:51.743 [pool-1-thread-2] produce: 6
--:15:51.743 [pool-2-thread-3] sleep 6
--:15:51.743 [pool-1-thread-1] produce: 6
--:15:51.743 [pool-2-thread-2] sleep 6
--:15:54.692 [main] shutdown
--:15:54.692 [pool-1-thread-1] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted)
--:15:54.692 [pool-1-thread-2] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted)
--:15:54.692 [pool-2-thread-3] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted)
--:15:54.692 [pool-2-thread-1] this thread was interrupted and exited task. (java.lang.InterruptedException)
--:15:54.692 [pool-2-thread-2] this thread was interrupted and exited task. (java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted)

pool-1が生産者のスレッドプール、pool-2が消費者のスレッドプールです。
ログだけでは動きが掴みにくいと思いますが、生産者と消費者がお互いの処理のタイミングを意識することなく動作しているのが分かるでしょうか。

この項の内容とは関係ありませんが、ログの最後のところは、スレッドプールはmainスレッドが30秒後に強制的にシャットダウンさせているのでこうなっています。







シンクロナイザー

シンクロナイザーとは、複数のスレッド間での待ち合わせ(同期)を支援する機能のことです。Java8の時点では、5つのシンクロナイザーの実装があります。それぞれがある特定の同期方法を支援するための機能を持っています。


Semaphoreは、いわゆるカウンティングセマフォーです。
あらかじめ許可の数(カウント)を決めておき、スレッドが必要な数の許可を獲得できれば処理を行い、許可は獲得した分だけ利用不可の状態にします。処理が終わったら許可を返却すると、それらの許可は再度利用可能になります。必要な数の許可を獲得できなければ、待機させたり処理をスキップさせたりすることができます。
同時にタスクを実行できるスレッド数を制限する場合などに使用します。


下記のサンプルコードは、入場制限のシミュレーションです。

  • Semaphoreを使った入場制限のシミュレーション
/**
 * 来客グループ。
 */
class Party {
    String name;
    int count;
    Party(String name, int count) {
        this.name = name;
        this.count = count;
    }
    /**
     * 時間をつぶす。
     * ランダムミリ秒待機。
     * @param x 倍率
     */
    void doSomething(int x) {
        sleep((long)((Math.random() * 10) + 3) * 100L * x);
    }
}

// 入場証 - 同時に入場できるのは20人まで
Semaphore admissionPlates = new Semaphore(20);

// 来場者グループの列
List<Party> parties = Stream.of("A", "B", "C", "D", "E").map(x -> {
    int count = (int)((Math.random() * 13) + 3); // 3~15人のグループ
        return new Party("グループ" + x, count);
    }).collect(Collectors.toList());

// 全グループの退場をチェックする
CountDownLatch exitChecker = new CountDownLatch(parties.size());
// 非同期実行のためのスレッドプール
ExecutorService threadPool = Executors.newCachedThreadPool();

log("開場");
parties.forEach(party -> {
    threadPool.execute(() -> {
        int count = party.count;
        String partyText = String.format("%s (%2d人) ", party.name, count);
        try {
            log("%sが来場しました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits());
            // 入場証の取得、グループは必ず全員同時に入退場
            // ただし3秒待っても入場証が足りない場合は入場を諦める
            if (admissionPlates.tryAcquire(count, 3L, TimeUnit.SECONDS)) {
                log("%sが入場しました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits());
                // 場内での滞在時間
                party.doSomething(5);
                // 入場証を返却
                admissionPlates.release(count);
                log("%sが退場しました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits());
            }
            else {
                log("%sが入場を諦めました 入場証の残り数=%2d", partyText, admissionPlates.availablePermits());
            }
            // 退場チェック(入場をあきらめたグループも含む)
            exitChecker.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    party.doSomething(2); // 各グループ間隔の調整
});
try {
    exitChecker.await();
    log("全グループが退場しました");
} catch (InterruptedException e) {
    e.printStackTrace();
}
log("閉場");
threadPool.shutdown();


来客グループの人数と来場タイミングはランダムです。同時に20人しか入れないので、入場証を持っている人だけを入場させるようにします。退場時に入場証は返却します。
残りの入場証が来客グループの人数に満たない場合は、3秒間だけ空くのを待ち、空かなければ入場を諦めます。

  • 実行結果(Semaphoreを使った入場制限のシミュレーション)
--:21:40.052 [main] 開場
--:21:40.093 [pool-1-thread-1] グループA ( 7人) が来場しました 入場証の残り数=20
--:21:40.093 [pool-1-thread-1] グループA ( 7人) が入場しました 入場証の残り数=13
--:21:41.092 [pool-1-thread-2] グループB ( 5人) が来場しました 入場証の残り数=13
--:21:41.093 [pool-1-thread-2] グループB ( 5人) が入場しました 入場証の残り数= 8
--:21:43.092 [pool-1-thread-3] グループC (13人) が来場しました 入場証の残り数= 8
--:21:44.093 [pool-1-thread-1] グループA ( 7人) が退場しました 入場証の残り数=15
--:21:44.093 [pool-1-thread-3] グループC (13人) が入場しました 入場証の残り数= 2
--:21:45.292 [pool-1-thread-1] グループD ( 5人) が来場しました 入場証の残り数= 2
--:21:45.593 [pool-1-thread-1] グループD ( 5人) が入場しました 入場証の残り数= 2
--:21:45.593 [pool-1-thread-2] グループB ( 5人) が退場しました 入場証の残り数= 7
--:21:46.292 [pool-1-thread-2] グループE (11人) が来場しました 入場証の残り数= 2
--:21:49.293 [pool-1-thread-2] グループE (11人) が入場を諦めました 入場証の残り数= 2
--:21:50.093 [pool-1-thread-1] グループD ( 5人) が退場しました 入場証の残り数= 7
--:21:50.094 [pool-1-thread-3] グループC (13人) が退場しました 入場証の残り数=20
--:21:50.094 [main] 全グループが退場しました
--:21:50.095 [main] 閉場

この実行結果では、11人のグループEが来場したタイミングで残りの入場証が2個しかないため、待機していますが、3秒待っても入場証が返却されないので、諦めて帰ってしまいました。

なお、この例では退場者チェックを行う役目をCountDownLatchが担っています。mainスレッドを待機させるために、入館したかどうかにかかわらず、帰った人をカウントして、カウントダウンします。



次は、Exchangerを見てみましょう。これは、2つのスレッド間で値を交換するためのシンクロナイザーです。

Exchangerは、遺伝アルゴリズムパイプライン設計などのアプリケーションで便利です。

Exchanger - Java SE 8 API ドキュメント

次のサンプルコードの挙動を見れば、Exchangerがどのような動作をするのかはご理解いただけると思います。
簡単に特徴を説明できる例が思い浮かばないので、また特に意味のない動作の例です。ごめんなさい。

  • Exchangerの例:2つのスレッド間で値を交換する
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService threadPool = Executors.newCachedThreadPool();
String[] sample1 = { "AB", "CDE", "FG" };
String[] sample2 = { "123", "456", "7890" };
for (int i = 0; i < sample1.length; i++) {
    String s1 = sample1[i];
    Future<String> result1 = threadPool.submit(new Callable<String>() {
        String name = "task1";
        @Override
        public String call() throws Exception {
            log("%s: value = [%s]", name, s1);
            sleep(100L); // 何か処理
            String exchanged = exchanger.exchange(s1); // throws InterruptedException
            log("%s: exchanged = [%s]", name, exchanged);
            return exchanged;
        }
    });
    String s2 = sample2[i];
    Future<String> result2 = threadPool.submit(new Callable<String>() {
        String name = "task2";
        @Override
        public String call() throws Exception {
            log("%s: value = [%s]", name, s2);
            sleep(3000L); // 何か処理 task1より時間が掛かる
            String exchanged = exchanger.exchange(s2); // throws InterruptedException
            log("%s: exchanged = [%s]", name, exchanged);
            return exchanged;
        }
    });
    try {
        log("task1結果=%s", result1.get());
        log("task2結果=%s", result2.get());
    } catch (InterruptedException | ExecutionException e) {
        log("task実行中に例外が発生しました: 例外=%s", e);
    }
}
threadPool.shutdown();

task1とtask2でそれぞれの配列の値をexchangeメソッドによって交換し合います。交換の際、時間差がありますので、早い方のスレッドが遅い方のスレッドをexchangeメソッドで待機します。

  • 実行結果(Exchangerの例:2つのスレッド間で値を交換する)
--:00:29.641 [pool-1-thread-1] task1: value = [AB]
--:00:29.672 [pool-1-thread-2] task2: value = [123]
--:00:32.808 [pool-1-thread-1] task1: exchanged = [123]
--:00:32.808 [pool-1-thread-2] task2: exchanged = [AB]
--:00:32.808 [main] task1結果=123
--:00:32.808 [main] task2結果=AB
--:00:32.808 [pool-1-thread-1] task2: value = [456]
--:00:32.808 [pool-1-thread-2] task1: value = [CDE]
--:00:35.820 [pool-1-thread-1] task2: exchanged = [CDE]
--:00:35.820 [pool-1-thread-2] task1: exchanged = [456]
--:00:35.820 [main] task1結果=456
--:00:35.820 [main] task2結果=CDE
--:00:35.820 [pool-1-thread-1] task2: value = [7890]
--:00:35.820 [pool-1-thread-2] task1: value = [FG]
--:00:38.832 [pool-1-thread-1] task2: exchanged = [FG]
--:00:38.832 [pool-1-thread-2] task1: exchanged = [7890]
--:00:38.832 [main] task1結果=7890
--:00:38.832 [main] task2結果=FG



次はPhaserをみてみましょう。
Phaserは、「バリアー」を実現するためのクラスです。バリアーとは、ウィキペディアによれば、

並列コンピューティングにおけるバリア(英: Barrier)とは、同期方法の一つであり、ソースコード中でスレッドやプロセスがある箇所で停止し、他の全てのスレッドプロセスがバリアに到達するまで進行しないようなものを示す。

バリア (計算機科学) - Wikipedia

というものを指します。残りのシンクロナイザー、CyclicBarrier, CountDownLatchもバリアーとされています。
ちなみに、Phaserは、Java7で追加されました。


Phaserの特徴は、名前の通りフェイズに関係しています。各フェイズの開始をバリアーで待機させ、同時に開始させることができる、再利用可能なバリアーです。
他の特徴としては、フェイズに参加するメンバー(スレッド)の数が開始後に変えられること、(ThreadGroupのような)親子関係のツリーを構築できること、監視はメンバー以外でも可能であること、などがあります。


次のサンプルコードを見てください。

  • Phaserの例:フェイズ開始の準備待機と途中参加
// import java.util.Arrays;
// import java.util.Random;
// import java.util.concurrent.*;

Phaser phaser = new Phaser();
ExecutorService threadPool = Executors.newCachedThreadPool();
Random r = new Random(System.currentTimeMillis());

log("開始");
Arrays.asList("A", "B", "C").forEach(x -> {
    phaser.register();
    threadPool.execute(() -> {
        try {
            Arrays.asList(1, 2).forEach(i -> {
                String prefix = String.format("%s: [フェイズ #%d]", x, i);
                long millis = (r.nextInt(10) * 500) + 1_000L;
                log(prefix + "の処理中 (sleep %d millis)", millis);
                sleep(millis);
                log(prefix + "の完了を待機");
                int phaseNumber = phaser.arriveAndAwaitAdvance();
                log("到着したフェイズ番号=" + phaseNumber);
            });
        } finally {
            phaser.arriveAndDeregister();
        }
    });
});
sleep(6_000L);
log("フェイズに途中参加");
phaser.register();
sleep(15_000L);
log("途中参加したフェイズの開始を待機");
phaser.arriveAndAwaitAdvance();
log("終了");
threadPool.shutdown();

フェイズ1の開始をA,B,Cのスレッドが待機します。ランダム時間でスリープさせているのは、それぞれのスレッドで準備に要する時間が異なる様子を表しています。全てのスレッドがarriveAndAwaitAdvance()に到達すると、待機が解除されます。
フェイズ2では、mainスレッドが途中参加し、mainスレッドを含めた4スレッドがarriveAndAwaitAdvance()に到達すると、待機が解除されます。

  • 実行結果(フェイズ開始の準備待機と途中参加)
--:28:04.426 [main] 開始
--:28:04.504 [pool-1-thread-3] C: [フェイズ #1]の処理中 (sleep 5000 millis)
--:28:04.504 [pool-1-thread-2] B: [フェイズ #1]の処理中 (sleep 3000 millis)
--:28:04.504 [pool-1-thread-1] A: [フェイズ #1]の処理中 (sleep 2000 millis)
--:28:06.516 [pool-1-thread-1] A: [フェイズ #1]の完了を待機
--:28:07.516 [pool-1-thread-2] B: [フェイズ #1]の完了を待機
--:28:09.514 [pool-1-thread-3] C: [フェイズ #1]の完了を待機
--:28:09.514 [pool-1-thread-2] 到着したフェイズ番号=1
--:28:09.514 [pool-1-thread-2] B: [フェイズ #2]の処理中 (sleep 3500 millis)
--:28:09.514 [pool-1-thread-1] 到着したフェイズ番号=1
--:28:09.514 [pool-1-thread-3] 到着したフェイズ番号=1
--:28:09.514 [pool-1-thread-1] A: [フェイズ #2]の処理中 (sleep 2500 millis)
--:28:09.514 [pool-1-thread-3] C: [フェイズ #2]の処理中 (sleep 4000 millis)
--:28:10.514 [main] フェイズに途中参加
--:28:12.028 [pool-1-thread-1] A: [フェイズ #2]の完了を待機
--:28:13.028 [pool-1-thread-2] B: [フェイズ #2]の完了を待機
--:28:13.528 [pool-1-thread-3] C: [フェイズ #2]の完了を待機
--:28:25.528 [main] 途中参加したフェイズの開始を待機
--:28:25.528 [pool-1-thread-2] 到着したフェイズ番号=2
--:28:25.528 [main] 終了
--:28:25.528 [pool-1-thread-3] 到着したフェイズ番号=2
--:28:25.528 [pool-1-thread-1] 到着したフェイズ番号=2



CyclicBarrierは、Phaserをシンプルにしたような*4バリアー実装です。

CountDownLatchは、Semaphoreで少し触れましたが、パーティーを特定しないタイプのバリアーです。あらかじめ設定したカウントをカウントダウンしていきカウントがゼロになった時に待機が解除される仕組みになっています。私はこのCountDownLatchがけっこうお気に入りです。


CyclicBarrierCountDownLatchの例については、前回の記事を御覧ください。



もし、どうしてもこれらの既存のシンクロナイザーでは要件を充たせないのであれば、java.util.concurrent.locksパッケージのAPIを使用して、独自のシンクロナイザーを実装することもできます。
実際、5つの既存のシンクロナイザーのうち、SemaphoreCountDownLatchjava.util.concurrent.locks.AbstractQueuedSynchronizerクラスを利用して実装されていますし、残りもjava.util.concurrent.locks.LockSupportクラスやjava.util.concurrent.locks.Lockインターフェイスの実装クラスを使用して実装されています。







並行処理コレクション

BlockingQueue以外にも、並列処理向きのコレクションが用意されています。

代表的なものが、java.util.concurrent.ConcurrentHashMapクラスです。

特徴のひとつとして、アトミックな操作があります。例えば、ConcurrentHashMap#putIfAbsent(K, V)は、意味的には、「ConcurrentHashMap#containsKey(K)trueならConcurrentHashMap#put(K, V)を実行する」のような操作(実際には、もっと複雑な実装になっています)ですが、これを他のスレッドに割り込まれること無く一貫して(=アトミックに)行うことができます。
また、Java8では、ラムダ式に対応したメソッドが追加されています。例えば、ConcurrentHashMap#computeIfPresent(K, BiFunction)は、putIfAbsentと似ていますが、こちらはキーの値を入力にしてラムダ式を評価した結果をputします。


ConcurrentHashMapのアトミックな操作の詳細については、前回の記事も参照してください。







時間単位

Concurrency Utilitiesでは、時間の単位はTimeUnit列挙型(java.util.concurrent.TimeUnit)が使われます。TimeUnitは主に、スケジューリングや、待ち合わせのタイムアウトの時間を設定する「タイミング・パラメーター」として使用されます。例えば、下記のメソッドで使用されます。

使い方の例は、タスク・フレームワークの項を参照してください。


もちろん、TimeUnitはConcurrency Utilities以外でも使用できます。標準APIで使用されている例としては、Process#waitFor(long, TimeUnit)java.nio.file.attribute.FileTime.from(long, TimeUnit)などがあります。







アトミック値型とアトミック操作

java.util.concurrent.atomicパッケージは、並列処理でのアトミックな値の更新・取得を行うためのAPI群です。さらに細かく分けると、AtomicXXXFieldUpdaterAdder/Accumulatorのグループ*5に分類できます。順番に見てみましょう。


AtomicXXXXXXにはIntegerなどが入る)はアトミック値型のクラスで、文字通り「アトミック」に更新ができる値です。「アトミック」とは、ひと言で言うと、複数のスレッドから値にアクセスした場合でも1スレッドの操作が一貫して行われることが保証される動作のことです。例えば、AtomicLong#getAndIncrement()は、値の取得と加算が他のスレッドに割り込まれること無く行われます。前回の記事ではこの辺りをもう少し詳しく書いていますので、併せてご参照ください。


Java8では、アトミック値型にラムダ式に対応したメソッドがいくつか追加されています。例えば、AtomicLong#getAndUpdate(LongUnaryOperator)があります。

アトミック値型と内包される型の関係は下表の通り。最後の2つは、参照型とプリミティブ型のペアーになっていて、マーキングができるようになっています。注:ペアーの表記は、Javaのものではなく、擬似コードです。

  • 表:アトミック値型と内包される型の関係
クラス 対応する型
AtomicBoolean boolean
AtomicInteger int
AtomicLong long
AtomicReference<V> V
AtomicIntegerArray int[]
AtomicLongArray long[]
AtomicReferenceArray<E> E[]
AtomicMarkableReference<V> (V, boolean)
AtomicStampedReference<V> (V, int)



FieldUpdaterには、AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdaterがあります。
FieldUpdaterを使うと、アトミックを使用していない任意のクラスのvolatileフィールドに「外付け」アトミック機能を組み込むことができます。リフレクションを使っているのでprivateフィールドでもOKですが、volatileは必須です。
もちろん、FieldUpdater以外からのvolatileフィールドへのアクセスをブロックすることはできませんので、FieldUpdaterからのみのアクセスになるような使い方に限定しないといけません。



Adder/Accumulatorは、Java8の"jsr166e: Concurrency Update"として導入された機能のひとつ(Scalable updatable variables)です。それぞれlongdoubleのための計4クラスあります。

Adder/AccumulatorAtomicIntegerなどと同じくNumberのサブクラスなので、値型の一種と言えるかもしれません。ただし、下記の制約がありますのでご注意ください。

このクラスはNumberを拡張しますが、equals、hashCode、compareToなどのメソッドを定義しません。インスタンスでは変更が想定されているため、コレクション・キーとしては役立たないからです。

LongAdder - Java SE 8 API ドキュメント


また、LongAdder,LongAccumulatorAtomicLongには似たところがあります。ドキュメントにはこう書かれています。

きめ細かい同期制御のためではなく統計収集などの目的に使用される共通の値が、複数のスレッドによって更新される場合、通常はAtomicLongよりもこのクラスをお薦めします。更新の競合が少ないときは、2つのクラスの特徴は似ています。競合が多いときは、期待されるスループットはこのクラスの方がかなり高くなります。ただし、容量消費も多くなります。

LongAccumulator - Java SE 8 API ドキュメント


並列ストリームを使ってlong値をカウントアップする処理を、LongAdderAtomicLongで比べてみます。「容量消費」は度外視します。

  • LongAdderAtomicLongのパフォーマンス比較
// import java.util.concurrent.atomic.*;
// import java.util.stream.IntStream;

int nThread = 1_000;
LongAdder longAdder = new LongAdder();
AtomicLong atomicLong = new AtomicLong();
long t;
int sum;

log("開始(LongAdder)");
t = System.currentTimeMillis();
sum = IntStream.range(0, nThread).parallel().map(i -> {
    IntStream.range(0, 1_000_000).forEach(j -> longAdder.increment());
    return 1;
}).sum();
log("終了(LongAdder): 所要時間=%dミリ秒,", System.currentTimeMillis() - t);
log("        終了スレッド数=%d, カウント=%s", sum, longAdder);

log("開始(AtomicLong)");
t = System.currentTimeMillis();
sum = IntStream.range(0, nThread).parallel().map(i -> {
    IntStream.range(0, 1_000_000).forEach(j -> atomicLong.incrementAndGet());
    return 1;
}).sum();
log("終了(AtomicLong): 所要時間=%dミリ秒,", System.currentTimeMillis() - t);
log("        終了スレッド数=%d, カウント=%s", sum, atomicLong);

1000のスレッドがカウントアップを1,000,000回行い、合計が1,000,000,000になる処理です。並列処理はパラレルストリームを利用しています。


  • 実行結果(LongAdderAtomicLongのパフォーマンス比較)
--:14:54.799 [main] 開始(LongAdder)
--:15:09.012 [main] 終了(LongAdder): 所要時間=14175ミリ秒,
--:15:09.013 [main]         終了スレッド数=1000, カウント=1000000000
--:15:09.013 [main] 開始(AtomicLong)
--:15:46.161 [main] 終了(AtomicLong): 所要時間=37148ミリ秒,
--:15:46.161 [main]         終了スレッド数=1000, カウント=1000000000


このように、膨大な件数の集計処理を行う場合の速度に差が付きます。







ロック・フレームワーク

java.util.concurrent.locksパッケージには、ロックに関するAPIがまとめられています。

パッケージの説明の日本語訳にちょっと違和感があります。パッケージ全体を見たところでは、言語機能組み込みのsynchronizedおよびObject#wait, Object#notifyとは別のロッキング・フレームワーク、という意味だと思うのですが...

ざっと全体を見てみましょう。


ReentrantLockクラスは、基本機能ではsynchronizedとほぼ同じことができ*6、かつ拡張可能なロック実装です。
LockSupportクラスは、synchronizedReentrantLockとは異なるロック操作(park/unpark)を提供します。

この2つの詳細は後述します。


ReentrantReadWriteLockは、Read-Writeロックの(ReadWriteLockインターフェイスの)デフォルト実装です。

詳しくは、下記の記事をご覧ください。



AbstractOwnableSynchronizer, AbstractQueuedLongSynchronizer, AbstractQueuedSynchronizerクラスは、新たにシンクロナイザーを実装するような場合に用意されたクラスです。シンクロナイザーの項でも触れましたが、SemaphoreCountDownLatchは、AbstractQueuedSynchronizerを使用して実装されています。

ブロック・ロック、および先入れ先出し(FIFO)待機キューに依存する関連シンクロナイザ(セマフォ、イベントなど)を実装するフレームワークを提供します。このクラスは、状態表現を単一の原子int値に依存する大半の種類のシンクロナイザの有用な基盤として設計されています。

AbstractQueuedSynchronizer - Java SE 8 API ドキュメント


StampedLockクラスは、Java8で追加された新しい独立したロック実装です。
long値で表現される「スタンプ」を使って、ロックなしでの読み取り(楽観的読み取り:optimistic-read)が可能な状態を管理できます。また、このクラスはLockインターフェイスを実装していません。

下記の記事が参考になります。



さて、冒頭で少し触れたReentrantLockLockSupportについて、もう少し詳しく見ていきます。


ReentrantLockクラスの説明には、次のことが書かれています。

synchronizedメソッドおよび文を使用してアクセスする暗黙の監視ロックと同じ基本動作およびセマンティックスを使用し、かつ拡張機能を持つ、再入可能な相互排他Lockです。

ReentrantLock - Java SE 8 API ドキュメント

これはつまり、言語機能であるsynchronizedObject#wait, Object#notifyによるロックの基本動作とほぼ同じことが、APIであるReentrantLockを使って実現できる*7ということです。その上、synchronizedでは実現できない機能もいくつか持っています。


ReentrantLockは、例えばConditionインターフェイスと組み合わせて、このように使うことができます。

  • ReentrantLockCondition#await, Condition#signal
// import java.util.concurrent.locks.*;

Lock lock = new ReentrantLock();
Condition c = lock.newCondition();
new Thread(() -> {
    log("task 1");
    lock.lock();
    try {
        log("await");
        try {
            c.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } finally {
        lock.unlock();
    }
    log("end");
}).start();
sleep(500L);
new Thread(() -> {
    log("task 2");
    lock.lock();
    try {
        log("sleep");
        sleep(3_000L);
        c.signal();
    } finally {
        lock.unlock();
    }
    log("end");
}).start();


これは、下記のコードとほぼ同じ意味合いになります。

  • synchronizedObject#wait, Object#notifyの例
Object lock = new Object();
new Thread(() -> {
    log("task 1");
    synchronized (lock) {
        log("await");
        try {
            lock.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    log("end");
}).start();
sleep(500L);
new Thread(() -> {
    log("task 2");
    synchronized (lock) {
        log("sleep");
        sleep(3_000L);
        lock.notify();
    }
    log("end");
}).start();



最後に、LockSupportについて。

LockSupportクラスは、すべてのメソッドstaticです。主に、park,unparkメソッドと、その亜種で構成されています。


park/unparkというのは耳慣れない言葉ですね。クラスの説明にはこう書かれています。

parkメソッドとunparkメソッドは、非推奨メソッドThread.suspendおよびThread.resumeをこのような目的に使用できなくなるような問題が発生しないスレッドをブロックおよびブロック解除するための効率的な手段を提供します。parkを呼び出しているあるスレッドと、そのスレッドのunparkを試みている別のスレッドの間の競合は、パーミットのために活発な状態を保持します。さらに、呼出し側のスレッドで割込みが発生し、かつタイムアウト・バージョンがサポートされている場合は、parkが復帰します。parkメソッドはまた「理由なしで」いつでも復帰する可能性があるため、一般には、復帰時に状態を再チェックするループ内で呼び出す必要があります。この意味で、parkはスピンにそれほどの時間を浪費しない「ビジー待機」の最適化として機能しますが、効果を発揮させるにはunparkとペアにする必要があります。

LockSupport - Java SE 8 API ドキュメント

基本的な使い方は、parkで待機しているスレッドをunparkで復帰させる、というものですが、説明にもあるとおり、parkメソッドは「理由なしで」いつでも復帰する可能性があるため、ループの中に閉じ込めておく必要があります。
また、park状態で待機中のスレッドに割り込みをかけても復帰しますが、その際にInterruptedExceptionがスローされません。割り込まれたかどうかはThread#isInterrupted()を使って判断できます。


  • park/unparkの例
// import java.util.concurrent.atomic.AtomicBoolean;
// import java.util.concurrent.locks.LockSupport;

Thread mainThread = Thread.currentThread();
AtomicBoolean canProceed = new AtomicBoolean();
new Thread(() -> {
    sleep(5_000L);
    log("unpark(1回目)前");
    LockSupport.unpark(mainThread);
    log("unpark(1回目)後");
    sleep(5_000L);
    canProceed.set(true);
    log("unpark(2回目)前");
    LockSupport.unpark(mainThread);
    log("unpark(2回目)後");
}).start();
// parkの呼び出し前にunparkでパーミットをとっておくことができる
LockSupport.unpark(mainThread);
// パーミットは同時に2つ以上取れないのでこれは無視される
LockSupport.unpark(mainThread);
while (!canProceed.get()) {
    log("park前");
    LockSupport.park(); // 1回目は取得済みパーミットを消費して通過
    log("park後");
}
log("ループを抜けて終了");



  • 実行結果(park/unparkの例)
--:20:59.586 [main] park前
--:20:59.980 [main] park後
--:20:59.981 [main] park前
--:21:04.586 [Thread-0] unpark(1回目)前
--:21:04.586 [Thread-0] unpark(1回目)後
--:21:04.586 [main] park後
--:21:04.587 [main] park前
--:21:09.587 [Thread-0] unpark(2回目)前
--:21:09.587 [Thread-0] unpark(2回目)後
--:21:09.587 [main] park後
--:21:09.588 [main] ループを抜けて終了

park側のスレッドは、AtomicBooleanを使ってwhileループに閉じ込めています。
コードコメントにあるとおり、1回目のparkは取得済みパーミットを消費してそのまま通過できていますが、2回目は無視されています。








いかがでしたか。

私もまだ理解できていないところが多いですが、広く浅く網羅することはできたと思います。
しかしさすがに時間がかかってしまいました。少なくとも1/30の記事の時点には半分ほど書けていて、残り半分がしんどかった。


ともあれ、参考になれば幸いです。

(おわり)

*1:Map-Reduceとは「似て非なるもの」のようです。

*2:厳密には他のパターンもあります。詳しくはクラスのドキュメント参照。

*3:どちらもJava SE 5.0で追加されたAPIです。

*4:登場順は逆ですが。

*5:公式の分類ではありません。

*6:見かけの処理はほぼ同じですが同じものではありません。

*7:厳密に言うとAPIだけでなくtry-finallyと組み合わせて、ですね。