読者です 読者をやめる 読者になる 読者になる

argius note

プログラミング関連

開発しています



Concurrency Utilitiesを使った並列処理・マルチスレッドのおさらい

Java

この記事は、個人的なおさらいのための、Java Concurrency Utilitiesの一部を使ったサンプルとメモです。
目新しいものは特にありません。
記事内のサンプルとAPIドキュメント参照はJava7(Java SE 7)を基準にしていますが、Java Concurrency Utilities自体は一部を除いてJava5(Java SE 5.0)から使えるようになっています。

あと、いつものことですが、画像がありません。


追記(2014-01-05): この機能の呼称は (Java) Concurrency Utilities が公式で、この記事内の"Utility"というのは正確ではない+混在していますのでご注意ください。ごめんなさい。
追記(2014-08-15): この記事は、キーワード"Java Concurrency Utilities"でGoogle検索した時に2番目に来ていたり、ブログ内でのアクセスが多かったりと、読んでいただく機会が多いみたいなので、読みやすくなるように改訂しました。"Utility"も"Utilities"に直しました。
ありがとうございます。


追記(2015-03-21):Concurrency Utilitiesを中心にしたまとめを書いてみました。→Concurrency Utilitiesの「再」まとめ - Java8対応版


並列処理を比較的手軽に実現できるConcurrency Utilities

マルチスレッドによる並列処理システムを構築するには、Javaの標準機能だけでもできないことはありませんが、安定した実現には多くの煩雑さと難解さが伴います。特に、シーケンシャルな処理では発生しないマルチスレッド特有の問題を考慮する必要があります。有名なところでは、スレッドアンセーフ(非スレッドセーフ)なオブジェクトの共有、デッドロックなどがあります。
そういった問題を回避しつつ、比較的容易にマルチスレッド関連機能を扱えるようにしてくれるのが、Concurrency Utilitiesです。
Concurrency Utilitiesは、JSR166で提案され、Java5から標準機能として使えるようになりました。

Concurrency UtilitiesのAPIは、java.util.concurrentパッケージとそのサブパッケージに格納されています。


今回は、Concurrency Utilitiesのうち、いくつかをピックアップしておさらいします。


サンプルのための前提と準備

サンプルの実行結果は、JDKJRE共に本家のJava7で、Windows7(32bit版)でコンパイル・実行したものです。他の環境でも動作するとは思いますが、細かな挙動が異なる可能性があります。


はじめに、それぞれのサンプルが小さくなるように、ユーティリティーなどを準備しておきます。また、同様の理由で、サンプルの一部ではmainメソッドとクラスは省略している箇所があります。

  • ユーティリティーメソッドクラスconcurrent.Util
    • logメソッド: ログ出力する
    • sleepメソッド: スリープする(InterruptedExceptionを非チェック例外に変換)
    • waitForSeveralSeconds: 数秒(1~10秒)、上記sleepでスリープする、スリープ前にログを出力
package concurrent;

final class Util {

    private Util() {
    }

    static void log(Object o) {
        log("%s", o);
    }

    static void log(String fmt, Object... args) {
        Thread currentThread = Thread.currentThread();
        String datetime = String.format("%1$tT.%1$tL", System.currentTimeMillis());
        datetime = "--:--:" + datetime.substring(6);
        String threadName = String.format("%s", currentThread.getName());
        System.out.printf("%s [%s] %s%n", datetime, threadName, String.format(fmt, args));
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    static void waitForSeveralSeconds() {
        final long millis = (long)(1000L * (Math.random() * 9 + 1));
        log("waiting ... (it takes %d milliseconds)", millis);
        sleep(millis);
    }

}


  • SleepTaskRunnableの実装クラス)
    • 指定した時間スリープする
package concurrent;

import static concurrent.Util.*;

final class SleepTask implements Runnable {

    private final long sleepMillis;

    SleepTask(long time) {
        this.sleepMillis = time;
    }

    @Override
    public void run() {
        log("begin");
        sleep(sleepMillis);
        log("end");
    }

}


  • SlowDownCallableTaskCallable<Integer>の実装クラス)
    • waitForSeveralSecondsしてから、引数の3倍の値を結果として返す
package concurrent;

import static concurrent.Util.*;
import java.util.concurrent.*;

final class SlowDownCallableTask implements Callable<Integer> {

