argius note

プログラミング関連

メモ:ReentrantReadWriteLock で Read-Write Lock

Read-Write LockのおさらいとReentrantReadWriteLockの実例をまとめました。短めです。


Read-Write Lockとは何か

日本語版には無かったので英語版Wikipediaをご覧ください。


大雑把に説明すると、複数のスレッド間でデータを共有する際に、読み取りだけの場合は複数のスレッドを受け付け、書き込みのときだけ排他ロックをかける仕組みです。
具体的な制御はこんな感じになります。

  • 読み取りロックだけならば排他は不要
  • 書き込みロックを施錠(lock)しようとするとき
    • 読み取りロックだけなら、新しいロック施錠をブロックしてすべてのロックが解錠(unlock)されるのを待機し、全てのロックが解錠されたら排他ロック施錠を行う
    • 書き込みロックなら、新しいロック施錠は既にブロックされているので排他ロックが解錠されるのを待機し、排他ロックが解錠されたら排他ロック施錠を行う

Read-Write Lockを使う意義

Javaの例で説明すると、Javaの古い同期コレクションであるVector,Hashtableは、ほぼすべてのメソッド*1synchronizedメソッドになっていて、読み書きに関係なく排他ロックを行います。
もしコレクションの更新頻度が少なく参照頻度が多いような場合に、読み取りのたびに排他ロックをかけると、大量の処理待ちが発生してしまいます。
これを単純にRead-Write Lockに置き換えるだけでも、参照頻度が多いときのパフォーマンスが向上します。


ちなみに、Java5からConcurrentHashMapクラス (java.util.concurrent.ConcurrentHashMap)という同期マップ実装が導入されています。これはまた別のもっと複雑な方法で高速化を図っているようです。


ReentrantReadWriteLockでRead-Write Lock

JavaのConcurrency Utilities APIに、ReadWriteLockインターフェイス (java.util.concurrent.locks.ReadWriteLock)とその実装のReentrantReadWriteLockクラス (java.util.concurrent.locks.ReentrantReadWriteLock)があります。

これを使って、実際にRead-Write Lockと一律排他ロックの動きを比べてみましょう。


説明を単純にするために、単一の値を読み書きするValueインターフェイスを例にして説明します。

  • SynchronizedValueHashtableVectorと同じくsynchronizedで読み書きに関係なく一律排他を行う実装
  • LockValuesynchronizedの代わりにReentrantLock (java.util.concurrent.locks.ReentrantLock)を使った読み書きに関係なく一律排他を行う実装
  • ReadWriteLockValueReentrantReadWriteLockを使ったRead-Write Lock実装
// import java.util.concurrent.locks.*;

interface Value<T> {
    T read();
    void write(T v);
}

class Sleeper {
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }
}

class SynchronizedValue<T> implements Value<T> {
    T o;
    @Override
    public T read() {
        synchronized (this) {
            Sleeper.sleep(50L);
            return this.o;
        }
    }
    @Override
    public void write(T o) {
        synchronized (this) {
            Sleeper.sleep(50L);
            this.o = o;
        }
    }
}

class LockValue<T> implements Value<T> {
    T o;
    Lock lock = new ReentrantLock();
    @Override
    public T read() {
        lock.lock();
        try {
            Sleeper.sleep(50L);
            return this.o;
        } finally {
            lock.unlock();
        }
    }
    @Override
    public void write(T o) {
        lock.lock();
        try {
            Sleeper.sleep(50L);
            this.o = o;
        } finally {
            lock.unlock();
        }
    }
}

class ReadWriteLockValue<T> implements Value<T> {
    T o;
    ReadWriteLock lock = new ReentrantReadWriteLock();
    Lock rlock = lock.readLock();
    Lock wlock = lock.writeLock();
    @Override
    public T read() {
        rlock.lock();
        try {
            Sleeper.sleep(50L);
            return this.o;
        } finally {
            rlock.unlock();
        }
    }
    @Override
    public void write(T o) {
        wlock.lock();
        try {
            Sleeper.sleep(50L);
            this.o = o;
        } finally {
            wlock.unlock();
        }
    }
}

