メモ:ReentrantReadWriteLock で Read-Write Lock
Read-Write LockのおさらいとReentrantReadWriteLock
の実例をまとめました。短めです。
Read-Write Lockとは何か
日本語版には無かったので英語版Wikipediaをご覧ください。
大雑把に説明すると、複数のスレッド間でデータを共有する際に、読み取りだけの場合は複数のスレッドを受け付け、書き込みのときだけ排他ロックをかける仕組みです。
具体的な制御はこんな感じになります。
- 読み取りロックだけならば排他は不要
- 書き込みロックを施錠(lock)しようとするとき
- 読み取りロックだけなら、新しいロック施錠をブロックしてすべてのロックが解錠(unlock)されるのを待機し、全てのロックが解錠されたら排他ロック施錠を行う
- 書き込みロックなら、新しいロック施錠は既にブロックされているので排他ロックが解錠されるのを待機し、排他ロックが解錠されたら排他ロック施錠を行う
Read-Write Lockを使う意義
Javaの例で説明すると、Javaの古い同期コレクションであるVector
,Hashtable
は、ほぼすべてのメソッド*1がsynchronized
メソッドになっていて、読み書きに関係なく排他ロックを行います。
もしコレクションの更新頻度が少なく参照頻度が多いような場合に、読み取りのたびに排他ロックをかけると、大量の処理待ちが発生してしまいます。
これを単純にRead-Write Lockに置き換えるだけでも、参照頻度が多いときのパフォーマンスが向上します。
ちなみに、Java5からConcurrentHashMapクラス (java.util.concurrent.ConcurrentHashMap)
という同期マップ実装が導入されています。これはまた別のもっと複雑な方法で高速化を図っているようです。
ReentrantReadWriteLock
でRead-Write Lock
JavaのConcurrency Utilities APIに、ReadWriteLockインターフェイス (java.util.concurrent.locks.ReadWriteLock)
とその実装のReentrantReadWriteLockクラス (java.util.concurrent.locks.ReentrantReadWriteLock)
があります。
これを使って、実際にRead-Write Lockと一律排他ロックの動きを比べてみましょう。
説明を単純にするために、単一の値を読み書きするValue
インターフェイスを例にして説明します。
SynchronizedValue
:Hashtable
やVector
と同じくsynchronized
で読み書きに関係なく一律排他を行う実装LockValue
:synchronized
の代わりにReentrantLock
(java.util.concurrent.locks.ReentrantLock)
を使った読み書きに関係なく一律排他を行う実装ReadWriteLockValue
:ReentrantReadWriteLock
を使ったRead-Write Lock実装
// import java.util.concurrent.locks.*; interface Value<T> { T read(); void write(T v); } class Sleeper { static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } } class SynchronizedValue<T> implements Value<T> { T o; @Override public T read() { synchronized (this) { Sleeper.sleep(50L); return this.o; } } @Override public void write(T o) { synchronized (this) { Sleeper.sleep(50L); this.o = o; } } } class LockValue<T> implements Value<T> { T o; Lock lock = new ReentrantLock(); @Override public T read() { lock.lock(); try { Sleeper.sleep(50L); return this.o; } finally { lock.unlock(); } } @Override public void write(T o) { lock.lock(); try { Sleeper.sleep(50L); this.o = o; } finally { lock.unlock(); } } } class ReadWriteLockValue<T> implements Value<T> { T o; ReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); @Override public T read() { rlock.lock(); try { Sleeper.sleep(50L); return this.o; } finally { rlock.unlock(); } } @Override public void write(T o) { wlock.lock(); try { Sleeper.sleep(50L); this.o = o; } finally { wlock.unlock(); } } }
わずかに負荷がかかるように、それぞれのアクセッサーに1/20秒のsleepを入れています。
以下の簡易ベンチマークでそれぞれの処理時間を計測してみます。
- 簡易ベンチマーク
// import java.util.concurrent.*; public static void main(String[] args) { benchmark(new SynchronizedValue<Integer>(), 15, 5); benchmark(new LockValue<Integer>(), 15, 5); benchmark(new ReadWriteLockValue<Integer>(), 15, 5); benchmark(new SynchronizedValue<Integer>(), 10, 10); benchmark(new LockValue<Integer>(), 10, 10); benchmark(new ReadWriteLockValue<Integer>(), 10, 10); benchmark(new SynchronizedValue<Integer>(), 0, 20); benchmark(new LockValue<Integer>(), 0, 20); benchmark(new ReadWriteLockValue<Integer>(), 0, 20); } static void benchmark(Value<Integer> v, int nReaders, int nWriters) { int loopCount = 10; String valueClass = v.getClass().getSimpleName(); System.out.printf("開始 readers=%d, writers=%d, loop=%d, value=%s%n", nReaders, nWriters, loopCount, valueClass); ExecutorService threadPool = Executors.newCachedThreadPool(); Phaser phaser = new Phaser(); phaser.register(); for (int i = 0; i < nReaders; i++) { phaser.register(); threadPool.execute(() -> { phaser.arriveAndAwaitAdvance(); try { for (int j = 0; j < loopCount; j++) { v.read(); } } finally { phaser.arriveAndDeregister(); } }); } for (int i = 0; i < nWriters; i++) { phaser.register(); threadPool.execute(() -> { phaser.arriveAndAwaitAdvance(); try { for (int j = 0; j < loopCount; j++) { v.write(j); } } finally { phaser.arriveAndDeregister(); } }); } phaser.arriveAndAwaitAdvance(); System.out.println("全スレッド起動"); long t = System.currentTimeMillis(); phaser.arriveAndAwaitAdvance(); phaser.arriveAndDeregister(); System.out.printf("終了 経過時間=%dミリ秒%n%n", System.currentTimeMillis() - t); threadPool.shutdown(); }
Phaser
というクラスを使っています。Phaser
自体の詳しい説明は省略しますが、ここでは開始と終了のタイミングを同期させるために使用しています。
- ベンチマークの実行結果
開始 readers=15, writers=5, loop=10, value=SynchronizedValue 全スレッド起動 終了 経過時間=10000ミリ秒 開始 readers=15, writers=5, loop=10, value=LockValue 全スレッド起動 終了 経過時間=10000ミリ秒 開始 readers=15, writers=5, loop=10, value=ReadWriteLockValue 全スレッド起動 終了 経過時間=3000ミリ秒 開始 readers=10, writers=10, loop=10, value=SynchronizedValue 全スレッド起動 終了 経過時間=10001ミリ秒 開始 readers=10, writers=10, loop=10, value=LockValue 全スレッド起動 終了 経過時間=9999ミリ秒 開始 readers=10, writers=10, loop=10, value=ReadWriteLockValue 全スレッド起動 終了 経過時間=5751ミリ秒 開始 readers=0, writers=20, loop=10, value=SynchronizedValue 全スレッド起動 終了 経過時間=10000ミリ秒 開始 readers=0, writers=20, loop=10, value=LockValue 全スレッド起動 終了 経過時間=10001ミリ秒 開始 readers=0, writers=20, loop=10, value=ReadWriteLockValue 全スレッド起動 終了 経過時間=10000ミリ秒
誤差はあるものの、SynchronizedValue
とLockValue
はほぼ同じ時間がかかっているのに対して、ReadWriteLockValue
は読み取りスレッドが多いほど処理時間が短くなっているのが分かります。
(おわり)
*1:メソッド自体にsynchronizedが付いていなくても間接的にsynchronizedメソッドが呼ばれています。