広告

Java : SubmissionPublisher (Reactive Streams) - API使用例

SubmissionPublisher (Java SE 21 & JDK 21) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様書のおともにどうぞ。


概要

(non-null)アイテムを非同期で発行するFlow.Publisherは、現在のサブスクライバが閉じられるまでそれを発行します。 現在の各サブスクライバは、ドロップまたは例外が発生しない限り、新しく送信されたアイテムを同じ順序で受信します。

クラス構成

SubmissionPublisher は、Reactive Streams における Publisher の実装です。

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

コンストラクタ

SubmissionPublisher ()

サブスクライバ(少なくとも2つの並列処理レベルをサポートしていない場合を除き、この場合、各タスクを実行するために新しいスレッドが作成されます)への非同期配信用にForkJoinPool.commonPool()を使用し、Flow.defaultBufferSize()の最大バッファ容量を使用し、メソッドonNextのサブスクライバ例外のハンドラを使用せずに新しいSubmissionPublisherを作成します。

try (final var publisher = new SubmissionPublisher<String>()) {

    final var executor = publisher.getExecutor();
    System.out.println(executor.getClass().getSimpleName()); // ForkJoinPool

    final var capacity = publisher.getMaxBufferCapacity();
    System.out.println(capacity); // 256
}

SubmissionPublisher (Executor executor, int maxBufferCapacity)

指定されたExecutorを使用してサブスクライバに非同期に配信するための新しいSubmissionPublisherを作成します。各サブスクライバの最大バッファ・サイズは指定されており、メソッドonNextのサブスクライバ例外ハンドラはありません。

try (final var executor = Executors.newFixedThreadPool(10)) {
    try (final var publisher = new SubmissionPublisher<String>(executor, 1024)) {

        final var executorName = publisher.getExecutor().getClass().getSimpleName();
        System.out.println(executorName); // ThreadPoolExecutor

        final var capacity = publisher.getMaxBufferCapacity();
        System.out.println(capacity); // 1024
    }
}

SubmissionPublisher (Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)

各サブスクライバに対して指定された最大バッファ・サイズでサブスクライバへの非同期配信用に、指定されたExecutorを使用して新しいSubmissionPublisherを作成し、非nullの場合、サブスクライバがonNextメソッドで例外をスローしたときに呼び出されます。

handler 以外のパラメータについては、SubmissionPublisher(Executor executor, int maxBufferCapacity) の使用例をご参照ください。

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), 256, (subscriber, throwable) -> {
    System.out.println("handler : " + throwable);
})
) {
    System.out.println("-- subscribe --");

    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
            throw new IllegalStateException("Error!");
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit --");
    publisher.submit("abc");

    TimeUnit.SECONDS.sleep(1);
}

// 結果
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//handler : java.lang.IllegalStateException: Error!
//onError : java.lang.IllegalStateException: Error!

メソッド

void close ()

すでにクローズされていない限り、onCompleteは現在のサブスクライバに信号を送り、後続のパブリッシュを許可しません。

可能であれば try-with-resources文 を使うことをおすすめします。

try (final var publisher = new SubmissionPublisher<String>()) {
    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
}

// 結果
// ↓
//onSubscribe
//onComplete

try-with-resources文を使わない例です。

final var publisher = new SubmissionPublisher<String>();
try {
    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
} finally {
    publisher.close();
}

// 結果
// ↓
//onSubscribe
//onComplete

void closeExceptionally (Throwable error)

すでにクローズされていない限り、onErrorは指定されたエラーを持つ現在のサブスクライバに通知し、その後の発行を許可しません。

try (final var publisher = new SubmissionPublisher<String>()) {
    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });

    publisher.closeExceptionally(new IllegalStateException("Error!"));

    TimeUnit.SECONDS.sleep(1);

    final var ret = publisher.getClosedException();
    System.out.println("closedException : " + ret);

    TimeUnit.SECONDS.sleep(1);
}

