Java : BlockingQueue (ブロッキング・キュー) - API使用例
BlockingQueue (Java SE 20 & JDK 20) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様のおともにどうぞ。
概要
BlockingQueue インタフェースは、
- 要素を取り出すときに、もし空だったら要素が追加されるまで待機
という操作が可能な Queue です。
// 基準となる時刻
final long current = System.nanoTime();
// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;
try (final var executor = Executors.newSingleThreadExecutor()) {
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var future = executor.submit(() -> {
try {
while (true) {
System.out.println(" take ...");
final var value = queue.take();
System.out.printf(" take OK! : value = %s (%f sec.)%n",
value, elapsedTime.getAsDouble());
}
} catch (InterruptedException e) {
System.out.println(" InterruptedException!");
}
});
System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());
queue.put("aaa");
queue.put("bbb");
queue.put("ccc");
TimeUnit.SECONDS.sleep(5);
System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());
queue.put("XXX");
queue.put("YYY");
queue.put("ZZZ");
TimeUnit.SECONDS.sleep(5);
System.out.println("future.cancel");
future.cancel(true);
}
// 結果
// ↓
//put values (0.002361 sec.)
// take ...
// take OK! : value = aaa (0.003684 sec.)
// take ...
// take OK! : value = bbb (0.003801 sec.)
// take ...
// take OK! : value = ccc (0.003910 sec.)
// take ...
//put values (5.008301 sec.)
// take OK! : value = XXX (5.009003 sec.)
// take ...
// take OK! : value = YYY (5.009187 sec.)
// take ...
// take OK! : value = ZZZ (5.009332 sec.)
// take ...
//future.cancel
// InterruptedException!
メソッド
boolean add (E e)
// capacity = 3
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue); // []
System.out.println(queue.add("aaa")); // true
System.out.println(queue); // [aaa]
System.out.println(queue.add("bbb")); // true
System.out.println(queue); // [aaa, bbb]
System.out.println(queue.add("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]
try {
queue.add("ddd");
} catch (IllegalStateException e) {
System.out.println("IllegalStateException! : " + e.getMessage());
}
// 結果
// ↓
//IllegalStateException! : Queue full
boolean contains (Object o)
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(queue.contains("aaa")); // true
System.out.println(queue.contains("bbb")); // true
System.out.println(queue.contains("XXX")); // false
int drainTo (Collection<? super E> c)
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var c = new ArrayList<String>();
System.out.println(queue.offer("aaa")); // true
System.out.println(queue); // [aaa]
System.out.println(c); // []
System.out.println(queue.drainTo(c)); // 1
System.out.println(queue); // []
System.out.println(c); // [aaa]
System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [bbb, ccc]
System.out.println(c); // [aaa]
System.out.println(queue.drainTo(c)); // 2
System.out.println(queue); // []
System.out.println(c); // [aaa, bbb, ccc]
int drainTo (Collection<? super E> c, int maxElements)
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var c = new ArrayList<String>();
System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(c); // []
System.out.println(queue.drainTo(c, 10)); // 3
System.out.println(queue); // []
System.out.println(c); // [aaa, bbb, ccc]
System.out.println(queue.offer("XXX")); // true
System.out.println(queue.offer("YYY")); // true
System.out.println(queue.offer("ZZZ")); // true
System.out.println(queue); // [XXX, YYY, ZZZ]
System.out.println(c); // [aaa, bbb, ccc]
System.out.println(queue.drainTo(c, 2)); // 2
System.out.println(queue); // [ZZZ]
System.out.println(c); // [aaa, bbb, ccc, XXX, YYY]
boolean offer (E e)
// capacity = 3
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue); // []
System.out.println(queue.offer("aaa")); // true
System.out.println(queue); // [aaa]
System.out.println(queue.offer("bbb")); // true
System.out.println(queue); // [aaa, bbb]
System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(queue.offer("ddd")); // false
System.out.println(queue); // [aaa, bbb, ccc]
boolean offer (E e, long timeout, TimeUnit unit)
関連:offer(E e)
// 基準となる時刻
final long current = System.nanoTime();
// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;
try (final var executor = Executors.newSingleThreadExecutor()) {
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.printf("remainingCapacity = %d (%f sec.)%n",
queue.remainingCapacity(), elapsedTime.getAsDouble());
executor.submit(() -> {
try {
final var list = List.of("aaa", "bbb", "ccc", "ddd");
for (final var value : list) {
System.out.println(" offer ...");
final var ret = queue.offer(value, 5, TimeUnit.SECONDS);
System.out.printf(" offer ret = %b (%f sec.)%n",
ret, elapsedTime.getAsDouble());
}
} catch (InterruptedException e) {
System.out.println(" InterruptedException!");
}
});
TimeUnit.SECONDS.sleep(10);
System.out.printf("queue = %s (%f sec.)%n", queue, elapsedTime.getAsDouble());
}
// 結果
// ↓
//remainingCapacity = 3 (0.002475 sec.)
// offer ...
// offer ret = true (0.004959 sec.)
// offer ...
// offer ret = true (0.005337 sec.)
// offer ...
// offer ret = true (0.005599 sec.)
// offer ...
// offer ret = false (5.007736 sec.)
//queue = [aaa, bbb, ccc] (10.013658 sec.)
E poll (long timeout, TimeUnit unit)
// 基準となる時刻
final long current = System.nanoTime();
// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;
try (final var executor = Executors.newSingleThreadExecutor()) {
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var future = executor.submit(() -> {
try {
while (true) {
System.out.println(" poll ...");
final var value = queue.poll(5, TimeUnit.SECONDS);
System.out.printf(" poll value = %s (%f sec.)%n",
value, elapsedTime.getAsDouble());
}
} catch (InterruptedException e) {
System.out.println(" InterruptedException!");
}
});
System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());
queue.put("aaa");
queue.put("bbb");
queue.put("ccc");
TimeUnit.SECONDS.sleep(8);
System.out.println("future.cancel");
future.cancel(true);
}
// 結果
// ↓
//put values (0.005171 sec.)
// poll ...
// poll value = aaa (0.007717 sec.)
// poll ...
// poll value = bbb (0.007882 sec.)
// poll ...
// poll value = ccc (0.008058 sec.)
// poll ...
// poll value = null (5.011975 sec.)
// poll ...
//future.cancel
// InterruptedException!
void put (E e)
// 基準となる時刻
final long current = System.nanoTime();
// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;
try (final var executor = Executors.newSingleThreadExecutor()) {
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.printf("remainingCapacity = %d (%f sec.)%n",
queue.remainingCapacity(), elapsedTime.getAsDouble());
final var future = executor.submit(() -> {
try {
final var list = List.of("aaa", "bbb", "ccc", "ddd");
for (final var value : list) {
System.out.println(" put ...");
queue.put(value);
System.out.printf(" put OK! (%f sec.)%n", elapsedTime.getAsDouble());
}
} catch (InterruptedException e) {
System.out.printf(" InterruptedException! (%f sec.)%n", elapsedTime.getAsDouble());
}
});
TimeUnit.SECONDS.sleep(5);
System.out.printf("queue = %s (%f sec.)%n", queue, elapsedTime.getAsDouble());
System.out.println("future.cancel");
future.cancel(true);
}
// 結果
// ↓
//remainingCapacity = 3 (0.002237 sec.)
// put ...
// put OK! (0.003909 sec.)
// put ...
// put OK! (0.004022 sec.)
// put ...
// put OK! (0.004120 sec.)
// put ...
//queue = [aaa, bbb, ccc] (5.014878 sec.)
//future.cancel
// InterruptedException! (5.015400 sec.)
int remainingCapacity ()
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue); // []
System.out.println(queue.remainingCapacity()); // 3
System.out.println(queue.offer("aaa")); // true
System.out.println(queue); // [aaa]
System.out.println(queue.remainingCapacity()); // 2
System.out.println(queue.offer("bbb")); // true
System.out.println(queue); // [aaa, bbb]
System.out.println(queue.remainingCapacity()); // 1
System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(queue.remainingCapacity()); // 0
queue.clear();
System.out.println(queue); // []
System.out.println(queue.remainingCapacity()); // 3
boolean remove (Object o)
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("BBB")); // true
System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("BBB")); // true
System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [aaa, BBB, aaa, BBB, ccc]
System.out.println(queue.remove("aaa")); // true
System.out.println(queue); // [BBB, aaa, BBB, ccc]
System.out.println(queue.remove("BBB")); // true
System.out.println(queue); // [aaa, BBB, ccc]
System.out.println(queue.remove("BBB")); // true
System.out.println(queue); // [aaa, ccc]
System.out.println(queue.remove("XXX")); // false
System.out.println(queue); // [aaa, ccc]
E take ()
// 基準となる時刻
final long current = System.nanoTime();
// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;
try (final var executor = Executors.newSingleThreadExecutor()) {
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var future = executor.submit(() -> {
try {
while (true) {
System.out.println(" take ...");
final var value = queue.take();
System.out.printf(" take OK! : value = %s (%f sec.)%n",
value, elapsedTime.getAsDouble());
}
} catch (InterruptedException e) {
System.out.println(" InterruptedException!");
}
});
System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());
queue.put("aaa");
queue.put("bbb");
queue.put("ccc");
TimeUnit.SECONDS.sleep(5);
System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());
queue.put("XXX");
queue.put("YYY");
queue.put("ZZZ");
TimeUnit.SECONDS.sleep(5);
System.out.println("future.cancel");
future.cancel(true);
}
// 結果
// ↓
//put values (0.002361 sec.)
// take ...
// take OK! : value = aaa (0.003684 sec.)
// take ...
// take OK! : value = bbb (0.003801 sec.)
// take ...
// take OK! : value = ccc (0.003910 sec.)
// take ...
//put values (5.008301 sec.)
// take OK! : value = XXX (5.009003 sec.)
// take ...
// take OK! : value = YYY (5.009187 sec.)
// take ...
// take OK! : value = ZZZ (5.009332 sec.)
// take ...
//future.cancel
// InterruptedException!
Collectionで宣言されたメソッド
addAll, clear, containsAll, equals, hashCode, isEmpty, iterator, parallelStream, removeAll, removeIf, retainAll, size, spliterator, stream, toArray, toArray, toArray
「Java API 使用例 : Collection」をご参照ください。
Iterableで宣言されたメソッド
forEach
「Java API 使用例 : Iterable」をご参照ください。
Queueで宣言されたメソッド
element, peek, poll, remove
「Java API 使用例 : Queue」をご参照ください。
関連記事
- API 使用例
- BlockingQueue (ブロッキング・キュー)
- Callable
- CancellationException
- ConcurrentHashMap.KeySetView (並列処理用セット)
- ConcurrentLinkedDeque (並列処理用・両端キュー)
- ConcurrentLinkedQueue (並列処理用キュー)
- ConcurrentMap (並列処理用マップ)
- ConcurrentModificationException (並列処理例外)
- ConcurrentSkipListSet (並列処理用セット)
- Condition (同期)
- CopyOnWriteArrayList (並列処理用リスト)
- CopyOnWriteArraySet (並列処理用セット)
- CountDownLatch (同期)
- CyclicBarrier (同期)
- Exchanger (同期)
- ExecutionException
- Executor
- ExecutorService
- Executors
- Future
- Future.State
- FutureTask
- InterruptedException (割込み例外)
- Lock (同期)
- Object (オブジェクト)
- Runnable
- Semaphore (セマフォ)
- Thread (スレッド)
- ThreadGroup
- ThreadLocal
- TimeUnit