広告

Java : Flow.Subscription (Reactive Streams) - API使用例

Flow.Subscription (Java SE 20 & JDK 20) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様書のおともにどうぞ。


概要

Flow.PublisherとFlow.Subscriberをリンクするメッセージ制御。 サブスクライブ者はリクエストがあった場合にのみ商品を受け取り、いつでも取り消すことができます。 このインタフェースのメソッドは、サブスクライバによってのみ呼び出されるようになっています。他の文脈での使用法には未定義の効果があります。

クラス構成

Flow.Subscription は、Reactive Streams において Subscription の役割を持つインタフェースです。

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// 結果
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

メソッド

void cancel ()

(eventually)がメッセージの受信を停止するように、サブスクライバに指示します。

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    void request(long n) {
        if (subscription != null) {
            subscription.request(n);
        }
    }

    void cancel() {
        if (subscription != null) {
            subscription.cancel();
        }
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit (a1, a2) --");

    publisher.submit("a1");
    publisher.submit("a2");

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- request 3 --");
    subscriber.request(3);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- cancel --");
    subscriber.cancel();

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit (a3, a4) --");

    publisher.submit("a3");
    publisher.submit("a4");

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- close --");
}

// 結果
// ↓
//-- subscribe --
//onSubscribe
//-- submit (a1, a2) --
//-- request 3 --
//onNext : a1
//onNext : a2
//-- cancel --
//-- submit (a3, a4) --
//-- close --

void request (long n)

指定された番号のnを、このサブスクリプションの現在の満たされていない需要に追加します。

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    void request(long n) {
        if (subscription != null) {
            subscription.request(n);
        }
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit (a1, a2, a3, a4) --");

    publisher.submit("a1");
    publisher.submit("a2");
    publisher.submit("a3");
    publisher.submit("a4");

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- request 1 --");
    subscriber.request(1);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- request 2 --");
    subscriber.request(2);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- request 3 --");
    subscriber.request(3);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit (a5, a6) --");

    publisher.submit("a5");
    publisher.submit("a6");

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- close --");
}

// 結果
// ↓
//-- subscribe --
//onSubscribe
//-- submit (a1, a2, a3, a4) --
//-- request 1 --
//onNext : a1
//-- request 2 --
//onNext : a2
//onNext : a3
//-- request 3 --
//onNext : a4
//-- submit (a5, a6) --
//onNext : a5
//onNext : a6
//-- close --
//onComplete

関連記事

ページの先頭へ