// 結果
// ↓
//onSubscribe
//onError : java.lang.IllegalStateException: Error!
//closedException : java.lang.IllegalStateException: Error!

CompletableFuture<Void> consume (Consumer<? super T> consumer)

指定したコンシューマ関数を使用して、公開されたすべてのアイテムを処理します。

try (final var publisher = new SubmissionPublisher<String>()) {

    final var future = publisher.consume((v) -> {
        System.out.println("  consume : " + v);
    });

    System.out.println("-- submit --");
    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    future.thenRun(() -> {
        System.out.println("  thenRun");
    });

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//-- submit --
//  consume : abc
//  consume : 123
//  consume : XYZ
//-- close --
//  thenRun

int estimateMaximumLag ()

現在のすべてのサブスクライバ間で生成されたがまだ消費されていないアイテムの最大数の見積もりを返します。

class MySubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("  onSubscribe");
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("  onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("  onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("  onComplete");
    }

    public void request(long n) {
        subscription.request(n);
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit 3 --");
    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- request 1 --");
    subscriber.request(1);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- request 2 --");
    subscriber.request(2);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- request 3 --");
    subscriber.request(3);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- close --");
}

// 結果
// ↓
//-- subscribe --
//  onSubscribe
//-- submit 3 --
//lag = 3
//demand = -3
//-- request 1 --
//  onNext : abc
//lag = 2
//demand = -2
//-- request 2 --
//  onNext : 123
//  onNext : XYZ
//lag = 0
//demand = 0
//-- request 3 --
//lag = 0
//demand = 3
//-- close --
//  onComplete

long estimateMinimumDemand ()

すべての現在のサブスクライバの中でリクエスト(request経由で)されたがまだ生成されていないアイテムの最小数の見積もりを返します。

このメソッドの使用例は、estimateMaximumLag() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

Throwable getClosedException ()

closeExceptionallyに関連付けられた例外を返します。閉じていない場合、または正常に終了した場合はnullを返します。

このメソッドの使用例は、closeExceptionally(Throwable error) にまとめて記載しました。
そちらのAPI使用例をご参照ください。

Executor getExecutor ()

非同期配信に使用されるExecutorを返します。

このメソッドの使用例は、SubmissionPublisher(Executor executor, int maxBufferCapacity) にまとめて記載しました。
そちらのAPI使用例をご参照ください。

int getMaxBufferCapacity ()

サブスクライバごとの最大バッファ容量を返します。

このメソッドの使用例は、SubmissionPublisher(Executor executor, int maxBufferCapacity) にまとめて記載しました。
そちらのAPI使用例をご参照ください。

int getNumberOfSubscribers ()

現在のサブスクライバの数を返します。

class MySubscriber implements Flow.Subscriber<String> {
    private final String name;

    MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
    }

    @Override
    public void onNext(String item) {
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }

    @Override
    public String toString() {
        return "name = " + name;
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println(publisher.hasSubscribers()); // false
    System.out.println(publisher.getNumberOfSubscribers()); // 0

    publisher.subscribe(new MySubscriber("abc"));

    System.out.println(publisher.hasSubscribers()); // true
    System.out.println(publisher.getNumberOfSubscribers()); // 1

    publisher.subscribe(new MySubscriber("123"));

    System.out.println(publisher.hasSubscribers()); // true
    System.out.println(publisher.getNumberOfSubscribers()); // 2

    publisher.subscribe(new MySubscriber("XYZ"));

    System.out.println(publisher.hasSubscribers()); // true
    System.out.println(publisher.getNumberOfSubscribers()); // 3

    System.out.println("-- subscribers --");
    for (final var subscriber : publisher.getSubscribers()) {
        System.out.println(subscriber);
    }

    // 結果
    // ↓
    //-- subscribers --
    //name = abc
    //name = 123
    //name = XYZ
}

List<Flow.Subscriber<? super T>> getSubscribers ()

サブスクライバでFlow.Subscriberメソッドを呼び出すためではなく、モニタリングとトラッキングのために現在のサブスクライバのリストを返します。

このメソッドの使用例は、getNumberOfSubscribers() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

boolean hasSubscribers ()

このパブリッシャにサブスクライバがある場合はtrueを返します。

このメソッドの使用例は、getNumberOfSubscribers() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

boolean isClosed ()

このサイト運営者が投稿を受け入れていない場合はtrueを返します。

final var publisher = new SubmissionPublisher<String>();
try (publisher) {
    System.out.println(publisher.isClosed()); // false
}
System.out.println(publisher.isClosed()); // true

boolean isSubscribed (Flow.Subscriber<? super T> subscriber)

指定されたサブスクライバが現在サブスクライブされている場合はtrueを返します。

class MySubscriber implements Flow.Subscriber<String> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
    }

    @Override
    public void onNext(String item) {
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    final var subscriberA = new MySubscriber();
    final var subscriberB = new MySubscriber();

    System.out.println(publisher.isSubscribed(subscriberA)); // false
    System.out.println(publisher.isSubscribed(subscriberB)); // false

    publisher.subscribe(subscriberA);

    System.out.println(publisher.isSubscribed(subscriberA)); // true
    System.out.println(publisher.isSubscribed(subscriberB)); // false

    publisher.subscribe(subscriberB);

    System.out.println(publisher.isSubscribed(subscriberA)); // true
    System.out.println(publisher.isSubscribed(subscriberB)); // true
}