    private final int input;

    SlowDownCallableTask(int input) {
        this.input = input;
    }

    @Override
    public Integer call() throws Exception {
        log("input=%d", input);
        waitForSeveralSeconds();
        return input * 3;
    }

}

スレッドプールを作ってタスクを処理させる - Executors, ExecutorService

Executors,ExecutorServiceは、スレッドプールに関する機能を集約したものです。
スレッドを生成するのはそれなりに処理コストがかかるので、スレッドをあらかじめ作っておいたり、キャッシュして使いまわしたりします。それがスレッドプールです。
スレッドプールを使う場面では、キャッシュの管理だけではなく、タスクを空きスレッドに割り振って実行させたり、余ったスレッドを破棄したり、足りないスレッドを補充したり、同時実行数を制御したり、処理結果の受け渡しのための待ち合わせをしたりと、さまざまな機能が要求されます。これらを簡単に実現してくれるのが、スレッドプールのさまざまな操作を提供するExecutorServiceインターフェイスと、ExecutorServiceを生成するExecutorsユーティリティークラスです。


まずは、ごく簡単な使い方を見てください。

  • ExecutorsSample1
//import static concurrent.Util.*;
//import java.util.concurrent.*;

ExecutorService pool;
// pool = Executors.newCachedThreadPool();
pool = Executors.newFixedThreadPool(2);
// pool = Executors.newSingleThreadExecutor();
pool.execute(new SleepTask(3000L));
pool.execute(new SleepTask(2000L));
pool.execute(new SleepTask(1000L));
pool.shutdown();
// pool.shutdownNow();
log("end");

ExecutorService#executeExecutor#executeから継承されたメソッド)は、Runnableのタスクとして実行するメソッドです。Threadを使った時のnew Thread(task).start()とするのと近いです。ExecutorService#submitというメソッドもありますが、これについては後述します。

Executors.newXXXThreadXXXという名前のメソッドは、スレッドプール(型はExecutorService)を生成するファクトリーメソッドです。いくつかありますが、ここでは分かりやすいところで3種類を挙げています。

Executors.newCachedThreadPoolは、足りない場合は自動的にスレッドを補充するスレッドプールを作ります。
Executors.newFixedThreadPoolは、スレッド数が指定した数になるスレッドプールを生成します。タスクが投入された際にスレッドが足りない場合は、空きスレッドが出るまでタスクの実行は保留されます。
Executors.newSingleThreadExecutorは、スレッド数が1のスレッドプールを生成します。これはExecutors.newFixedThreadPool(1)とほぼ同じです。


それでは、実行した結果を見てみましょう。

  • 実行結果(ExecutorsSample1) - newFixedThreadPool(2)の場合
--:--:16.841 [pool-1-thread-2] begin
--:--:16.841 [pool-1-thread-1] begin
--:--:16.841 [main] end
--:--:18.875 [pool-1-thread-2] end
--:--:18.876 [pool-1-thread-2] begin
--:--:19.875 [pool-1-thread-1] end
--:--:19.876 [pool-1-thread-2] end

2つのスレッド、pool-1-thread-1pool-1-thread-2は同時に開始していますが、pool-1-thread-2は2番目に投入した2秒スリープするタスクを実行して先に終了するため、1番目のタスクで3秒スリープするpool-1-thread-1より早く空きスレッドになります。その直後、pool-1-thread-2が保留となっていた3番目のタスクを処理しています。
mainスレッドの処理は早々に終了しています。ExecutorService#shutdownは、既に投入済みのタスクは処理が終わるまで待機してからスレッドを終了させます。
また、shutdownを実行しないと、mainスレッドが終了してもJVMは終了しません。
shutdownの代わりにExecutorService#shutdownNow(サンプルではコメントアウトになっている)を使うと、スレッドが実行中でも割り込み(InterruptException)を発生させてタスクを強制終了します。但し、確実にタスクを終了する保証はされない点に注意が必要です。


