Java: BlockingQueue with Examples

API examples for BlockingQueue (Java SE 20 & JDK 20).
This page provides code examples for most BlockingQueue methods.


Summary

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

final long current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    final var future = executor.submit(() -> {
        try {
            while (true) {
                System.out.println("  take ...");
                final var value = queue.take();

                System.out.printf("  take OK! : value = %s (%f sec.)%n",
                        value, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("aaa");
    queue.put("bbb");
    queue.put("ccc");

    TimeUnit.SECONDS.sleep(5);

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("XXX");
    queue.put("YYY");
    queue.put("ZZZ");

    TimeUnit.SECONDS.sleep(5);

    System.out.println("future.cancel");
    future.cancel(true);
}

// Result
// ↓
//put values (0.002361 sec.)
//  take ...
//  take OK! : value = aaa (0.003684 sec.)
//  take ...
//  take OK! : value = bbb (0.003801 sec.)
//  take ...
//  take OK! : value = ccc (0.003910 sec.)
//  take ...
//put values (5.008301 sec.)
//  take OK! : value = XXX (5.009003 sec.)
//  take ...
//  take OK! : value = YYY (5.009187 sec.)
//  take ...
//  take OK! : value = ZZZ (5.009332 sec.)
//  take ...
//future.cancel
//  InterruptedException!

Methods

boolean add (E e)

Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if no space is currently available.

// capacity = 3
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue); // []

System.out.println(queue.add("aaa")); // true
System.out.println(queue); // [aaa]

System.out.println(queue.add("bbb")); // true
System.out.println(queue); // [aaa, bbb]

System.out.println(queue.add("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]

try {
    queue.add("ddd");
} catch (IllegalStateException e) {
    System.out.println("IllegalStateException! : " + e.getMessage());
}

// Result
// ↓
//IllegalStateException! : Queue full

boolean contains (Object o)

Returns true if this queue contains the specified element.

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, bbb, ccc]

System.out.println(queue.contains("aaa")); // true
System.out.println(queue.contains("bbb")); // true
System.out.println(queue.contains("XXX")); // false

int drainTo (Collection<? super E> c)

Removes all available elements from this queue and adds them to the given collection.

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var c = new ArrayList<String>();

System.out.println(queue.offer("aaa")); // true

System.out.println(queue); // [aaa]
System.out.println(c); // []

System.out.println(queue.drainTo(c)); // 1

System.out.println(queue); // []
System.out.println(c); // [aaa]

System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [bbb, ccc]
System.out.println(c); // [aaa]

System.out.println(queue.drainTo(c)); // 2

System.out.println(queue); // []
System.out.println(c); // [aaa, bbb, ccc]

int drainTo (Collection<? super E> c, int maxElements)

Removes at most the given number of available elements from this queue and adds them to the given collection.

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
final var c = new ArrayList<String>();

System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("bbb")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(c); // []

System.out.println(queue.drainTo(c, 10)); // 3

System.out.println(queue); // []
System.out.println(c); // [aaa, bbb, ccc]

System.out.println(queue.offer("XXX")); // true
System.out.println(queue.offer("YYY")); // true
System.out.println(queue.offer("ZZZ")); // true

System.out.println(queue); // [XXX, YYY, ZZZ]
System.out.println(c); // [aaa, bbb, ccc]

System.out.println(queue.drainTo(c, 2)); // 2

System.out.println(queue); // [ZZZ]
System.out.println(c); // [aaa, bbb, ccc, XXX, YYY]
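
Because drainTo never waits, it returns 0 when the queue is empty. A consumer that wants to process elements in batches can combine it with take: block for the first element, then drain whatever else is already available. The following is a minimal sketch of that idea, not part of the BlockingQueue API; the class name BatchDrainDemo and the helper nextBatch are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BatchDrainDemo {

    // Hypothetical helper: waits until at least one element is available,
    // then collects up to maxBatch elements in total without further waiting.
    static <E> List<E> nextBatch(BlockingQueue<E> queue, int maxBatch)
            throws InterruptedException {
        final List<E> batch = new ArrayList<>();
        batch.add(queue.take());            // blocks for the first element
        queue.drainTo(batch, maxBatch - 1); // takes only what is already there
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        for (final String s : List.of("a", "b", "c", "d", "e")) {
            queue.put(s);
        }

        System.out.println(nextBatch(queue, 3)); // [a, b, c]
        System.out.println(nextBatch(queue, 3)); // [d, e]
        System.out.println(queue); // []
    }
}
```

Draining in bulk takes the queue's lock once per batch instead of once per element, which is the usual reason to prefer it over a loop of poll calls.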

boolean offer (E e)

Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently available.

// capacity = 3
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue); // []

System.out.println(queue.offer("aaa")); // true
System.out.println(queue); // [aaa]

System.out.println(queue.offer("bbb")); // true
System.out.println(queue); // [aaa, bbb]

System.out.println(queue.offer("ccc")); // true
System.out.println(queue); // [aaa, bbb, ccc]

System.out.println(queue.offer("ddd")); // false
System.out.println(queue); // [aaa, bbb, ccc]
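
A false return value from offer only signals a lack of space. A null argument is a separate case: the BlockingQueue documentation states that null elements are not accepted and that add, put and offer throw NullPointerException for them, since null is reserved as the "nothing available" return value of poll. The sketch below shows this with ArrayBlockingQueue; the class name NullOfferDemo and the helper rejectsNull are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class NullOfferDemo {

    // Hypothetical helper: returns true if offering null was rejected
    // with a NullPointerException.
    static boolean rejectsNull(BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        System.out.println(rejectsNull(queue)); // true
        System.out.println(queue); // []

        // poll() on an empty queue returns null instead of throwing.
        System.out.println(queue.poll()); // null
    }
}
```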

boolean offer (E e, long timeout, TimeUnit unit)

Inserts the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.

See also: offer(E e)

final long current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
    System.out.printf("remainingCapacity = %d (%f sec.)%n",
            queue.remainingCapacity(), elapsedTime.getAsDouble());

    executor.submit(() -> {
        try {
            final var list = List.of("aaa", "bbb", "ccc", "ddd");
            for (final var value : list) {
                System.out.println("  offer ...");
                final var ret = queue.offer(value, 5, TimeUnit.SECONDS);

                System.out.printf("  offer ret = %b (%f sec.)%n",
                        ret, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    TimeUnit.SECONDS.sleep(10);

    System.out.printf("queue = %s (%f sec.)%n", queue, elapsedTime.getAsDouble());
}

// Result
// ↓
//remainingCapacity = 3 (0.002475 sec.)
//  offer ...
//  offer ret = true (0.004959 sec.)
//  offer ...
//  offer ret = true (0.005337 sec.)
//  offer ...
//  offer ret = true (0.005599 sec.)
//  offer ...
//  offer ret = false (5.007736 sec.)
//queue = [aaa, bbb, ccc] (10.013658 sec.)
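
In the example above nothing ever removes an element, so the fourth offer times out. The complementary case, where space becomes available before the timeout expires, can be sketched as follows. The class name TimedOfferDemo, the method name and the 200 millisecond delay are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {

    // Fills a queue of capacity 1, then offers a second element with a timeout
    // while a helper thread removes the first element after a short delay.
    static boolean offerWhileConsumerFreesSpace() throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("aaa"); // the queue is now full

        final Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // assumed delay
                queue.take(); // frees one slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Waits for space; the slot is freed long before the 5 second limit.
        final boolean accepted = queue.offer("bbb", 5, TimeUnit.SECONDS);
        consumer.join();
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWhileConsumerFreesSpace()); // true
    }
}
```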

E poll (long timeout, TimeUnit unit)

Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.

final long current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    final var future = executor.submit(() -> {
        try {
            while (true) {
                System.out.println("  poll ...");
                final var value = queue.poll(5, TimeUnit.SECONDS);

                System.out.printf("  poll value = %s (%f sec.)%n",
                        value, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("aaa");
    queue.put("bbb");
    queue.put("ccc");

    TimeUnit.SECONDS.sleep(8);

    System.out.println("future.cancel");
    future.cancel(true);
}

// Result
// ↓
//put values (0.005171 sec.)
//  poll ...
//  poll value = aaa (0.007717 sec.)
//  poll ...
//  poll value = bbb (0.007882 sec.)
//  poll ...
//  poll value = ccc (0.008058 sec.)
//  poll ...
//  poll value = null (5.011975 sec.)
//  poll ...
//future.cancel
//  InterruptedException!
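
Since the timed poll returns null when the wait time elapses, a consumer can use that null as an "idle" signal and stop on its own instead of being cancelled. A minimal sketch under that assumption follows; the class name IdleTimeoutDemo, the helper consumeUntilIdle and the 100 millisecond idle limit are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class IdleTimeoutDemo {

    // Hypothetical helper: consumes elements until none arrives
    // within idleMillis, then returns everything consumed so far.
    static List<String> consumeUntilIdle(BlockingQueue<String> queue, long idleMillis)
            throws InterruptedException {
        final List<String> consumed = new ArrayList<>();
        String value;
        while ((value = queue.poll(idleMillis, TimeUnit.MILLISECONDS)) != null) {
            consumed.add(value);
        }
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        queue.put("aaa");
        queue.put("bbb");
        queue.put("ccc");

        System.out.println(consumeUntilIdle(queue, 100)); // [aaa, bbb, ccc]
        System.out.println(queue); // []
    }
}
```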

void put (E e)

Inserts the specified element into this queue, waiting if necessary for space to become available.

final long current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
    System.out.printf("remainingCapacity = %d (%f sec.)%n",
            queue.remainingCapacity(), elapsedTime.getAsDouble());

    final var future = executor.submit(() -> {
        try {
            final var list = List.of("aaa", "bbb", "ccc", "ddd");
            for (final var value : list) {
                System.out.println("  put ...");
                queue.put(value);

                System.out.printf("  put OK! (%f sec.)%n", elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.printf("  InterruptedException! (%f sec.)%n", elapsedTime.getAsDouble());
        }
    });

    TimeUnit.SECONDS.sleep(5);

    System.out.printf("queue = %s (%f sec.)%n", queue, elapsedTime.getAsDouble());

    System.out.println("future.cancel");
    future.cancel(true);
}

// Result
// ↓
//remainingCapacity = 3 (0.002237 sec.)
//  put ...
//  put OK! (0.003909 sec.)
//  put ...
//  put OK! (0.004022 sec.)
//  put ...
//  put OK! (0.004120 sec.)
//  put ...
//queue = [aaa, bbb, ccc] (5.014878 sec.)
//future.cancel
//  InterruptedException! (5.015400 sec.)

int remainingCapacity ()

Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

System.out.println(queue); // []
System.out.println(queue.remainingCapacity()); // 3

System.out.println(queue.offer("aaa")); // true

System.out.println(queue); // [aaa]
System.out.println(queue.remainingCapacity()); // 2

System.out.println(queue.offer("bbb")); // true

System.out.println(queue); // [aaa, bbb]
System.out.println(queue.remainingCapacity()); // 1

System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, bbb, ccc]
System.out.println(queue.remainingCapacity()); // 0

queue.clear();

System.out.println(queue); // []
System.out.println(queue.remainingCapacity()); // 3
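
The example above uses a bounded queue. To illustrate the Integer.MAX_VALUE case from the description, the sketch below uses PriorityBlockingQueue, which is documented as not capacity constrained and always reports Integer.MAX_VALUE. The class name UnboundedCapacityDemo and the helper reportsNoLimit are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class UnboundedCapacityDemo {

    // Hypothetical helper: true if the queue currently reports
    // Integer.MAX_VALUE as its remaining capacity.
    static boolean reportsNoLimit(BlockingQueue<?> queue) {
        return queue.remainingCapacity() == Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        final BlockingQueue<String> unbounded = new PriorityBlockingQueue<>();
        System.out.println(reportsNoLimit(unbounded)); // true

        unbounded.offer("aaa");
        System.out.println(reportsNoLimit(unbounded)); // true

        final BlockingQueue<String> bounded = new ArrayBlockingQueue<>(3);
        System.out.println(reportsNoLimit(bounded)); // false
    }
}
```

Note that with several threads the value is only a snapshot: another thread may insert or remove an element right after the call, so a positive remainingCapacity does not guarantee that the next insertion succeeds.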

boolean remove (Object o)

Removes a single instance of the specified element from this queue, if it is present.

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("BBB")); // true
System.out.println(queue.offer("aaa")); // true
System.out.println(queue.offer("BBB")); // true
System.out.println(queue.offer("ccc")); // true

System.out.println(queue); // [aaa, BBB, aaa, BBB, ccc]

System.out.println(queue.remove("aaa")); // true
System.out.println(queue); // [BBB, aaa, BBB, ccc]

System.out.println(queue.remove("BBB")); // true
System.out.println(queue); // [aaa, BBB, ccc]

System.out.println(queue.remove("BBB")); // true
System.out.println(queue); // [aaa, ccc]

System.out.println(queue.remove("XXX")); // false
System.out.println(queue); // [aaa, ccc]

E take ()

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

final long current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

try (final var executor = Executors.newSingleThreadExecutor()) {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    final var future = executor.submit(() -> {
        try {
            while (true) {
                System.out.println("  take ...");
                final var value = queue.take();

                System.out.printf("  take OK! : value = %s (%f sec.)%n",
                        value, elapsedTime.getAsDouble());
            }
        } catch (InterruptedException e) {
            System.out.println("  InterruptedException!");
        }
    });

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("aaa");
    queue.put("bbb");
    queue.put("ccc");

    TimeUnit.SECONDS.sleep(5);

    System.out.printf("put values (%f sec.)%n", elapsedTime.getAsDouble());

    queue.put("XXX");
    queue.put("YYY");
    queue.put("ZZZ");

    TimeUnit.SECONDS.sleep(5);

    System.out.println("future.cancel");
    future.cancel(true);
}

// Result
// ↓
//put values (0.002361 sec.)
//  take ...
//  take OK! : value = aaa (0.003684 sec.)
//  take ...
//  take OK! : value = bbb (0.003801 sec.)
//  take ...
//  take OK! : value = ccc (0.003910 sec.)
//  take ...
//put values (5.008301 sec.)
//  take OK! : value = XXX (5.009003 sec.)
//  take ...
//  take OK! : value = YYY (5.009187 sec.)
//  take ...
//  take OK! : value = ZZZ (5.009332 sec.)
//  take ...
//future.cancel
//  InterruptedException!
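
The example above stops the consumer by cancelling its Future, which interrupts the blocked take. BlockingQueue has no built-in "close" operation; the API documentation mentions that producers can instead insert a special end-of-stream ("poison") object that the consumer interprets as a stop signal. The following is a minimal sketch of that tactic, assuming a single producer and a single consumer; the class name PoisonPillDemo, the POISON marker and the helper consumeUntilPoison are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {

    // Hypothetical end-of-stream marker. It is a distinct String instance and
    // is compared by identity (==), so it cannot collide with real data.
    static final String POISON = new String("<end>");

    // Takes elements until the marker arrives and returns the real values.
    static List<String> consumeUntilPoison(BlockingQueue<String> queue)
            throws InterruptedException {
        final List<String> consumed = new ArrayList<>();
        while (true) {
            final String value = queue.take();
            if (value == POISON) {
                return consumed;
            }
            consumed.add(value);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        final Thread producer = new Thread(() -> {
            try {
                for (final String s : List.of("aaa", "bbb", "ccc")) {
                    queue.put(s); // may wait, the capacity is only 2
                }
                queue.put(POISON); // signals the end of the stream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        System.out.println(consumeUntilPoison(queue)); // [aaa, bbb, ccc]
        producer.join();
    }
}
```

With several consumers, each of them would need its own marker, so this sketch does not carry over unchanged.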

Methods declared in Collection

addAll, clear, containsAll, equals, hashCode, isEmpty, iterator, parallelStream, removeAll, removeIf, retainAll, size, spliterator, stream, toArray, toArray, toArray

See the Collection API documentation for details on these methods.

Methods declared in Iterable

forEach

See the Iterable API documentation for details on this method.

Methods declared in Queue

element, peek, poll, remove

See the Queue API documentation for details on these methods.
