Java : Flow.Subscriber (Reactive Streams) with Examples
Flow.Subscriber (Java SE 20 & JDK 20) with Examples.
You will find code examples on most Flow.Subscriber methods.
Summary
Please see also : Reactive Streams
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf(" onSubscribe (tid=%d)%n",
Thread.currentThread().threadId());
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.printf(" onNext (tid=%d) : %s%n",
Thread.currentThread().threadId(), item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.printf(" onError (tid=%d) : %s%n",
Thread.currentThread().threadId(), throwable);
}
@Override
public void onComplete() {
System.out.printf(" onComplete (tid=%d)%n",
Thread.currentThread().threadId());
}
}
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// Result
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
Methods
void onComplete ()
class MySubscriber implements Flow.Subscriber<String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(3);
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//onNext : 123
//onNext : XYZ
//-- close --
//onComplete
void onError (Throwable throwable)
class MySubscriber implements Flow.Subscriber<String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(3);
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
if ("123".equals(item)) {
throw new IllegalArgumentException("Error!");
}
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//onNext : 123
//onError : java.lang.IllegalArgumentException: Error!
//-- close --
void onNext (T item)
Please see onComplete().
void onSubscribe (Flow.Subscription subscription)
Please see onComplete().
Related posts
- API Examples