もうひとつ、デーモンスレッドだけになると、JVMは終了します。mainスレッドは「非デーモンスレッド」です。ということは、スレッドプールのスレッドがすべてデーモンスレッドになっていれば、mainスレッドが終了した時点で非デーモンスレッドがゼロになり、JVMは終了するということですね。(Thread#setDaemon参照。)
スレッドプールのスレッドをデーモンスレッドにするには、デーモンスレッドを生成するThreadFactoryを実装し、スレッドプール生成ファクトリーメソッドを実行する際に渡します。

  • ExecutorsSample2
//import static concurrent.Util.*;
//import java.util.concurrent.*;

ThreadFactory daemonThreadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = Executors.defaultThreadFactory().newThread(r);
        thread.setDaemon(true);
        return thread;
    }
};
ExecutorService pool = Executors.newFixedThreadPool(2, daemonThreadFactory);
pool.execute(new SleepTask(3000L));
pool.execute(new SleepTask(2000L));
sleep(2500L);
log("end");

  • 実行結果(ExecutorsSample2)
--:--:38.864 [pool-2-thread-1] begin
--:--:38.864 [pool-1-thread-1] begin
--:--:40.896 [pool-2-thread-1] end
--:--:41.364 [main] end

mainスレッド以外はデーモンスレッドなので、mainスレッドが終了した直後にJVMが終了し、pool-1-thread-1の処理は完了しませんでした。




処理結果を待ち合わせる - Future

並列計算をするには、前項のExecutorServiceを使えばできそうです。
ところが、RunnableのタスクのRunnable#runメソッドは値を返すようになっていません。それに、計算が終わっていないのに結果を返すこともできません。計算が終わるまで待っていたら並列計算の意味がありません。どうやって値を返すようにすれば良いのでしょうか。
これを解決するには、Futureインターフェイスに関連した機能を使います。デザインパターンでいうところのFutureパターンです。

  • FutureTaskSample
//import static concurrent.Util.log;
//import java.util.concurrent.*;

ExecutorService pool = Executors.newCachedThreadPool();
log("start");

// Callable + Future
Future<Integer> future = pool.submit(new SlowDownCallableTask(7));
Integer futureResult = future.get(); // throws InterruptedException, ExecutionException
log("result(Future): %d", futureResult);

// FutureTask
FutureTask<Integer> task = new FutureTask<>(new SlowDownCallableTask(5));
pool.submit(task);
Integer futureTaskResult = task.get(); // throws InterruptedException, ExecutionException
log("result(FutureTask): %d", futureTaskResult);

pool.shutdown();
log("end");


最初の例は、Callablejava.util.concurrent.Callableインタフェース)の結果をFuturejava.util.concurrent.Futureインターフェイス)として受け取って、Future#getで結果が返されるまで待機します。
2番目の例は、FutureTaskjava.util.concurrent.FutureTaskクラス)を使ったものです。FutureTaskは、RunnableFutureの役割を1つのクラスで賄うことができます。
Callableのタスクを実行するには、ExecutorService#submitを使用します。

  • 実行結果(FutureTaskSample)
--:--:04.220 [main] start
--:--:04.270 [pool-1-thread-1] input=7
--:--:04.307 [pool-1-thread-1] waiting ... (it takes 3521 milliseconds)
--:--:07.829 [main] result(Future): 21
--:--:07.830 [pool-1-thread-2] input=5
--:--:07.831 [pool-1-thread-2] waiting ... (it takes 7715 milliseconds)
--:--:15.546 [main] result(FutureTask): 15
--:--:15.547 [main] end

同じグループのすべてスレッドを待ち合わせる(バリアー) - CyclicBarrier

java.util.concurrent.CyclicBarrierクラスについて。
並列処理における「バリアー」という言葉は、「ロック」に比べると格段に馴染みの薄い用語ではないでしょうか。
Wikipediaでは「バリア (計算機科学)*1という項目があります。
再利用できるので、CyclicBarrierという名前になっているようです。

※Java7以降であれば、Phaserjava.util.concurrent.Phaserクラス)も検討してみてください。


APIドキュメントに使用例がありますが、そのままでは動かせないコードなので、以下のサンプルはそのまま(と言ってもmainメソッドを用意+インポート編成が必要...)で動かせるようにしています。細かい処理は削っています。

  • CyclicBarrierSample
