Java : CyclicBarrier with Examples

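CyclicBarrier (Java SE 19 & JDK 19) API examples.
This page gives code examples for most CyclicBarrier methods. The snippets are method bodies: they assume imports from java.util.concurrent and java.util.function.DoubleSupplier, an enclosing method that declares throws InterruptedException, and JDK 19 or later, where ExecutorService can be used in a try-with-resources statement.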


Summary

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed-size party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be reused after the waiting threads are released.
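
The reuse mentioned above can be sketched as follows. This is a minimal illustration, not part of the API documentation: the class name BarrierReuseDemo, the method runRounds, and the worker and round counts are all hypothetical. Every worker calls await() on the same barrier once per round, and the barrier action counts how often the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and counts are illustrative only.
class BarrierReuseDemo {

    // Runs `workers` threads through `rounds` phases on one shared barrier
    // and returns how many times the barrier tripped.
    static int runRounds(int workers, int rounds) throws InterruptedException {
        final var trips = new AtomicInteger();
        // The barrier action runs once per trip, before the waiting threads are released.
        final var barrier = new CyclicBarrier(workers, trips::incrementAndGet);

        final var threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        // The same barrier instance is used in every round.
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Give up on interruption or a broken barrier (not expected here).
                }
            });
            threads[i].start();
        }
        for (final var thread : threads) {
            thread.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("trips = " + runRounds(3, 4));
    }
}
```

With three workers and four rounds, the barrier trips four times without being re-created.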

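The following example submits three tasks two seconds apart. The barrier action runs, and all three await() calls return, only once the third task arrives.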

final var current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final Runnable action = () -> {
    System.out.printf("Action! (%f sec.)%n", elapsedTime.getAsDouble());
};

final var barrier = new CyclicBarrier(3, action);
System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(2);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start : waiting = %d (%f sec.)%n",
                        id, barrier.getNumberWaiting(), elapsedTime.getAsDouble());

                final var ret = barrier.await();
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// Result
// ↓
//parties = 3 (0.001453 sec.)
//1 : task start : waiting = 0 (2.006531 sec.)
//2 : task start : waiting = 1 (4.011152 sec.)
//3 : task start : waiting = 2 (6.013911 sec.)
//Action! (6.014346 sec.)
//3 : await ret = 0 (6.014478 sec.)
//2 : await ret = 1 (6.014531 sec.)
//1 : await ret = 2 (6.014516 sec.)

Constructors

CyclicBarrier (int parties)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and does not perform a predefined action when the barrier is tripped.

final var current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final var barrier = new CyclicBarrier(3);
System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(2);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start : waiting = %d (%f sec.)%n",
                        id, barrier.getNumberWaiting(), elapsedTime.getAsDouble());

                final var ret = barrier.await();
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// Result
// ↓
//parties = 3 (0.000953 sec.)
//1 : task start : waiting = 0 (2.014584 sec.)
//2 : task start : waiting = 1 (4.022366 sec.)
//3 : task start : waiting = 2 (6.033646 sec.)
//3 : await ret = 0 (6.034173 sec.)
//2 : await ret = 1 (6.034236 sec.)
//1 : await ret = 2 (6.034214 sec.)

CyclicBarrier (int parties, Runnable barrierAction)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.

Please see await().
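
To illustrate "performed by the last thread entering the barrier", here is a small sketch. The class BarrierActionThreadDemo and the thread name "early-arriver" are made up for this example. The caller waits until the other thread is already parked at the barrier, so the caller is guaranteed to arrive last and should be the thread that runs the action.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; names are illustrative only.
class BarrierActionThreadDemo {

    // Returns the name of the thread that ran the barrier action.
    static String actionThreadName() throws InterruptedException, BrokenBarrierException {
        final var actionThread = new AtomicReference<String>();
        final var barrier = new CyclicBarrier(2,
                () -> actionThread.set(Thread.currentThread().getName()));

        final var early = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // Not expected in this sketch.
            }
        }, "early-arriver");
        early.start();

        // Poll until the other thread is waiting, so the caller arrives last.
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(10);
        }
        barrier.await();
        early.join();
        return actionThread.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller        = " + Thread.currentThread().getName());
        System.out.println("action ran on = " + actionThreadName());
    }
}
```

Both lines print the same thread name, because the action runs on the calling thread rather than on "early-arriver".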

Methods

int await ()

Waits until all parties have invoked await on this barrier.

final var current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final Runnable action = () -> {
    System.out.printf("Action! (%f sec.)%n", elapsedTime.getAsDouble());
};

final var barrier = new CyclicBarrier(3, action);
System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(2);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start : waiting = %d (%f sec.)%n",
                        id, barrier.getNumberWaiting(), elapsedTime.getAsDouble());

                final var ret = barrier.await();
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// Result
// ↓
//parties = 3 (0.001453 sec.)
//1 : task start : waiting = 0 (2.006531 sec.)
//2 : task start : waiting = 1 (4.011152 sec.)
//3 : task start : waiting = 2 (6.013911 sec.)
//Action! (6.014346 sec.)
//3 : await ret = 0 (6.014478 sec.)
//2 : await ret = 1 (6.014531 sec.)
//1 : await ret = 2 (6.014516 sec.)
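
When a waiting thread is interrupted, its await() throws InterruptedException, the barrier is broken, and the other waiting threads get BrokenBarrierException. The following example cancels task A to show this.

final var barrier = new CyclicBarrier(3);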
System.out.println("parties = " + barrier.getParties());

try (final var executor = Executors.newFixedThreadPool(3)) {

    final var futureA = executor.submit(() -> {
        try {
            System.out.println("A : task start");
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("A : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    executor.submit(() -> {
        try {
            System.out.println("B : task start");
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("B : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    System.out.println("task A cancel!");

    final var ret = futureA.cancel(true);
    System.out.println("cancelled = " + ret);
}

// Result
// ↓
//parties = 3
//A : task start
//B : task start
//task A cancel!
//A : InterruptedException
//B : BrokenBarrierException
//cancelled = true
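
The value returned by await() is the arrival index: getParties() - 1 for the first thread to arrive and 0 for the last. One common use is to let exactly one thread do some once-per-trip work. The sketch below assumes hypothetical names (ArrivalIndexDemo, Result, run) and simply counts how many threads see index 0.

```java
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
class ArrivalIndexDemo {

    // The arrival indexes seen by the threads, and how many threads saw index 0.
    record Result(Set<Integer> indexes, int elected) {}

    static Result run(int parties) throws InterruptedException {
        final var barrier = new CyclicBarrier(parties);
        final Set<Integer> indexes = new ConcurrentSkipListSet<>();
        final var elected = new AtomicInteger();

        final var threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    final int index = barrier.await();
                    indexes.add(index);
                    if (index == 0) {
                        // Only the last thread to arrive reaches this branch.
                        elected.incrementAndGet();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Not expected in this sketch.
                }
            });
            threads[i].start();
        }
        for (final var thread : threads) {
            thread.join();
        }
        return new Result(indexes, elected.get());
    }

    public static void main(String[] args) throws InterruptedException {
        final var result = run(3);
        System.out.println("indexes = " + result.indexes());
        System.out.println("elected = " + result.elected());
    }
}
```

With three parties the indexes are 0, 1 and 2, and exactly one thread is elected per trip.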

int await (long timeout, TimeUnit unit)

Waits until all parties have invoked await on this barrier, or the specified waiting time elapses.

Please see also await().

final var current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final Runnable action = () -> {
    System.out.printf("Action! (%f sec.)%n", elapsedTime.getAsDouble());
};

final var barrier = new CyclicBarrier(3, action);
final var timeout = 5;

System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());
System.out.println("timeout = " + timeout + " sec.");
System.out.println("isBroken = " + barrier.isBroken());

try (final var executor = Executors.newFixedThreadPool(3)) {
    // Only two of the three parties arrive, so the barrier never trips and await times out.
    for (int i = 1; i <= 2; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start (%f sec.)%n", id, elapsedTime.getAsDouble());

                final var ret = barrier.await(timeout, TimeUnit.SECONDS);
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                System.out.printf("%d : %s : isBroken = %b (%f sec.)%n",
                        id, e.getClass().getSimpleName(), barrier.isBroken(), elapsedTime.getAsDouble());
            }
        });
    }
}

// Result
// ↓
//parties = 3 (0.001176 sec.)
//timeout = 5 sec.
//isBroken = false
//1 : task start (0.117036 sec.)
//2 : task start (0.224736 sec.)
//1 : TimeoutException : isBroken = true (5.127195 sec.)
//2 : BrokenBarrierException : isBroken = true (5.127217 sec.)

int getNumberWaiting ()

Returns the number of parties currently waiting at the barrier.

Please see await().
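
As a compact, self-contained illustration of getNumberWaiting(), the sketch below reads the value just before and just after the barrier trips. The class NumberWaitingDemo and its method name are hypothetical. Two helper threads wait first, then the caller arrives as the third party.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; names are illustrative only.
class NumberWaitingDemo {

    // Returns {waiting just before the trip, waiting just after the trip}.
    static int[] waitingBeforeAndAfterTrip() throws InterruptedException, BrokenBarrierException {
        final var barrier = new CyclicBarrier(3);

        final var waiters = new Thread[2];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Not expected in this sketch.
                }
            });
            waiters[i].start();
        }

        // Poll until both helper threads are parked at the barrier.
        while (barrier.getNumberWaiting() < 2) {
            Thread.sleep(10);
        }
        final int before = barrier.getNumberWaiting();

        barrier.await(); // third party: trips the barrier
        final int after = barrier.getNumberWaiting();

        for (final var waiter : waiters) {
            waiter.join();
        }
        return new int[] {before, after};
    }

    public static void main(String[] args) throws Exception {
        final var counts = waitingBeforeAndAfterTrip();
        System.out.println("before = " + counts[0] + ", after = " + counts[1]);
    }
}
```

The count is 2 before the caller arrives and drops back to 0 once the barrier has tripped and started a new generation.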

int getParties ()

Returns the number of parties required to trip this barrier.

final var barrier = new CyclicBarrier(3);
System.out.println("parties = " + barrier.getParties());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(1);

        executor.submit(() -> {
            try {
                System.out.println(id + " : task start : parties = " + barrier.getParties());

                final var ret = barrier.await();
                System.out.println(id + " : await ret = " + ret);
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// Result
// ↓
//parties = 3
//1 : task start : parties = 3
//2 : task start : parties = 3
//3 : task start : parties = 3
//3 : await ret = 0
//2 : await ret = 1
//1 : await ret = 2

boolean isBroken ()

Queries if this barrier is in a broken state.

Please see await(long timeout, TimeUnit unit).
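
A single-threaded sketch of the broken state, using hypothetical names (BrokenStateDemo, timedOutBarrier): a lone thread times out at a two-party barrier, which breaks the barrier. Any later await() then fails immediately with BrokenBarrierException instead of waiting.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names and the 50 ms timeout are illustrative only.
class BrokenStateDemo {

    // Returns a two-party barrier on which a lone thread has already timed out.
    static CyclicBarrier timedOutBarrier() throws InterruptedException {
        final var barrier = new CyclicBarrier(2);
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | BrokenBarrierException e) {
            // TimeoutException is expected: no second party ever arrives.
        }
        return barrier;
    }

    public static void main(String[] args) throws InterruptedException {
        final var barrier = timedOutBarrier();
        System.out.println("isBroken = " + barrier.isBroken());

        try {
            barrier.await(); // does not block: the barrier is already broken
        } catch (BrokenBarrierException e) {
            System.out.println(e.getClass().getSimpleName());
        }
    }
}
```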

void reset ()

Resets the barrier to its initial state.

final var barrier = new CyclicBarrier(3);
System.out.println("parties = " + barrier.getParties());

try (final var executor = Executors.newFixedThreadPool(3)) {

    executor.submit(() -> {
        try {
            System.out.println("A : task start : waiting = " + barrier.getNumberWaiting());
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("A : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    executor.submit(() -> {
        try {
            System.out.println("B : task start : waiting = " + barrier.getNumberWaiting());
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("B : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    System.out.println("reset!");
    barrier.reset();
}

// Result
// ↓
//parties = 3
//A : task start : waiting = 0
//B : task start : waiting = 1
//reset!
//A : BrokenBarrierException
//B : BrokenBarrierException
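
reset() also clears a broken state, after which the barrier can be used again. The following sketch assumes hypothetical names (ResetDemo, States, run): it breaks a two-party barrier with a timeout, resets it, and then lets a helper thread and the caller meet at it.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names and timeouts are illustrative only.
class ResetDemo {

    record States(boolean brokenBeforeReset, boolean brokenAfterReset, boolean trippedAfterReset) {}

    static States run() throws InterruptedException {
        final var barrier = new CyclicBarrier(2);

        // Break the barrier: a lone thread times out waiting for a second party.
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | BrokenBarrierException e) {
            // TimeoutException is expected here.
        }
        final boolean brokenBeforeReset = barrier.isBroken();

        barrier.reset();
        final boolean brokenAfterReset = barrier.isBroken();

        // The barrier works again: a helper thread and the caller meet at it.
        final var helper = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // Not expected in this sketch.
            }
        });
        helper.start();

        boolean tripped;
        try {
            barrier.await(5, TimeUnit.SECONDS);
            tripped = true;
        } catch (TimeoutException | BrokenBarrierException e) {
            tripped = false;
        }
        helper.join();
        return new States(brokenBeforeReset, brokenAfterReset, tripped);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The barrier reports broken before reset(), not broken afterwards, and the final await succeeds.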
