Java : Flow.Subscription (Reactive Streams) - API使用例
Flow.Subscription (Java SE 20 & JDK 20) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様書のおともにどうぞ。
概要
Flow.Subscription は、Reactive Streams において Subscription の役割を持つインタフェースです。Publisher と Subscriber を結び付け、Subscriber はこれを通じてアイテムの要求 (request) と購読の取り消し (cancel) を行います。
/**
 * Demo subscriber that pulls items one at a time and logs every callback
 * together with the id of the thread the callback runs on.
 */
class MySubscriber implements Flow.Subscriber<String> {

    // Handle stored in onSubscribe; onNext uses it to ask for the following item.
    private Flow.Subscription upstream;

    // Id of the thread that is executing the current callback.
    private static long tid() {
        return Thread.currentThread().threadId();
    }

    /** Logs the call, remembers the subscription and requests the first item. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf(" onSubscribe (tid=%d)%n", tid());
        upstream = subscription;
        subscription.request(1);
    }

    /** Logs the received item and requests exactly one more. */
    @Override
    public void onNext(String item) {
        System.out.printf(" onNext (tid=%d) : %s%n", tid(), item);
        upstream.request(1);
    }

    /** Logs the failure; nothing else is done with it. */
    @Override
    public void onError(Throwable throwable) {
        System.out.printf(" onError (tid=%d) : %s%n", tid(), throwable);
    }

    /** Logs the completion signal. */
    @Override
    public void onComplete() {
        System.out.printf(" onComplete (tid=%d)%n", tid());
    }
}
// Print the id of the thread running the demo, for comparison with the
// thread ids logged by the subscriber callbacks.
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var pub = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    pub.subscribe(new MySubscriber());
    // The recorded output shows callbacks on a different thread than main,
    // so wait before printing the next banner.
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit --");
    for (final var item : new String[] {"abc", "123", "XYZ"}) {
        pub.submit(item);
    }
    TimeUnit.SECONDS.sleep(1);

    // The publisher is closed when the try-with-resources block ends.
    System.out.println("-- close --");
}

// Result
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
メソッド
void cancel ()
/**
 * Demo subscriber that never requests items on its own. Demand and
 * cancellation are driven from outside through {@link #request(long)} and
 * {@link #cancel()}, so the effect of each call can be observed separately.
 */
class MySubscriber implements Flow.Subscriber<String> {

    // Written in onSubscribe, which SubmissionPublisher invokes asynchronously,
    // and read in request/cancel on the caller's thread. volatile makes the
    // stored reference visible across those threads.
    private volatile Flow.Subscription subscription;

    /** Logs the call and stores the subscription; no items are requested here. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        this.subscription = subscription;
    }

    /** Logs the received item. */
    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);
    }

    /** Logs the failure. */
    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    /** Logs the completion signal. */
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    /**
     * Forwards a demand of {@code n} items to the stored subscription.
     * Does nothing if {@code onSubscribe} has not been called yet.
     *
     * @param n value passed unchanged to {@link Flow.Subscription#request(long)}
     */
    void request(long n) {
        // Read the volatile field once so the null check and the call use the same reference.
        final Flow.Subscription current = subscription;
        if (current != null) {
            current.request(n);
        }
    }

    /**
     * Cancels the stored subscription.
     * Does nothing if {@code onSubscribe} has not been called yet.
     */
    void cancel() {
        final Flow.Subscription current = subscription;
        if (current != null) {
            current.cancel();
        }
    }
}
try (final var pub = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    final var sub = new MySubscriber();
    pub.subscribe(sub);
    TimeUnit.SECONDS.sleep(1);

    // Submitted before any demand exists; per the recorded output nothing is
    // delivered until request is called.
    System.out.println("-- submit (a1, a2) --");
    for (final var item : new String[] {"a1", "a2"}) {
        pub.submit(item);
    }
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- request 3 --");
    sub.request(3);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- cancel --");
    sub.cancel();
    TimeUnit.SECONDS.sleep(1);

    // Submitted after cancel; the recorded output shows no further callbacks.
    System.out.println("-- submit (a3, a4) --");
    for (final var item : new String[] {"a3", "a4"}) {
        pub.submit(item);
    }
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- close --");
}

// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit (a1, a2) --
//-- request 3 --
//onNext : a1
//onNext : a2
//-- cancel --
//-- submit (a3, a4) --
//-- close --
void request (long n)
/**
 * Demo subscriber that never requests items on its own. Demand is driven
 * from outside through {@link #request(long)}, so the effect of each
 * request can be observed separately.
 */
class MySubscriber implements Flow.Subscriber<String> {

    // Written in onSubscribe, which SubmissionPublisher invokes asynchronously,
    // and read in request on the caller's thread. volatile makes the stored
    // reference visible across those threads.
    private volatile Flow.Subscription subscription;

    /** Logs the call and stores the subscription; no items are requested here. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("onSubscribe");
        this.subscription = subscription;
    }

    /** Logs the received item. */
    @Override
    public void onNext(String item) {
        System.out.println("onNext : " + item);
    }

    /** Logs the failure. */
    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable);
    }

    /** Logs the completion signal. */
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    /**
     * Forwards a demand of {@code n} items to the stored subscription.
     * Does nothing if {@code onSubscribe} has not been called yet.
     *
     * @param n value passed unchanged to {@link Flow.Subscription#request(long)}
     */
    void request(long n) {
        // Read the volatile field once so the null check and the call use the same reference.
        final Flow.Subscription current = subscription;
        if (current != null) {
            current.request(n);
        }
    }
}
try (final var pub = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    final var sub = new MySubscriber();
    pub.subscribe(sub);
    TimeUnit.SECONDS.sleep(1);

    // Submitted before any demand exists; per the recorded output nothing is
    // delivered until request is called.
    System.out.println("-- submit (a1, a2, a3, a4) --");
    for (final var item : new String[] {"a1", "a2", "a3", "a4"}) {
        pub.submit(item);
    }
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- request 1 --");
    sub.request(1);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- request 2 --");
    sub.request(2);
    TimeUnit.SECONDS.sleep(1);

    // Only one submitted item (a4) is still pending at this point; the
    // recorded output shows the later submits a5 and a6 being delivered
    // as well.
    System.out.println("-- request 3 --");
    sub.request(3);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit (a5, a6) --");
    for (final var item : new String[] {"a5", "a6"}) {
        pub.submit(item);
    }
    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- close --");
}

// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit (a1, a2, a3, a4) --
//-- request 1 --
//onNext : a1
//-- request 2 --
//onNext : a2
//onNext : a3
//-- request 3 --
//onNext : a4
//-- submit (a5, a6) --
//onNext : a5
//onNext : a6
//-- close --
//onComplete