広告

Java : BlockingQueue (ブロッキング・キュー) - API使用例

BlockingQueue (Java SE 20 & JDK 20) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様のおともにどうぞ。


概要

要素の取得時にキューが空でなくなるまで待機したり、要素の格納時にキュー内に空きが生じるまで待機する操作を追加でサポートしたりするQueueです。

クラス構成

BlockingQueue インタフェースは、

  • 要素を取り出すときに、もし空だったら要素が追加されるまで待機

という操作が可能な Queue です。

// 基準となる時刻
final long current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    final var future = executor.submit(() -> {
        try {
            while (true) {
                System.out.println("  take ...");
                final var value = queue.take();

                System.out.printf("  take OK! : value = %s (%f sec.)%n",
                        value, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("aaa");
    queue.put("bbb");
    queue.put("ccc");

    TimeUnit.SECONDS.sleep(5);

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("XXX");
    queue.put("YYY");
    queue.put("ZZZ");

    TimeUnit.SECONDS.sleep(5);

    System.out.println("future.cancel");
    future.cancel(true);
}

// 結果
// ↓
//put values (0.002361 sec.)
//  take ...
//  take OK! : value = aaa (0.003684 sec.)
//  take ...
//  take OK! : value = bbb (0.003801 sec.)
//  take ...
//  take OK! : value = ccc (0.003910 sec.)
//  take ...
//put values (5.008301 sec.)
//  take OK! : value = XXX (5.009003 sec.)
//  take ...
//  take OK! : value = YYY (5.009187 sec.)
//  take ...
//  take OK! : value = ZZZ (5.009332 sec.)
//  take ...
//future.cancel
//  InterruptedException!

メソッド

boolean add (E e)

容量制限に違反することなく、指定された要素をこのキューにすぐに挿入できる場合はそうします。成功した場合はtrueを返し、その時点で使用可能な空き領域が存在しない場合はIllegalStateExceptionをスローします。

// capacity = 3
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue); // []

System.out.println(queue.add("aaa")); // true
System.out.println(queue); // [aaa]

System.out.println(queue.add("bbb")); // true
System.out.println(queue); // [aaa, bbb]

System.out.println(queue.add("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]

try {
    queue.add("ddd");
} catch (IllegalStateException e) {
    System.out.println("IllegalStateException! : " + e.getMessage());
}

// 結果
// ↓
//IllegalStateException! : Queue full

boolean contains (Object o)

指定された要素がキューに含まれている場合にtrueを返します。

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, bbb, ccc]

System.out.println(queue.contains("aaa")); // true
System.out.println(queue.contains("bbb")); // true
System.out.println(queue.contains("XXX")); // false

int drainTo (Collection<? super E> c)

このキューから利用可能なすべての要素を削除し、それらを指定されたコレクションに追加します。

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var c = new ArrayList<String>();

System.out.println(queue.offer("aaa")); // true

System.out.println(queue); // [aaa]
System.out.println(c); // []

System.out.println(queue.drainTo(c)); // 1

System.out.println(queue); // []
System.out.println(c); // [aaa]

System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [bbb, ccc]
System.out.println(c); // [aaa]

System.out.println(queue.drainTo(c)); // 2

System.out.println(queue); // []
System.out.println(c); // [aaa, bbb, ccc]

int drainTo (Collection<? super E> c, int maxElements)

指定された数以内の利用可能な要素をこのキューから削除し、指定されたコレクションに追加します。

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var c = new ArrayList<String>();

System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(c); // []

System.out.println(queue.drainTo(c, 10)); // 3

System.out.println(queue); // []
System.out.println(c); // [aaa, bbb, ccc]

System.out.println(queue.offer("XXX")); // true
System.out.println(queue.offer("YYY")); // true
System.out.println(queue.offer("ZZZ")); // true

System.out.println(queue); // [XXX, YYY, ZZZ]
System.out.println(c); // [aaa, bbb, ccc]

System.out.println(queue.drainTo(c, 2)); // 2

System.out.println(queue); // [ZZZ]
System.out.println(c); // [aaa, bbb, ccc, XXX, YYY]

boolean offer (E e)

指定された要素を、このキューに容量制限に違反することなしにすぐに挿入できる場合には、そうします。成功した場合はtrueを返し、使用可能な空き領域がその時点で存在しない場合はfalseを返します。

// capacity = 3
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue); // []

System.out.println(queue.offer("aaa")); // true
System.out.println(queue); // [aaa]

System.out.println(queue.offer("bbb")); // true
System.out.println(queue); // [aaa, bbb]

System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]

System.out.println(queue.offer("ddd")); // false
System.out.println(queue); // [aaa, bbb, ccc]

boolean offer (E e, long timeout, TimeUnit unit)

指定された要素をこのキューに挿入します。必要に応じて、指定された時間まで空きが生じるのを待機します。

関連:offer(E e)

// 基準となる時刻
final long current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
    System.out.printf("remainingCapacity = %d (%f sec.)%n",
            queue.remainingCapacity(), elapsedTime.getAsDouble());

    executor.submit(() -> {
        try {
            final var list = List.of("aaa", "bbb", "ccc", "ddd");
            for (final var value : list) {
                System.out.println("  offer ...");
                final var ret = queue.offer(value, 5, TimeUnit.SECONDS);

                System.out.printf("  offer ret = %b (%f sec.)%n",
                        ret, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    TimeUnit.SECONDS.sleep(10);

    System.out.printf("queue = %s (%f sec.)%n", queue, elapsedTime.getAsDouble());
}

// 結果
// ↓
//remainingCapacity = 3 (0.002475 sec.)
//  offer ...
//  offer ret = true (0.004959 sec.)
//  offer ...
//  offer ret = true (0.005337 sec.)
//  offer ...
//  offer ret = true (0.005599 sec.)
//  offer ...
//  offer ret = false (5.007736 sec.)
//queue = [aaa, bbb, ccc] (10.013658 sec.)

E poll (long timeout, TimeUnit unit)

このキューの先頭を取得して削除します。必要に応じて、指定された待機時間まで要素が利用可能になるのを待機します。

// 基準となる時刻
final long current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    final var future = executor.submit(() -> {
        try {
            while (true) {
                System.out.println("  poll ...");
                final var value = queue.poll(5, TimeUnit.SECONDS);

                System.out.printf("  poll value = %s (%f sec.)%n",
                        value, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("aaa");
    queue.put("bbb");
    queue.put("ccc");

    TimeUnit.SECONDS.sleep(8);

    System.out.println("future.cancel");
    future.cancel(true);
}

// 結果
// ↓
//put values (0.005171 sec.)
//  poll ...
//  poll value = aaa (0.007717 sec.)
//  poll ...
//  poll value = bbb (0.007882 sec.)
//  poll ...
//  poll value = ccc (0.008058 sec.)
//  poll ...
//  poll value = null (5.011975 sec.)
//  poll ...
//future.cancel
//  InterruptedException!

void put (E e)

指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。

// 基準となる時刻
final long current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
    System.out.printf("remainingCapacity = %d (%f sec.)%n",
            queue.remainingCapacity(), elapsedTime.getAsDouble());

    final var future = executor.submit(() -> {
        try {
            final var list = List.of("aaa", "bbb", "ccc", "ddd");
            for (final var value : list) {
                System.out.println("  put ...");
                queue.put(value);

                System.out.printf("  put OK! (%f sec.)%n", elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.printf("  InterruptedException! (%f sec.)%n", elapsedTime.getAsDouble());
        }
    });

    TimeUnit.SECONDS.sleep(5);

    System.out.printf("queue = %s (%f sec.)%n", queue, elapsedTime.getAsDouble());

    System.out.println("future.cancel");
    future.cancel(true);
}

// 結果
// ↓
//remainingCapacity = 3 (0.002237 sec.)
//  put ...
//  put OK! (0.003909 sec.)
//  put ...
//  put OK! (0.004022 sec.)
//  put ...
//  put OK! (0.004120 sec.)
//  put ...
//queue = [aaa, bbb, ccc] (5.014878 sec.)
//future.cancel
//  InterruptedException! (5.015400 sec.)

int remainingCapacity ()

理想的な状態(メモリーやリソースの制限がない状態)で、このキューがブロックせずに受け入れることができる追加要素の数を返します。組込み制限が存在しない場合はInteger.MAX_VALUEを返します。

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

System.out.println(queue); // []
System.out.println(queue.remainingCapacity()); // 3

System.out.println(queue.offer("aaa")); // true

System.out.println(queue); // [aaa]
System.out.println(queue.remainingCapacity()); // 2

System.out.println(queue.offer("bbb")); // true

System.out.println(queue); // [aaa, bbb]
System.out.println(queue.remainingCapacity()); // 1

System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(queue.remainingCapacity()); // 0

queue.clear();

System.out.println(queue); // []
System.out.println(queue.remainingCapacity()); // 3

boolean remove (Object o)

指定された要素の単一のインスタンスがこのキューに存在する場合は、キューから削除します。

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("BBB")); // true
System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("BBB")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, BBB, aaa, BBB, ccc]

System.out.println(queue.remove("aaa")); // true
System.out.println(queue); // [BBB, aaa, BBB, ccc]

System.out.println(queue.remove("BBB")); // true
System.out.println(queue); // [aaa, BBB, ccc]

System.out.println(queue.remove("BBB")); // true
System.out.println(queue); // [aaa, ccc]

System.out.println(queue.remove("XXX")); // false
System.out.println(queue); // [aaa, ccc]

E take ()

このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。

// 基準となる時刻
final long current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    final var future = executor.submit(() -> {
        try {
            while (true) {
                System.out.println("  take ...");
                final var value = queue.take();

                System.out.printf("  take OK! : value = %s (%f sec.)%n",
                        value, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("aaa");
    queue.put("bbb");
    queue.put("ccc");

    TimeUnit.SECONDS.sleep(5);

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("XXX");
    queue.put("YYY");
    queue.put("ZZZ");

    TimeUnit.SECONDS.sleep(5);

    System.out.println("future.cancel");
    future.cancel(true);
}

// 結果
// ↓
//put values (0.002361 sec.)
//  take ...
//  take OK! : value = aaa (0.003684 sec.)
//  take ...
//  take OK! : value = bbb (0.003801 sec.)
//  take ...
//  take OK! : value = ccc (0.003910 sec.)
//  take ...
//put values (5.008301 sec.)
//  take OK! : value = XXX (5.009003 sec.)
//  take ...
//  take OK! : value = YYY (5.009187 sec.)
//  take ...
//  take OK! : value = ZZZ (5.009332 sec.)
//  take ...
//future.cancel
//  InterruptedException!

Collectionで宣言されたメソッド

addAll, clear, containsAll, equals, hashCode, isEmpty, iterator, parallelStream, removeAll, removeIf, retainAll, size, spliterator, stream, toArray, toArray, toArray

Java API 使用例 : Collection」をご参照ください。

Iterableで宣言されたメソッド

forEach

Java API 使用例 : Iterable」をご参照ください。

Queueで宣言されたメソッド

element, peek, poll, remove

Java API 使用例 : Queue」をご参照ください。


関連記事

ページの先頭へ