広告

Java : Flow.Subscriber (Reactive Streams) - API使用例

Flow.Subscriber (Java SE 20 & JDK 20) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様書のおともにどうぞ。


概要

メッセージのレシーバ。 このインタフェースのメソッドは、Flow.Subscriptionごとに厳密な順序で呼び出されます。

クラス構成

Flow.Subscriber は、Reactive Streams において Subscriber の役割を持つインタフェースです。

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

メソッド

void onComplete ()

他のSubscriberメソッドがサブスクリプションによって呼び出されていないことが判明した場合に呼び出されるメソッド。

class MySubscriber implements Flow.Subscriber<String> {

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        subscription.request(3);
    }

    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//onNext : 123
//onNext : XYZ
//-- close --
//onComplete

void onError (Throwable throwable)

パブリッシャまたはサブスクリプションで遭遇した回復不可能なエラーの際に呼び出されるメソッド。その後はサブスクリプションによって他のサブスクライバ・メソッドが呼び出されません。

class MySubscriber implements Flow.Subscriber<String> {

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        subscription.request(3);
    }

    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);

        if ("123".equals(item)) {
            throw new IllegalArgumentException("Error!");
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//onNext : 123
//onError : java.lang.IllegalArgumentException: Error!
//-- close --

void onNext (T item)

サブスクリプションの次のアイテムで呼び出されるメソッド。

このメソッドの使用例は、onComplete() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

void onSubscribe (Flow.Subscription subscription)

指定されたサブスクリプションの他のサブスクライバ・メソッドを呼び出す前に呼び出されるメソッド。

このメソッドの使用例は、onComplete() にまとめて記載しました。
そちらのAPI使用例をご参照ください。


関連記事

ページの先頭へ