int threadCount = 5;
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
final int[] results = new int[threadCount];
Runnable mergeTask = new Runnable() {
    @Override
    public void run() {
        log("merge begin");
        log("array: %s", Arrays.toString(results));
        int total = 0;
        for (int result : results)
            total += result;
        log("total: %d", total);
        log("merge end");
    }
};
final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount + 1, mergeTask);
for (int i = 0; i < threadCount; i++) {
    final int index = i;
    Runnable calcTask = new Runnable() {
        @Override
        public void run() {
            log("index=%d", index);
            results[index] = (index + 1) * 3;
            waitForSeveralSeconds();
            log("await");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException ex) {
                log(ex);
            }
        }
    };
    pool.execute(calcTask);
}
log("await");
try {
    cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
    log(ex);
}
log("end");
pool.shutdown();

並列計算を簡略化した処理になっています。各ワーカースレッド*2calcTaskを実行し、結果をmergeTaskで集計します。

CyclicBarrier#awaitを実行すると、そのスレッドは「バリアーポイント」で待機状態に入ります。待機スレッドが待ち合わせるスレッドの数に達すると、バリアーポイントを通過(「トリップ」する)して、後続の処理を行います。
CyclicBarrierコンストラクタの第1引数は、待ち合わせるスレッドの数を指定します。このサンプルでは、ワーカースレッドに加えてmainスレッドも待ち合わせに参加するので、ワーカースレッド数+1をセットしています。第2引数には、トリップした時に実行されるアクション(バリアーアクション)を指定します。(無くても良い。)このアクションは、最後にバリアーに入ったスレッドにより実行されます。(CyclicBarrierコンストラクタの説明参照。)
CyclicBarrier#awaitには、タイムアウトを設定するバージョン(CyclicBarrier#await(long, java.util.concurrent.TimeUnit))もあります。


それにしても、こういう時のtry-catchは邪魔ですね...

  • 実行結果(CyclicBarrierSample)
--:--:27.424 [main] await
--:--:27.424 [pool-1-thread-1] index=0
--:--:27.424 [pool-1-thread-2] index=1
--:--:27.424 [pool-1-thread-3] index=2
--:--:27.424 [pool-1-thread-4] index=3
--:--:27.424 [pool-1-thread-5] index=4
--:--:27.483 [pool-1-thread-2] waiting ... (it takes 8605 milliseconds)
--:--:27.483 [pool-1-thread-3] waiting ... (it takes 9917 milliseconds)
--:--:27.483 [pool-1-thread-4] waiting ... (it takes 3176 milliseconds)
--:--:27.484 [pool-1-thread-1] waiting ... (it takes 8469 milliseconds)
--:--:27.484 [pool-1-thread-5] waiting ... (it takes 2514 milliseconds)
--:--:29.998 [pool-1-thread-5] await
--:--:30.661 [pool-1-thread-4] await
--:--:35.954 [pool-1-thread-1] await
--:--:36.088 [pool-1-thread-2] await
--:--:37.401 [pool-1-thread-3] await
--:--:37.401 [pool-1-thread-3] merge begin
--:--:37.402 [pool-1-thread-3] array: [3, 6, 9, 12, 15]
--:--:37.402 [pool-1-thread-3] total: 45
--:--:37.403 [main] end
--:--:37.403 [pool-1-thread-3] merge end

待ち合わせるスレッドの数は6です。この例では、pool-1-thread-3が最後にバリアーポイントに到達、「トリップ」し、mergeTaskpool-1-thread-3によって実行されていることが分かります。




一定回数ポイントを通過したら待機解除 - CountDownLatch

java.util.concurrent.CountDownLatchクラスについて。
並列処理における「ラッチ」の意味を調べてみたんですが、はっきりとした定義が分かりませんでした。ラッチ回路が近いのでしょうか。
一般的な言葉でのラッチは閂、掛け金のことを指します。Wikipediaには「(鉄道駅などの)改札」とも。
ここでの定義は、見出しの通り、「一定回数ポイントを通過したら待機解除する仕掛け」(のようなもの)と仮置きしておきましょうか。


CyclicBarrierと少し似たところはありますが、CyclicBarrierと異なるのは、待機解除の条件が、スレッド同士の待ち合わせとは関係ないところです。
CountDownLatchは、何度かカウントダウン(CountDownLatch#countDown)を呼び出し、カウントがゼロになった時点で待機解除するという仕掛けです。待機はCountDownLatch#awaitメソッドで行いますが、カウントダウンする役目は待機している以外のどのスレッドが担っても良いし、同じスレッドが何度カウントダウンしても良いのです。


※Java7以降であれば、Phaserjava.util.concurrent.Phaserクラス)も検討してみてください。


