広告

Java : CyclicBarrier (同期) - API使用例

CyclicBarrier (Java SE 19 & JDK 19) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様のおともにどうぞ。


概要

スレッド・セットのそれぞれが共通のバリアー・ポイントに達するまで待機することを可能にする同期化支援機能です。 CyclicBarrierは、相互に待機することが必要になることがある、固定サイズのスレッド・パーティが関係するプログラムで有用です。 バリアーは、待機中のスレッドが解放されたあとに再利用できるため、cyclic (循環式)と呼ばれます。

クラス構成

CyclicBarrier クラスを使うと、複数のスレッド(パーティ) からの呼び出しが全員そろうまで、待機(同期) させることができます。

例えば、3 パーティの CyclicBarrier オブジェクトがあるとします。

  1. A スレッドから CyclicBarrier.await メソッドを呼び出すと、メソッドはそのまま待機します。
  2. 追加で B スレッドから await メソッドを呼び出しても、そのまま待機します。
  3. さらに C スレッドから await メソッドを呼び出すと、3 パーティそろったので、A, B, C の await メソッドは復帰します。
// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final Runnable action = () -> {
    System.out.printf("Action! (%f sec.)%n", elapsedTime.getAsDouble());
};

final var barrier = new CyclicBarrier(3, action);
System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(2);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start : waiting = %d (%f sec.)%n",
                        id, barrier.getNumberWaiting(), elapsedTime.getAsDouble());

                final var ret = barrier.await();
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// 結果
// ↓
//parties = 3 (0.001453 sec.)
//1 : task start : waiting = 0 (2.006531 sec.)
//2 : task start : waiting = 1 (4.011152 sec.)
//3 : task start : waiting = 2 (6.013911 sec.)
//Action! (6.014346 sec.)
//3 : await ret = 0 (6.014478 sec.)
//2 : await ret = 1 (6.014531 sec.)
//1 : await ret = 2 (6.014516 sec.)

関連記事 :


コンストラクタ

CyclicBarrier (int parties)

指定された数のパーティ(スレッド)が待機状態にある場合にトリップする、新しいCyclicBarrierを作成します。バリアーのトリップ時に、定義済みのアクションは実行されません。

// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final var barrier = new CyclicBarrier(3);
System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(2);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start : waiting = %d (%f sec.)%n",
                        id, barrier.getNumberWaiting(), elapsedTime.getAsDouble());

                final var ret = barrier.await();
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// 結果
// ↓
//parties = 3 (0.000953 sec.)
//1 : task start : waiting = 0 (2.014584 sec.)
//2 : task start : waiting = 1 (4.022366 sec.)
//3 : task start : waiting = 2 (6.033646 sec.)
//3 : await ret = 0 (6.034173 sec.)
//2 : await ret = 1 (6.034236 sec.)
//1 : await ret = 2 (6.034214 sec.)

CyclicBarrier (int parties, Runnable barrierAction)

指定された数のパーティ(スレッド)が待機状態にある場合にトリップする、新しいCyclicBarrierを作成します。バリアーのトリップ時に、指定されたバリアー・アクションが、最後にバリアーに入ったスレッドにより実行されます。

このメソッドの使用例は、await() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

メソッド

int await ()

すべてのパーティがこのバリアーでawaitを呼び出すまで待機します。

// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final Runnable action = () -> {
    System.out.printf("Action! (%f sec.)%n", elapsedTime.getAsDouble());
};

final var barrier = new CyclicBarrier(3, action);
System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(2);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start : waiting = %d (%f sec.)%n",
                        id, barrier.getNumberWaiting(), elapsedTime.getAsDouble());

                final var ret = barrier.await();
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// 結果
// ↓
//parties = 3 (0.001453 sec.)
//1 : task start : waiting = 0 (2.006531 sec.)
//2 : task start : waiting = 1 (4.011152 sec.)
//3 : task start : waiting = 2 (6.013911 sec.)
//Action! (6.014346 sec.)
//3 : await ret = 0 (6.014478 sec.)
//2 : await ret = 1 (6.014531 sec.)
//1 : await ret = 2 (6.014516 sec.)
final var barrier = new CyclicBarrier(3);
System.out.println("parties = " + barrier.getParties());

