Java : Flow.Subscriber (Reactive Streams) with Examples

Flow.Subscriber (Java SE 20 & JDK 20) with Examples.
You will find code examples on most Flow.Subscriber methods.


Summary

A receiver of messages. The methods in this interface are invoked in strict sequential order for each Flow.Subscription.

Class diagram

Please see also : Reactive Streams

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

Methods

void onComplete ()

Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription.

class MySubscriber implements Flow.Subscriber<String> {

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        subscription.request(3);
    }

    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//onNext : 123
//onNext : XYZ
//-- close --
//onComplete

void onError (Throwable throwable)

Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription.

class MySubscriber implements Flow.Subscriber<String> {

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        subscription.request(3);
    }

    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);

        if ("123".equals(item)) {
            throw new IllegalArgumentException("Error!");
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//onNext : 123
//onError : java.lang.IllegalArgumentException: Error!
//-- close --

void onNext (T item)

Method invoked with a Subscription's next item.

Please see onComplete().

void onSubscribe (Flow.Subscription subscription)

Method invoked prior to invoking any other Subscriber methods for the given Subscription.

Please see onComplete().


Related posts

To top of page