Java : Flow.Publisher (Reactive Streams) - API使用例
Flow.Publisher (Java SE 21 & JDK 21) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様書のおともにどうぞ。
概要
Flow.Publisher は、Reactive Streams において Publisher の役割を持つインタフェースです。
主な実装として SubmissionPublisher があります。
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf(" onSubscribe (tid=%d)%n",
Thread.currentThread().threadId());
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.printf(" onNext (tid=%d) : %s%n",
Thread.currentThread().threadId(), item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.printf(" onError (tid=%d) : %s%n",
Thread.currentThread().threadId(), throwable);
}
@Override
public void onComplete() {
System.out.printf(" onComplete (tid=%d)%n",
Thread.currentThread().threadId());
}
}
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
try (final var submission = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
final Flow.Publisher<String> publisher = submission;
publisher.subscribe(new MySubscriber());
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
submission.submit("abc");
submission.submit("123");
submission.submit("XYZ");
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// 結果
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
メソッド
void subscribe (Flow.Subscriber<? super T> subscriber)
class MySubscriber implements Flow.Subscriber<String> {
private final String name;
MySubscriber(String name) {
this.name = name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(name + " : onSubscribe");
subscription.request(2);
}
@Override
public void onNext(String item) {
System.out.println(name + " : onNext item = " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println(name + " : onError");
}
@Override
public void onComplete() {
System.out.println(name + " : onComplete");
}
@Override
public String toString() {
return "name = " + name;
}
}
try (final var submission = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
final Flow.Publisher<String> publisher = submission;
publisher.subscribe(new MySubscriber("A"));
publisher.subscribe(new MySubscriber("B"));
publisher.subscribe(new MySubscriber("C"));
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
submission.submit("abc");
submission.submit("XYZ");
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// 結果
// ↓
//-- subscribe --
//A : onSubscribe
//B : onSubscribe
//C : onSubscribe
//-- submit --
//A : onNext item = abc
//B : onNext item = abc
//C : onNext item = abc
//A : onNext item = XYZ
//B : onNext item = XYZ
//C : onNext item = XYZ
//-- close --
//A : onComplete
//B : onComplete
//C : onComplete