非同期でイテレーションを可能にするリスト
リストの生成を行うスレッドAと、リストのイテレーションを行うスレッドBがあって、リストList
スレッドBは、List
追記(2012-05-17): 全面的に(上書き)修正しました。
とりあえず思いついたのは、未確定の場合はイテレーション中にスレッドを待機させるように実装した未確定状態を持つArrayListの拡張クラスを作って、それをList
できれば新たにクラスを作らずにできたら良いのですが、わかりませんでした。
拡張クラスは、WaitableIteratorListです。WaitableIteratorList#iteratorが返すIteratorの実装はWaitableIteratorとなります。
JavaプログラムはJava1.6用です。
- WaitableIteratorList.java
package waitable; import java.util.*; final class WaitableIteratorList<E> extends ArrayList<E> { final Object lock; private boolean finished; public WaitableIteratorList() { this.lock = new Object(); this.finished = false; } private void notifyChange() { synchronized (lock) { lock.notifyAll(); } } public boolean isFinished() { return finished; } public void finish() { this.finished = true; notifyChange(); } @Override public boolean add(E e) { final boolean added = super.add(e); notifyChange(); return added; } @Override public void add(int index, E e) { super.add(index, e); notifyChange(); } @Override public Iterator<E> iterator() { return new WaitableIterator<E>(this); } }
- WaitableIteratorList.java
package waitable; import java.util.*; final class WaitableIterator<E> implements Iterator<E> { private final WaitableIteratorList<E> list; private int cursor1; private int cursor2; public WaitableIterator(WaitableIteratorList<E> list) { this.list = list; this.cursor1 = 0; this.cursor2 = 0; } @Override public boolean hasNext() { while (true) { if (list.size() > cursor1) { cursor2 = cursor1++; return true; } else if (list.isFinished()) { return false; } synchronized (list.lock) { try { list.lock.wait(); } catch (InterruptedException ex) { // ignore } } } } @Override public E next() { return list.get(cursor2++); } @Override public void remove() { throw new UnsupportedOperationException("WaitableIteratorList#remove"); } }
- Main.java
package waitable; import java.util.*; public final class Main { static void p(Object o) { System.out.printf("%s %s %s%n", new Date(), Thread.currentThread(), o); } static void startNewThread(String name, Runnable runnable) { Thread t = new Thread(runnable, name); // t.setDaemon(true); t.start(); } static List<String> getList() { final WaitableIteratorList<String> a = new WaitableIteratorList<String>(); startNewThread("スレッドA", new Runnable() { @Override public void run() { for (int i = 0; i < 3; i++) { try { Thread.sleep(1000L); } catch (InterruptedException ex) { // ignore } p("list <= " + i); a.add("n=" + i); } a.finish(); } }); return a; } public static void main(String... args) { p("start"); final List<String> a = getList(); final Runnable task = new Runnable() { @Override public void run() { for (String s : a) { p(s); } } }; startNewThread("スレッドB1", task); startNewThread("スレッドB2", task); p("end"); } }
hasNext()は、リストの要素数が指定したインデックスよりも大きい場合は、そのままtrueを返します。それ以外でisFinished()がfalseの場合は、「要素数が足りないが、まだ追加されるかもしれない」状態なので、フィールドlockのモニタを取得して待機します。
add()などで要素が追加されたら、lockのモニタにnotifyAllして待機を解除させます。
finish()を実行するとisFinished()がtrueになり、これ以降は通常のIteratorの動作となります。
要素を指定のインデックスにインサートする操作は、エラーにはなりませんが期待とは異なる動作になります。
$ java waitable.Main Fri May 18 00:12:32 JST 2012 Thread[main,5,main] start Fri May 18 00:12:32 JST 2012 Thread[main,5,main] end Fri May 18 00:12:33 JST 2012 Thread[スレッドA,5,main] list <= 0 Fri May 18 00:12:33 JST 2012 Thread[スレッドB2,5,main] n=0 Fri May 18 00:12:33 JST 2012 Thread[スレッドB1,5,main] n=0 Fri May 18 00:12:34 JST 2012 Thread[スレッドA,5,main] list <= 1 Fri May 18 00:12:34 JST 2012 Thread[スレッドB2,5,main] n=1 Fri May 18 00:12:34 JST 2012 Thread[スレッドB1,5,main] n=1 Fri May 18 00:12:35 JST 2012 Thread[スレッドA,5,main] list <= 2 Fri May 18 00:12:35 JST 2012 Thread[スレッドB2,5,main] n=2 Fri May 18 00:12:35 JST 2012 Thread[スレッドB1,5,main] n=2 $
スレッドAがリストに値を追加すると、即時にスレッドBたち(1,2)がその要素をprintしているのが分かります。
mainスレッドはスレッドA(getList()内)とスレッドBたちの起動が終わったらすぐに終了しています。スレッドA,Bは共にデーモンスレッドでは無いので、mainスレッドが終了しても処理は続行となります。ちなみに、デーモンスレッドにした場合はmainの"end"が出力されて終わってしまいます。