広告

Java : Exchanger (同期) - API使用例

Exchanger (Java SE 19 & JDK 19) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様のおともにどうぞ。


概要

スレッドをペアにして、ペア内の要素を交換できる同期ポイント。 各スレッドは、exchangeメソッドに入ると何らかのオブジェクトを提供し、パートナ・スレッドと照合して、復帰時にそのパートナのオブジェクトを受け取ります。

クラス構成

Exchanger クラスを使うと、2つのスレッド(ペア) で安全(スレッドセーフ) に要素を交換することができます。

例えば、ペアとして A スレッドと B スレッドがあるとします。

  1. A スレッドが 要素x を渡すために exchange メソッドを呼び出すと、相手(B)の準備はまだなのでメソッドはそのまま待機します。
  2. B スレッドが 要素y を渡すために exchange メソッドを呼び出すと、相手(A)の準備はできているので、A, B のメソッドは復帰します。
    1. A スレッドはメソッドの戻り値として、要素y を受け取ります。
    2. B スレッドはメソッドの戻り値として、要素x を受け取ります。
// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final var exchanger = new Exchanger<List<String>>();
try (final var executor = Executors.newFixedThreadPool(2)) {

    executor.submit(() -> {
        try {
            final var x = List.of("a", "b", "c");
            System.out.printf("A : task start : x = %s (%f sec.)%n", x, elapsedTime.getAsDouble());

            final var ret = exchanger.exchange(x);

            System.out.printf("A : exchange OK! : ret = %s (%f sec.)%n",
                    ret, elapsedTime.getAsDouble());

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });

    TimeUnit.SECONDS.sleep(2);

    executor.submit(() -> {
        try {
            final var x = List.of("X", "Y", "Z");
            System.out.printf("B : task start : x = %s (%f sec.)%n", x, elapsedTime.getAsDouble());

            final var ret = exchanger.exchange(x);

            System.out.printf("B : exchange OK! : ret = %s (%f sec.)%n",
                    ret, elapsedTime.getAsDouble());

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });
}

// 結果
// ↓
//A : task start : x = [a, b, c] (0.002765 sec.)
//B : task start : x = [X, Y, Z] (2.019055 sec.)
//B : exchange OK! : ret = [a, b, c] (2.019486 sec.)
//A : exchange OK! : ret = [X, Y, Z] (2.019660 sec.)

関連記事 :


コンストラクタ

Exchanger ()

新しいExchangerを作成します。

このメソッドの使用例は、exchange(V x) にまとめて記載しました。
そちらのAPI使用例をご参照ください。

メソッド

V exchange (V x)

現在のスレッドは、割り込みが発生しないかぎり、もう一方のスレッドがこの交換ポイントに達するまで待機し、指定されたオブジェクトをそのスレッドに転送して、代わりにもう一方のスレッドのオブジェクトを受け取ります。

// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final var exchanger = new Exchanger<List<String>>();
try (final var executor = Executors.newFixedThreadPool(2)) {

    executor.submit(() -> {
        try {
            final var x = List.of("a", "b", "c");
            System.out.printf("A : task start : x = %s (%f sec.)%n", x, elapsedTime.getAsDouble());

            final var ret = exchanger.exchange(x);

            System.out.printf("A : exchange OK! : ret = %s (%f sec.)%n",
                    ret, elapsedTime.getAsDouble());

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });

    TimeUnit.SECONDS.sleep(2);

    executor.submit(() -> {
        try {
            final var x = List.of("X", "Y", "Z");
            System.out.printf("B : task start : x = %s (%f sec.)%n", x, elapsedTime.getAsDouble());

            final var ret = exchanger.exchange(x);

            System.out.printf("B : exchange OK! : ret = %s (%f sec.)%n",
                    ret, elapsedTime.getAsDouble());

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });
}

// 結果
// ↓
//A : task start : x = [a, b, c] (0.002765 sec.)
//B : task start : x = [X, Y, Z] (2.019055 sec.)
//B : exchange OK! : ret = [a, b, c] (2.019486 sec.)
//A : exchange OK! : ret = [X, Y, Z] (2.019660 sec.)
final var exchanger = new Exchanger<List<String>>();
try (final var executor = Executors.newFixedThreadPool(2)) {

    final var future = executor.submit(() -> {
        try {
            final var x = List.of("a", "b", "c");
            System.out.println("A : task start : x = " + x);

            final var ret = exchanger.exchange(x);
            System.out.println("A : exchange OK! : ret = " + ret);

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });

    TimeUnit.SECONDS.sleep(2);

    System.out.println("future : cancel!");

    final var ret = future.cancel(true);
    System.out.println("cancelled = " + ret);
}

// 結果
// ↓
//A : task start : x = [a, b, c]
//future : cancel!
//InterruptedException!
//cancelled = true

V exchange (V x, long timeout, TimeUnit unit)

現在のスレッドは、割り込みが発生するか、指定された待機時間が経過しないかぎり、もう一方のスレッドがこの交換ポイントに達するまで待機し、指定されたオブジェクトをそのスレッドに転送して、代わりにもう一方のスレッドのオブジェクトを受け取ります。

timeoutunit パラメータ以外については exchange(V x) の使用例をご参照ください。

// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final var exchanger = new Exchanger<List<String>>();
try (final var executor = Executors.newFixedThreadPool(2)) {

    executor.submit(() -> {
        try {
            final var x = List.of("a", "b", "c");
            System.out.printf("A : task start : x = %s (%f sec.)%n", x, elapsedTime.getAsDouble());

            final var ret = exchanger.exchange(x, 5, TimeUnit.SECONDS);

            System.out.printf("A : exchange OK! : ret = %s (%f sec.)%n",
                    ret, elapsedTime.getAsDouble());

        } catch (InterruptedException | TimeoutException e) {
            System.out.printf("A : %s (%f sec.)%n",
                    e.getClass().getSimpleName(), elapsedTime.getAsDouble());
        }
    });
}

// 結果
// ↓
//A : task start : x = [a, b, c] (0.002542 sec.)
//A : TimeoutException (5.018461 sec.)

関連記事

ページの先頭へ