わずかに負荷がかかるように、それぞれのアクセッサーに1/20秒のsleepを入れています。


以下の簡易ベンチマークでそれぞれの処理時間を計測してみます。

// import java.util.concurrent.*;

public static void main(String[] args) {
    benchmark(new SynchronizedValue<Integer>(), 15, 5);
    benchmark(new LockValue<Integer>(), 15, 5);
    benchmark(new ReadWriteLockValue<Integer>(), 15, 5);
    benchmark(new SynchronizedValue<Integer>(), 10, 10);
    benchmark(new LockValue<Integer>(), 10, 10);
    benchmark(new ReadWriteLockValue<Integer>(), 10, 10);
    benchmark(new SynchronizedValue<Integer>(), 0, 20);
    benchmark(new LockValue<Integer>(), 0, 20);
    benchmark(new ReadWriteLockValue<Integer>(), 0, 20);
}

static void benchmark(Value<Integer> v, int nReaders, int nWriters) {
    int loopCount = 10;
    String valueClass = v.getClass().getSimpleName();
    System.out.printf("開始 readers=%d, writers=%d, loop=%d, value=%s%n", nReaders, nWriters, loopCount, valueClass);
    ExecutorService threadPool = Executors.newCachedThreadPool();
    Phaser phaser = new Phaser();
    phaser.register();
    for (int i = 0; i < nReaders; i++) {
        phaser.register();
        threadPool.execute(() -> {
            phaser.arriveAndAwaitAdvance();
            try {
                for (int j = 0; j < loopCount; j++) {
                    v.read();
                }
            } finally {
                phaser.arriveAndDeregister();
            }
        });
    }
    for (int i = 0; i < nWriters; i++) {
        phaser.register();
        threadPool.execute(() -> {
            phaser.arriveAndAwaitAdvance();
            try {
                for (int j = 0; j < loopCount; j++) {
                    v.write(j);
                }
            } finally {
                phaser.arriveAndDeregister();
            }
        });
    }
    phaser.arriveAndAwaitAdvance();
    System.out.println("全スレッド起動");
    long t = System.currentTimeMillis();
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndDeregister();
    System.out.printf("終了 経過時間=%dミリ秒%n%n", System.currentTimeMillis() - t);
    threadPool.shutdown();
}


Phaserというクラスを使っています。Phaser自体の詳しい説明は省略しますが、ここでは開始と終了のタイミングを同期させるために使用しています。

開始 readers=15, writers=5, loop=10, value=SynchronizedValue
全スレッド起動
終了 経過時間=10000ミリ秒

開始 readers=15, writers=5, loop=10, value=LockValue
全スレッド起動
終了 経過時間=10000ミリ秒

開始 readers=15, writers=5, loop=10, value=ReadWriteLockValue
全スレッド起動
終了 経過時間=3000ミリ秒

開始 readers=10, writers=10, loop=10, value=SynchronizedValue
全スレッド起動
終了 経過時間=10001ミリ秒

開始 readers=10, writers=10, loop=10, value=LockValue
全スレッド起動
終了 経過時間=9999ミリ秒

開始 readers=10, writers=10, loop=10, value=ReadWriteLockValue
全スレッド起動
終了 経過時間=5751ミリ秒

開始 readers=0, writers=20, loop=10, value=SynchronizedValue
全スレッド起動
終了 経過時間=10000ミリ秒

開始 readers=0, writers=20, loop=10, value=LockValue
全スレッド起動
終了 経過時間=10001ミリ秒

開始 readers=0, writers=20, loop=10, value=ReadWriteLockValue
全スレッド起動
終了 経過時間=10000ミリ秒

誤差はあるものの、SynchronizedValueLockValueはほぼ同じ時間がかかっているのに対して、ReadWriteLockValueは読み取りスレッドが多いほど処理時間が短くなっているのが分かります。



(おわり)

*1:メソッド自体にsynchronizedが付いていなくても間接的にsynchronizedメソッドが呼ばれています。