Java : Flow.Processor (Reactive Streams) with Examples
Flow.Processor (Java SE 21 & JDK 21) with Examples.
You will find code examples on most Flow.Processor methods.
Summary
Please see also : Reactive Streams
class UpperCaseProcessor extends SubmissionPublisher<String>
implements Flow.Processor<String, String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Processor.onSubscribe");
subscription.request(2);
}
@Override
public void onNext(String item) {
System.out.println("Processor.onNext : " + item);
submit(item.toUpperCase());
}
@Override
public void onError(Throwable ex) {
System.out.println("Processor.onError");
closeExceptionally(ex);
}
@Override
public void onComplete() {
System.out.println("Processor.onComplete");
close();
}
}
class MySubscriber implements Flow.Subscriber<String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscriber.onSubscribe");
subscription.request(2);
}
@Override
public void onNext(String item) {
System.out.println("Subscriber.onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Subscriber.onError");
}
@Override
public void onComplete() {
System.out.println("Subscriber.onComplete");
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
final var processor = new UpperCaseProcessor();
publisher.subscribe(processor);
processor.subscribe(new MySubscriber());
publisher.submit("abc");
publisher.submit("xyz");
TimeUnit.SECONDS.sleep(1);
}
// Result
// ↓
//Processor.onSubscribe
//Subscriber.onSubscribe
//Processor.onNext : abc
//Processor.onNext : xyz
//Subscriber.onNext : ABC
//Subscriber.onNext : XYZ
//Processor.onComplete
//Subscriber.onComplete