int offer (T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

指定されたアイテムを、可能であれば現在の各サブスクライバに公開します。これは、onNextメソッドを非同期的に呼び出し、任意のサブスクリプションのリソースを使用できなくしてもブロックします。指定されたタイムアウトまで、またはコールアウト・スレッドが中断するまで、指定されたハンドラ(非nullの場合)が呼び出され、trueが返された場合は1回再試行します。

timeout, unit パラメータ以外については offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) の使用例もご参照ください。

// 基準となる時刻
final long current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("  onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }

    public void request(long n) {
        subscription.request(n);
    }
}

final int capacity = 2;
System.out.println("capacity = " + capacity);

final var items = List.of("a1", "a2", "a3", "a4", "a5");

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), capacity)) {

    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    for (final var item : items) {
        System.out.println("---- item = " + item + " ----");
        System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());

        final var ret = publisher.offer(item, 1, TimeUnit.SECONDS, (sub, i) -> {
            System.out.printf("  onDrop item = %s (%.3f sec.)%n",
                    i, elapsedTime.getAsDouble());
            return false;
        });
        System.out.printf("offer ret = %d (%.3f sec.)%n%n",
                ret, elapsedTime.getAsDouble());
    }

    System.out.println("---- request ----");
    subscriber.request(5);

    TimeUnit.SECONDS.sleep(1);
}

// 結果
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.010 sec.)
//offer ret = 2 (0.010 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
//  onDrop item = a3 (1.023 sec.)
//offer ret = -1 (1.023 sec.)
//
//---- item = a4 ----
//offer (1.023 sec.)
//  onDrop item = a4 (2.034 sec.)
//offer ret = -1 (2.034 sec.)
//
//---- item = a5 ----
//offer (2.034 sec.)
//  onDrop item = a5 (3.039 sec.)
//offer ret = -1 (3.039 sec.)
//
//---- request ----
//  onNext : a1
//  onNext : a2

int offer (T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

onNextメソッドを非同期に呼び出すことによって、可能であれば、指定されたアイテムを各現在のサブスクライバにパブリッシュします。

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- offer --");

    publisher.offer("abc", null);
    publisher.offer("123", null);
    publisher.offer("XYZ", null);

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- offer --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)
// 基準となる時刻
final long current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("  onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }

    public void request(long n) {
        subscription.request(n);
    }
}