try (final var executor = Executors.newFixedThreadPool(3)) {

    final var futureA = executor.submit(() -> {
        try {
            System.out.println("A : task start");
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("A : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    executor.submit(() -> {
        try {
            System.out.println("B : task start");
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("B : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    System.out.println("task A cancel!");

    final var ret = futureA.cancel(true);
    System.out.println("cancelled = " + ret);
}

// 結果
// ↓
//parties = 3
//A : task start
//B : task start
//task A cancel!
//A : InterruptedException
//B : BrokenBarrierException
//cancelled = true

int await (long timeout, TimeUnit unit)

すべてのパーティがこのバリアーでawaitを呼び出すか、指定された待機時間が経過するまで待機します。

timeoutunit パラメータ以外については await() の使用例をご参照ください。

// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final Runnable action = () -> {
    System.out.printf("Action! (%f sec.)%n", elapsedTime.getAsDouble());
};

final var barrier = new CyclicBarrier(3, action);
final var timeout = 5;

System.out.printf("parties = %d (%f sec.)%n", barrier.getParties(), elapsedTime.getAsDouble());
System.out.println("timeout = " + timeout + " sec.");
System.out.println("isBroken = " + barrier.isBroken());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 2; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            try {
                System.out.printf("%d : task start (%f sec.)%n", id, elapsedTime.getAsDouble());

                final var ret = barrier.await(timeout, TimeUnit.SECONDS);
                System.out.printf("%d : await ret = %d (%f sec.)%n",
                        id, ret, elapsedTime.getAsDouble());
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                System.out.printf("%d : %s : isBroken = %b (%f sec.)%n",
                        id, e.getClass().getSimpleName(), barrier.isBroken(), elapsedTime.getAsDouble());
            }
        });
    }
}

// 結果
// ↓
//parties = 3 (0.001176 sec.)
//timeout = 5 sec.
//isBroken = false
//1 : task start (0.117036 sec.)
//2 : task start (0.224736 sec.)
//1 : TimeoutException : isBroken = true (5.127195 sec.)
//2 : BrokenBarrierException : isBroken = true (5.127217 sec.)

int getNumberWaiting ()

バリアーで現在待機しているパーティの数を返します。

このメソッドの使用例は、await() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

int getParties ()

このバリアーのトリップに必要なパーティの数を返します。

final var barrier = new CyclicBarrier(3);
System.out.println("parties = " + barrier.getParties());

try (final var executor = Executors.newFixedThreadPool(3)) {
    for (int i = 1; i <= 3; i++) {
        final var id = i;
        TimeUnit.SECONDS.sleep(1);

        executor.submit(() -> {
            try {
                System.out.println(id + " : task start : parties = " + barrier.getParties());

                final var ret = barrier.await();
                System.out.println(id + " : await ret = " + ret);
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(e.getClass().getSimpleName());
            }
        });
    }
}

// 結果
// ↓
//parties = 3
//1 : task start : parties = 3
//2 : task start : parties = 3
//3 : task start : parties = 3
//3 : await ret = 0
//2 : await ret = 1
//1 : await ret = 2

boolean isBroken ()

このバリアーが故障状態にあるかどうかを問い合わせます。

このメソッドの使用例は、await(long timeout, TimeUnit unit) にまとめて記載しました。
そちらのAPI使用例をご参照ください。

void reset ()

バリアーを初期状態にリセットします。

final var barrier = new CyclicBarrier(3);
System.out.println("parties = " + barrier.getParties());

try (final var executor = Executors.newFixedThreadPool(3)) {

    executor.submit(() -> {
        try {
            System.out.println("A : task start : waiting = " + barrier.getNumberWaiting());
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("A : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    executor.submit(() -> {
        try {
            System.out.println("B : task start : waiting = " + barrier.getNumberWaiting());
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println("B : " + e.getClass().getSimpleName());
        }
    });

    TimeUnit.SECONDS.sleep(1);

    System.out.println("reset!");
    barrier.reset();
}

// 結果
// ↓
//parties = 3
//A : task start : waiting = 0
//A : task start : waiting = 1
//reset!
//A : BrokenBarrierException
//B : BrokenBarrierException

関連記事

ページの先頭へ