Java : Flow.Publisher (Reactive Streams) with Examples
Flow.Publisher (Java SE 21 & JDK 21) with Examples.
You will find code examples for most Flow.Publisher<T> methods.
Summary
A producer of items (and related control messages) received by Subscribers. Each current Flow.Subscriber receives the same items (via method onNext) in the same order, unless drops or errors are encountered.
See also: Reactive Streams
/**
 * Subscriber used by the example. Every callback prints its own name together
 * with the id of the thread it is running on, and items are requested from the
 * subscription one at a time.
 */
class MySubscriber implements Flow.Subscriber<String> {

    // Stored by onSubscribe so that onNext can ask for the following item.
    private Flow.Subscription subscription;

    /**
     * Prints the callback name, keeps the subscription and requests one item.
     *
     * @param subscription the subscription handed over by the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onSubscribe (tid=%d)%n", tid);

        this.subscription = subscription;
        this.subscription.request(1);
    }

    /**
     * Prints the received item and then requests one more.
     *
     * @param item the item delivered to this subscriber
     */
    @Override
    public void onNext(String item) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onNext (tid=%d) : %s%n", tid, item);

        this.subscription.request(1);
    }

    /**
     * Prints the reported failure.
     *
     * @param throwable the error passed to this subscriber
     */
    @Override
    public void onError(Throwable throwable) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onError (tid=%d) : %s%n", tid, throwable);
    }

    /** Prints the completion callback name; nothing else is done. */
    @Override
    public void onComplete() {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onComplete (tid=%d)%n", tid);
    }
}
// Print the id of the calling thread so it can be compared with the ids
// printed by the subscriber callbacks.
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

// try-with-resources: the publisher is closed when the block is left.
try (final var source = new SubmissionPublisher<String>()) {
    System.out.println("-- subscribe --");

    // Subscribe through the Flow.Publisher interface, which is the type
    // this example is about.
    final Flow.Publisher<String> publisher = source;
    final Flow.Subscriber<String> subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    // Pause for one second before submitting; presumably this gives the
    // subscriber callback time to run on its own thread.
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit --");
    // Items are submitted in this exact order.
    for (final String item : new String[] {"abc", "123", "XYZ"}) {
        source.submit(item);
    }

    // Pause again before announcing the close.
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- close --");
}
// Result
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
Methods
void subscribe (Flow.Subscriber<? super T> subscriber)
Adds the given Subscriber if possible.
/**
 * Named subscriber used by the {@code subscribe} example. Every callback
 * prints a line prefixed with the subscriber's name so that output from
 * several subscribers can be told apart.
 */
class MySubscriber implements Flow.Subscriber<String> {

    // Label placed in front of every line this subscriber prints.
    private final String name;

    /**
     * Creates a subscriber that prints under the given name.
     *
     * @param name label used as the prefix of every printed line
     */
    MySubscriber(String name) {
        this.name = name;
    }

    /**
     * Prints the callback name and requests two items up front.
     * {@link #onNext(String)} never requests more.
     *
     * @param subscription the subscription handed over by the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(name + " : onSubscribe");
        subscription.request(2);
    }

    /**
     * Prints the received item.
     *
     * @param item the item delivered to this subscriber
     */
    @Override
    public void onNext(String item) {
        System.out.println(name + " : onNext item = " + item);
    }

    /**
     * Prints the reported failure, including the throwable itself so that the
     * cause of the error is not lost.
     *
     * @param throwable the error passed to this subscriber
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.println(name + " : onError throwable = " + throwable);
    }

    /** Prints the completion callback name; nothing else is done. */
    @Override
    public void onComplete() {
        System.out.println(name + " : onComplete");
    }

    /**
     * Returns the name of this subscriber in the form {@code name = <name>}.
     *
     * @return a short textual description of this subscriber
     */
    @Override
    public String toString() {
        return "name = " + name;
    }
}
// try-with-resources: the publisher is closed when the block is left.
try (final var source = new SubmissionPublisher<String>()) {
    System.out.println("-- subscribe --");

    // Register three named subscribers, in the order A, B, C, through the
    // Flow.Publisher interface whose subscribe method is being demonstrated.
    final Flow.Publisher<String> publisher = source;
    for (final String label : new String[] {"A", "B", "C"}) {
        publisher.subscribe(new MySubscriber(label));
    }

    // Pause for one second before submitting; presumably this gives the
    // subscriber callbacks time to run.
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit --");
    // Items are submitted in this exact order.
    for (final String item : new String[] {"abc", "XYZ"}) {
        source.submit(item);
    }

    // Pause again before announcing the close.
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- close --");
}
// Result
// ↓
//-- subscribe --
//A : onSubscribe
//B : onSubscribe
//C : onSubscribe
//-- submit --
//A : onNext item = abc
//B : onNext item = abc
//C : onNext item = abc
//A : onNext item = XYZ
//B : onNext item = XYZ
//C : onNext item = XYZ
//-- close --
//A : onComplete
//B : onComplete
//C : onComplete