final int capacity = 2;
System.out.println("capacity = " + capacity);

final var items = List.of("a1", "a2", "a3", "a4", "a5");

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), capacity)) {

    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    for (final var item : items) {
        System.out.println("---- item = " + item + " ----");
        System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());

        final var ret = publisher.offer(item, (sub, i) -> {
            System.out.printf("  onDrop item = %s (%.3f sec.)%n",
                    i, elapsedTime.getAsDouble());
            return false;
        });
        System.out.printf("offer ret = %d (%.3f sec.)%n%n",
                ret, elapsedTime.getAsDouble());
    }

    System.out.println("---- request ----");
    subscriber.request(5);

    TimeUnit.SECONDS.sleep(1);
}

// 結果
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.009 sec.)
//offer ret = 2 (0.009 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
//  onDrop item = a3 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a4 ----
//offer (0.010 sec.)
//  onDrop item = a4 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a5 ----
//offer (0.011 sec.)
//  onDrop item = a5 (0.011 sec.)
//offer ret = -1 (0.011 sec.)
//
//---- request ----
//  onNext : a1
//  onNext : a2
class MySubscriber implements Flow.Subscriber<String> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(3);
    }

    @Override
    public void onNext(String item) {
        System.out.println("<- onNext : " + item);

        try {
            System.out.println(" - sleep ...");
            TimeUnit.SECONDS.sleep(1);

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        } finally {
            System.out.println("-> onNext end");
        }
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }
}

final int capacity = 1;
System.out.println("capacity = " + capacity);

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), capacity)) {

    publisher.subscribe(new MySubscriber());

    publisher.offer("abc", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        return true;
    });

    publisher.offer("123", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        return true;
    });

    publisher.offer("XYZ", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        try {
            System.out.println("** sleep ...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("** sleep end");

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }

        return true;
    });

    TimeUnit.SECONDS.sleep(5);
}

// 結果
// ↓
//capacity = 1
//<- onNext : abc
// - sleep ...
//** onDrop item = XYZ
//** sleep ...
//-> onNext end
//<- onNext : 123
// - sleep ...
//-> onNext end
//** sleep end
//<- onNext : XYZ
// - sleep ...
//-> onNext end

int submit (T item)

onNextメソッドを非同期に呼び出すことによって、現在の各サブスクライバに指定されたアイテムをパブリッシュし、任意のサブスクライバのリソースを使用できない間中断しないようにブロックします。

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)
class MySubscriber implements Flow.Subscriber<String> {

    private final String name;

    MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(2);
    }

    @Override
    public void onNext(String item) {
        System.out.println(name + " : onNext : " + item);
        try {
            System.out.println(name + " :   sleep ...");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        } finally {
            System.out.println(name + " : onNext end");
        }
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    publisher.subscribe(new MySubscriber("A"));
    publisher.subscribe(new MySubscriber("B"));

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    final var ret1 = publisher.submit("abc");
    final var ret2 = publisher.submit("XYZ");

    System.out.println("ret1 = " + ret1);
    System.out.println("ret2 = " + ret2);

    System.out.println("-- submit end --");
    TimeUnit.SECONDS.sleep(3);
}

// 結果
// ↓
//-- submit --
//ret1 = 1
//ret2 = 2
//-- submit end --
//A : onNext : abc
//B : onNext : abc
//A :   sleep ...
//B :   sleep ...
//A : onNext end
//B : onNext end
//A : onNext : XYZ
//B : onNext : XYZ
//A :   sleep ...
//B :   sleep ...
//A : onNext end
//B : onNext end

void subscribe (Flow.Subscriber<? super T> subscriber)

指定されたサブスクライバを、すでにサブスクライブしていない限り追加します。

このメソッドの使用例は、submit(T item) にまとめて記載しました。
そちらのAPI使用例をご参照ください。


関連記事

ページの先頭へ