Java : Flow.Publisher (Reactive Streams) with Examples

Flow.Publisher (Java SE 21 & JDK 21) with Examples.
You will find code examples on most Flow.Publisher<T> methods.


Summary

A producer of items (and related control messages) received by Subscribers. Each current Flow.Subscriber receives the same items (via method onNext) in the same order, unless drops or errors are encountered.

Class diagram

Please see also : Reactive Streams

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var submission = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    final Flow.Publisher<String> publisher = submission;
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    submission.submit("abc");
    submission.submit("123");
    submission.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

Methods

void subscribe (Flow.Subscriber<? super T> subscriber)

Adds the given Subscriber if possible.

class MySubscriber implements Flow.Subscriber<String> {
    private final String name;

    MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(name + " : onSubscribe");
        subscription.request(2);
    }

    @Override
    public void onNext(String item) {
        System.out.println(name + " : onNext item = " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(name + " : onError");
    }

    @Override
    public void onComplete() {
        System.out.println(name + " : onComplete");
    }

    @Override
    public String toString() {
        return "name = " + name;
    }
}

try (final var submission = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    final Flow.Publisher<String> publisher = submission;
    publisher.subscribe(new MySubscriber("A"));
    publisher.subscribe(new MySubscriber("B"));
    publisher.subscribe(new MySubscriber("C"));

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    submission.submit("abc");
    submission.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//-- subscribe --
//A : onSubscribe
//B : onSubscribe
//C : onSubscribe
//-- submit --
//A : onNext item = abc
//B : onNext item = abc
//C : onNext item = abc
//A : onNext item = XYZ
//B : onNext item = XYZ
//C : onNext item = XYZ
//-- close --
//A : onComplete
//B : onComplete
//C : onComplete

Related posts

To top of page