読者です 読者をやめる 読者になる 読者になる

argius note

プログラミング関連

開発しています



非同期でイテレーションを可能にするリスト

Java

リストの生成を行うスレッドAと、リストのイテレーションを行うスレッドBがあって、リストListはそれぞれのスレッドの処理が始まる前にスレッドAからスレッドBに渡しておかなければなりません。
スレッドBは、Listが並列処理かどうかを知らないものとします。
追記(2012-05-17): 全面的に(上書き)修正しました。


とりあえず思いついたのは、未確定の場合はイテレーション中にスレッドを待機させるように実装した未確定状態を持つArrayListの拡張クラスを作って、それをListとして使うことです。
できれば新たにクラスを作らずにできたら良いのですが、わかりませんでした。


拡張クラスは、WaitableIteratorListです。WaitableIteratorList#iteratorが返すIteratorの実装はWaitableIteratorとなります。
JavaプログラムはJava1.6用です。

  • WaitableIteratorList.java
package waitable;

import java.util.*;

final class WaitableIteratorList<E> extends ArrayList<E> {

    final Object lock;

    private boolean finished;

    public WaitableIteratorList() {
        this.lock = new Object();
        this.finished = false;
    }

    private void notifyChange() {
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    public boolean isFinished() {
        return finished;
    }

    public void finish() {
        this.finished = true;
        notifyChange();
    }

    @Override
    public boolean add(E e) {
        final boolean added = super.add(e);
        notifyChange();
        return added;
    }

    @Override
    public void add(int index, E e) {
        super.add(index, e);
        notifyChange();
    }

    @Override
    public Iterator<E> iterator() {
        return new WaitableIterator<E>(this);
    }

}
  • WaitableIteratorList.java
package waitable;

import java.util.*;

final class WaitableIterator<E> implements Iterator<E> {

    private final WaitableIteratorList<E> list;

    private int cursor1;
    private int cursor2;

    public WaitableIterator(WaitableIteratorList<E> list) {
        this.list = list;
        this.cursor1 = 0;
        this.cursor2 = 0;
    }

    @Override
    public boolean hasNext() {
        while (true) {
            if (list.size() > cursor1) {
                cursor2 = cursor1++;
                return true;
            } else if (list.isFinished()) {
                return false;
            }
            synchronized (list.lock) {
                try {
                    list.lock.wait();
                } catch (InterruptedException ex) {
                    // ignore
                }
            }
        }
    }

    @Override
    public E next() {
        return list.get(cursor2++);
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("WaitableIteratorList#remove");
    }

}
package waitable;

import java.util.*;

public final class Main {

    static void p(Object o) {
        System.out.printf("%s %s %s%n", new Date(), Thread.currentThread(), o);
    }

    static void startNewThread(String name, Runnable runnable) {
        Thread t = new Thread(runnable, name);
        // t.setDaemon(true);
        t.start();
    }

    static List<String> getList() {
        final WaitableIteratorList<String> a = new WaitableIteratorList<String>();
        startNewThread("スレッドA", new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 3; i++) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException ex) {
                        // ignore
                    }
                    p("list <= " + i);
                    a.add("n=" + i);
                }
                a.finish();
            }
        });
        return a;
    }

    public static void main(String... args) {
        p("start");
        final List<String> a = getList();
        final Runnable task = new Runnable() {
            @Override
            public void run() {
                for (String s : a) {
                    p(s);
                }
            }
        };
        startNewThread("スレッドB1", task);
        startNewThread("スレッドB2", task);
        p("end");
    }

}

hasNext()は、リストの要素数が指定したインデックスよりも大きい場合は、そのままtrueを返します。それ以外でisFinished()がfalseの場合は、「要素数が足りないが、まだ追加されるかもしれない」状態なので、フィールドlockのモニタを取得して待機します。
add()などで要素が追加されたら、lockのモニタにnotifyAllして待機を解除させます。
finish()を実行するとisFinished()がtrueになり、これ以降は通常のIteratorの動作となります。
要素を指定のインデックスにインサートする操作は、エラーにはなりませんが期待とは異なる動作になります。

$ java waitable.Main
Fri May 18 00:12:32 JST 2012 Thread[main,5,main] start
Fri May 18 00:12:32 JST 2012 Thread[main,5,main] end
Fri May 18 00:12:33 JST 2012 Thread[スレッドA,5,main] list <= 0
Fri May 18 00:12:33 JST 2012 Thread[スレッドB2,5,main] n=0
Fri May 18 00:12:33 JST 2012 Thread[スレッドB1,5,main] n=0
Fri May 18 00:12:34 JST 2012 Thread[スレッドA,5,main] list <= 1
Fri May 18 00:12:34 JST 2012 Thread[スレッドB2,5,main] n=1
Fri May 18 00:12:34 JST 2012 Thread[スレッドB1,5,main] n=1
Fri May 18 00:12:35 JST 2012 Thread[スレッドA,5,main] list <= 2
Fri May 18 00:12:35 JST 2012 Thread[スレッドB2,5,main] n=2
Fri May 18 00:12:35 JST 2012 Thread[スレッドB1,5,main] n=2
$

スレッドAがリストに値を追加すると、即時にスレッドBたち(1,2)がその要素をprintしているのが分かります。
mainスレッドはスレッドA(getList()内)とスレッドBたちの起動が終わったらすぐに終了しています。スレッドA,Bは共にデーモンスレッドでは無いので、mainスレッドが終了しても処理は続行となります。ちなみに、デーモンスレッドにした場合はmainの"end"が出力されて終わってしまいます。