次のサンプルは、先着3スレッドがカウントダウンを行った時点でカウントがゼロになりmainスレッドが待機解除する、という処理です。

  • CountDownLatchSample
//import static concurrent.Util.*;
//import java.util.concurrent.*;

int threadCount = 5;
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
final CountDownLatch countDownLatch = new CountDownLatch(3);
Runnable task = new Runnable() {
    @Override
    public void run() {
        waitForSeveralSeconds();
        log("count down");
        countDownLatch.countDown();
    }
};
for (int i = 0; i < threadCount; i++)
    pool.execute(task);
log("await");
boolean zeroCount = countDownLatch.await(8L, TimeUnit.SECONDS); // throws InterruptedException
// countDownLatch.await(); // throws InterruptedException
if (zeroCount)
    log("*** zero");
else
    log("*** not zero, count = %d", countDownLatch.getCount());
pool.shutdown();

ここではタイムアウト付きで待機している(CountDownLatch#await(long, java.util.concurrent.TimeUnit))ので、8秒経過した時点でカウントダウンがゼロになるのを待たずに待機解除します。

  • 実行結果(CountDownLatchSample)
--:--:50.788 [main] await
--:--:50.788 [pool-1-thread-1] waiting ... (it takes 7597 milliseconds)
--:--:50.788 [pool-1-thread-2] waiting ... (it takes 9696 milliseconds)
--:--:50.788 [pool-1-thread-3] waiting ... (it takes 8587 milliseconds)
--:--:50.788 [pool-1-thread-4] waiting ... (it takes 9837 milliseconds)
--:--:50.788 [pool-1-thread-5] waiting ... (it takes 3313 milliseconds)
--:--:54.137 [pool-1-thread-5] count down
--:--:58.420 [pool-1-thread-1] count down
--:--:58.824 [main] *** not zero, count = 1
--:--:59.409 [pool-1-thread-3] count down
--:--:00.518 [pool-1-thread-2] count down
--:--:00.661 [pool-1-thread-4] count down

このケースでは、5スレッド中3スレッドが終了に8秒以上かかっているので、カウントダウンがゼロになる前にmainスレッドが待機解除しています。




共有オブジェクトの値をアトミックに更新 - java.util.concurrent.atomic.*

java.util.concurrent.atomicパッケージについて。
並列処理におけるアトミックな操作とは、あるスレッドがスレッド間で共有するオブジェクトに対して値の取得・更新などを行う際に、それらの一連の処理が他のスレッドに割り込みされずに一気に行えることが保証されているもののことです。


ところで、Javaの並列処理では、一見、アトミックに操作できているように見えるのに、そうでない場合があります。
次のサンプルをご覧ください。

  • AtomicValueSample1 (not atomic)
import java.util.concurrent.*;

final class NotAtomicValueSample1 {

    int value;

    public static void main(String[] args) throws InterruptedException {
        final NotAtomicValueSample1 o = new NotAtomicValueSample1();
        int threadCount = 100;
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++)
                    ++o.value;
                countDownLatch.countDown();
            }
        };
        for (int i = 0; i < threadCount; i++)
            pool.execute(task);
        countDownLatch.await(); // throws InterruptedException
        log("value=%d", o.value);
        pool.shutdown();
    }

}

共有フィールドvalueに対して、100スレッドが100回ずつ加算を行っています。よって、結果は常に10000になるように見えます。環境にもよりますが、実際、ほとんどが10000になるでしょう。

試したところ、7回目までは10000になりましたが、8回目で次の結果が出ました。*3

  • 実行結果(NotAtomicValueSample1)
--:--:31.087 [main] value=9987

マイナス13。13回、加算に失敗したのでしょうか。それとも、処理が実行されなかったのでしょうか。
これはJavaの並列処理についてある程度知っていれば回避できる有名な問題で、Java言語仕様の「17 スレッドとロック - 17.4 メモリモデル」に関連する話になります。

