Java : Flow.Processor (Reactive Streams) - API使用例
Flow.Processor (Java SE 21 & JDK 21) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様書のおともにどうぞ。
概要
Flow.Processor は、Reactive Streams において Processor の役割を持つインタフェースです。
class UpperCaseProcessor extends SubmissionPublisher<String>
implements Flow.Processor<String, String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Processor.onSubscribe");
subscription.request(2);
}
@Override
public void onNext(String item) {
System.out.println("Processor.onNext : " + item);
// 大文字に変換
submit(item.toUpperCase());
}
@Override
public void onError(Throwable ex) {
System.out.println("Processor.onError");
closeExceptionally(ex);
}
@Override
public void onComplete() {
System.out.println("Processor.onComplete");
close();
}
}
class MySubscriber implements Flow.Subscriber<String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscriber.onSubscribe");
subscription.request(2);
}
@Override
public void onNext(String item) {
System.out.println("Subscriber.onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Subscriber.onError");
}
@Override
public void onComplete() {
System.out.println("Subscriber.onComplete");
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
final var processor = new UpperCaseProcessor();
publisher.subscribe(processor);
processor.subscribe(new MySubscriber());
publisher.submit("abc");
publisher.submit("xyz");
TimeUnit.SECONDS.sleep(1);
}
// 結果
// ↓
//Processor.onSubscribe
//Subscriber.onSubscribe
//Processor.onNext : abc
//Processor.onNext : xyz
//Subscriber.onNext : ABC
//Subscriber.onNext : XYZ
//Processor.onComplete
//Subscriber.onComplete
Flow.Publisherで宣言されたメソッド
subscribe
「Java API 使用例 : Flow.Publisher」をご参照ください。
Flow.Subscriberで宣言されたメソッド
onComplete, onError, onNext, onSubscribe
「Java API 使用例 : Flow.Subscriber」をご参照ください。