1つ目は、命令の順序変更(リオーダー)に関するもので、大雑把に言ってしまうと、コンパイラの最適化により実際の実行順序が思っていたのと変わってしまう場合があるということです。この問題は、volatile修飾子を付けることで、リオーダーを禁止することができます。
ただし、このケースの場合は、これでは解決しません。
本命は2つ目で、アトミックでない操作がされている場合です。
実はフィールドのインクリメント(++o.value)は、バイトコードレベルでは、1回の操作ではありません。

// ++o.value
getfield        #26; //Field AtomicValueSample1.value:I
iconst_1
iadd
putfield        #26; //Field AtomicValueSample1.value:I

別のスレッドが書き戻す(putfield)前にフィールドの値を取得して、加算して、書き戻した場合には、前回のputfieldの値は上書きされることになります。


Javaのマルチスレッドのメモリモデルについてもっと知りたい方には、下記のj5ik2oさんのエントリーが詳しいです。(長めです。)


この問題を解消する方法のひとつに、java.util.concurrent.atomicパッケージのクラス群を使うものがあります。
このサンプルでは、フィールドはint型なので、同パッケージのAtomicIntegerを使います。

  • AtomicValueSample2
//import java.util.concurrent.*;
//import java.util.concurrent.atomic.*; // AtomicInteger

final class AtomicValueSample2 {

    AtomicInteger value = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        final AtomicValueSample2 o = new AtomicValueSample2();
        int threadCount = 100;
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++)
                    o.value.incrementAndGet();
                countDownLatch.countDown();
            }
        };
        for (int i = 0; i < threadCount; i++)
            pool.execute(task);
        countDownLatch.await(); // throws InterruptedException
        log("value=%d", o.value.get());
        pool.shutdown();
    }

}

AtomicInteger#incrementAndGetは、アトミックに加算と加算後の値の取得を行うことができる操作です。
他にも、getAndIncrement, getAndDecrement, addAndGet, getAndAddメソッドなどがあります。

  • 実行結果(AtomicValueSample2)
--:--:12.200 [main] value=10000
# 5000回くらい

問題は解消したようです。


ただし、当然ながら、プリミティブ型の値とは異なり、裏では同期処理やオブジェクトへのアクセスが発生することになりますので、シビアな性能が求められる処理での使用は、慎重に検討すべきです。



java.util.concurrent.atomicパッケージ以外にも、アトミックな操作が提供されたクラスがあります。

例えば、ConcurrentMapインターフェイスの実装クラスがあります。


次のサンプルは、HashMapクラスと、ConcurrentMapの実装クラスであるConcurrentHashMapクラスとで同様の処理を書いてみたものです。

  • AtomicValueSample3
//import java.util.*;
//import java.util.concurrent.*;

Map<String, String> m1 = new HashMap<>();
ConcurrentMap<String, String> m2 = new ConcurrentHashMap<>();
String k = "keyString";
String v = "valueString";
// 存在しなければ登録
if (!m1.containsKey(k))
    m1.put(k, v);       // アトミックでない操作
m2.putIfAbsent(k, v);   // アトミックな操作
// 存在したら置き換え
if (m1.containsKey(k))  
    m1.put(k, v);       // アトミックでない操作
m2.replace(k, v);       // アトミックな操作
// 存在したら削除
if (m1.containsKey(k))
    m1.remove(k);       // アトミックでない操作
m2.remove(k);           // アトミックな操作

(実行結果無し)


追記(2014-08-15): アトミックでない操作も含めて、並列処理やマルチスレッドによって惹き起こされるバグは、テストでは発見が困難になりますので、まずは安易に共有オブジェクトを持つことを避けるようにすべきです。どうしても必要な場合は、アトミックでない操作を行わないように注意しながら使いましょう。


その他と、まとめ

他にも同期キューや、セマフォjava.util.concurrent.Semaphoreクラス)、ロック関連(java.util.concurrent.locksパッケージ)などが用意されています。
Java7で追加されたものでは、Fork/Join関連(java.util.concurrent.ForkJoinPoolクラス他)や、バリアー実装のPhaserjava.util.concurrent.Phaser)というのもあります。


この辺は、次回があったら触れたいと思います。

*1:日本語版は300字程度。リンクされていない英語版にはもう少し詳しく書いてあります。英語版=「Barrier (computer science)

*2:バックグラウンド処理でタスクを実行するスレッドのことを、こう呼ぶこともしばしばあります。

*3:発生しない場合は、スレッド数か繰り返し数を増やしてみてください。