Java : SubmissionPublisher (Reactive Streams) - API使用例
SubmissionPublisher (Java SE 21 & JDK 21) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様書のおともにどうぞ。
概要
SubmissionPublisher は、Reactive Streams における Publisher の実装です。
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf(" onSubscribe (tid=%d)%n",
Thread.currentThread().threadId());
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.printf(" onNext (tid=%d) : %s%n",
Thread.currentThread().threadId(), item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.printf(" onError (tid=%d) : %s%n",
Thread.currentThread().threadId(), throwable);
}
@Override
public void onComplete() {
System.out.printf(" onComplete (tid=%d)%n",
Thread.currentThread().threadId());
}
}
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// 結果
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
コンストラクタ
SubmissionPublisher ()
try (final var publisher = new SubmissionPublisher<String>()) {
final var executor = publisher.getExecutor();
System.out.println(executor.getClass().getSimpleName()); // ForkJoinPool
final var capacity = publisher.getMaxBufferCapacity();
System.out.println(capacity); // 256
}
SubmissionPublisher (Executor executor, int maxBufferCapacity)
try (final var executor = Executors.newFixedThreadPool(10)) {
try (final var publisher = new SubmissionPublisher<String>(executor, 1024)) {
final var executorName = publisher.getExecutor().getClass().getSimpleName();
System.out.println(executorName); // ThreadPoolExecutor
final var capacity = publisher.getMaxBufferCapacity();
System.out.println(capacity); // 1024
}
}
SubmissionPublisher (Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
handler 以外のパラメータについては、SubmissionPublisher(Executor executor, int maxBufferCapacity) の使用例をご参照ください。
try (final var publisher = new SubmissionPublisher<String>(
ForkJoinPool.commonPool(), 256, (subscriber, throwable) -> {
System.out.println("handler : " + throwable);
})
) {
System.out.println("-- subscribe --");
publisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
throw new IllegalStateException("Error!");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
TimeUnit.SECONDS.sleep(1);
}
// 結果
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//handler : java.lang.IllegalStateException: Error!
//onError : java.lang.IllegalStateException: Error!
メソッド
void close ()
可能であれば try-with-resources文 を使うことをおすすめします。
try (final var publisher = new SubmissionPublisher<String>()) {
publisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
// 結果
// ↓
//onSubscribe
//onComplete
try-with-resources文を使わない例です。
final var publisher = new SubmissionPublisher<String>();
try {
publisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
} finally {
publisher.close();
}
// 結果
// ↓
//onSubscribe
//onComplete
void closeExceptionally (Throwable error)
try (final var publisher = new SubmissionPublisher<String>()) {
publisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
publisher.closeExceptionally(new IllegalStateException("Error!"));
TimeUnit.SECONDS.sleep(1);
final var ret = publisher.getClosedException();
System.out.println("closedException : " + ret);
TimeUnit.SECONDS.sleep(1);
}
// 結果
// ↓
//onSubscribe
//onError : java.lang.IllegalStateException: Error!
//closedException : java.lang.IllegalStateException: Error!
CompletableFuture<Void> consume (Consumer<? super T> consumer)
try (final var publisher = new SubmissionPublisher<String>()) {
final var future = publisher.consume((v) -> {
System.out.println(" consume : " + v);
});
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
future.thenRun(() -> {
System.out.println(" thenRun");
});
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// 結果
// ↓
//-- submit --
// consume : abc
// consume : 123
// consume : XYZ
//-- close --
// thenRun
int estimateMaximumLag ()
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(" onSubscribe");
this.subscription = subscription;
}
@Override
public void onNext(String item) {
System.out.println(" onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println(" onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println(" onComplete");
}
public void request(long n) {
subscription.request(n);
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
final var subscriber = new MySubscriber();
publisher.subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit 3 --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- request 1 --");
subscriber.request(1);
TimeUnit.SECONDS.sleep(1);
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- request 2 --");
subscriber.request(2);
TimeUnit.SECONDS.sleep(1);
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- request 3 --");
subscriber.request(3);
TimeUnit.SECONDS.sleep(1);
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- close --");
}
// 結果
// ↓
//-- subscribe --
// onSubscribe
//-- submit 3 --
//lag = 3
//demand = -3
//-- request 1 --
// onNext : abc
//lag = 2
//demand = -2
//-- request 2 --
// onNext : 123
// onNext : XYZ
//lag = 0
//demand = 0
//-- request 3 --
//lag = 0
//demand = 3
//-- close --
// onComplete
long estimateMinimumDemand ()
このメソッドの使用例は、estimateMaximumLag() にまとめて記載しました。
そちらのAPI使用例をご参照ください。
Throwable getClosedException ()
このメソッドの使用例は、closeExceptionally(Throwable error) にまとめて記載しました。
そちらのAPI使用例をご参照ください。
Executor getExecutor ()
このメソッドの使用例は、SubmissionPublisher(Executor executor, int maxBufferCapacity) にまとめて記載しました。
そちらのAPI使用例をご参照ください。
int getMaxBufferCapacity ()
このメソッドの使用例は、SubmissionPublisher(Executor executor, int maxBufferCapacity) にまとめて記載しました。
そちらのAPI使用例をご参照ください。
int getNumberOfSubscribers ()
class MySubscriber implements Flow.Subscriber<String> {
private final String name;
MySubscriber(String name) {
this.name = name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
}
@Override
public void onNext(String item) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
@Override
public String toString() {
return "name = " + name;
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println(publisher.hasSubscribers()); // false
System.out.println(publisher.getNumberOfSubscribers()); // 0
publisher.subscribe(new MySubscriber("abc"));
System.out.println(publisher.hasSubscribers()); // true
System.out.println(publisher.getNumberOfSubscribers()); // 1
publisher.subscribe(new MySubscriber("123"));
System.out.println(publisher.hasSubscribers()); // true
System.out.println(publisher.getNumberOfSubscribers()); // 2
publisher.subscribe(new MySubscriber("XYZ"));
System.out.println(publisher.hasSubscribers()); // true
System.out.println(publisher.getNumberOfSubscribers()); // 3
System.out.println("-- subscribers --");
for (final var subscriber : publisher.getSubscribers()) {
System.out.println(subscriber);
}
// 結果
// ↓
//-- subscribers --
//name = abc
//name = 123
//name = XYZ
}
List<Flow.Subscriber<? super T>> getSubscribers ()
このメソッドの使用例は、getNumberOfSubscribers() にまとめて記載しました。
そちらのAPI使用例をご参照ください。
boolean hasSubscribers ()
このメソッドの使用例は、getNumberOfSubscribers() にまとめて記載しました。
そちらのAPI使用例をご参照ください。
boolean isClosed ()
final var publisher = new SubmissionPublisher<String>();
try (publisher) {
System.out.println(publisher.isClosed()); // false
}
System.out.println(publisher.isClosed()); // true
boolean isSubscribed (Flow.Subscriber<? super T> subscriber)
class MySubscriber implements Flow.Subscriber<String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
}
@Override
public void onNext(String item) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
final var subscriberA = new MySubscriber();
final var subscriberB = new MySubscriber();
System.out.println(publisher.isSubscribed(subscriberA)); // false
System.out.println(publisher.isSubscribed(subscriberB)); // false
publisher.subscribe(subscriberA);
System.out.println(publisher.isSubscribed(subscriberA)); // true
System.out.println(publisher.isSubscribed(subscriberB)); // false
publisher.subscribe(subscriberB);
System.out.println(publisher.isSubscribed(subscriberA)); // true
System.out.println(publisher.isSubscribed(subscriberB)); // true
}
int offer (T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
timeout, unit パラメータ以外については offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) の使用例もご参照ください。
// 基準となる時刻
final long current = System.nanoTime();
// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(String item) {
System.out.println(" onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
public void request(long n) {
subscription.request(n);
}
}
final int capacity = 2;
System.out.println("capacity = " + capacity);
final var items = List.of("a1", "a2", "a3", "a4", "a5");
try (final var publisher = new SubmissionPublisher<String>(
ForkJoinPool.commonPool(), capacity)) {
final var subscriber = new MySubscriber();
publisher.subscribe(subscriber);
for (final var item : items) {
System.out.println("---- item = " + item + " ----");
System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());
final var ret = publisher.offer(item, 1, TimeUnit.SECONDS, (sub, i) -> {
System.out.printf(" onDrop item = %s (%.3f sec.)%n",
i, elapsedTime.getAsDouble());
return false;
});
System.out.printf("offer ret = %d (%.3f sec.)%n%n",
ret, elapsedTime.getAsDouble());
}
System.out.println("---- request ----");
subscriber.request(5);
TimeUnit.SECONDS.sleep(1);
}
// 結果
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.010 sec.)
//offer ret = 2 (0.010 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
// onDrop item = a3 (1.023 sec.)
//offer ret = -1 (1.023 sec.)
//
//---- item = a4 ----
//offer (1.023 sec.)
// onDrop item = a4 (2.034 sec.)
//offer ret = -1 (2.034 sec.)
//
//---- item = a5 ----
//offer (2.034 sec.)
// onDrop item = a5 (3.039 sec.)
//offer ret = -1 (3.039 sec.)
//
//---- request ----
// onNext : a1
// onNext : a2
int offer (T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf(" onSubscribe (tid=%d)%n",
Thread.currentThread().threadId());
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.printf(" onNext (tid=%d) : %s%n",
Thread.currentThread().threadId(), item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.printf(" onError (tid=%d) : %s%n",
Thread.currentThread().threadId(), throwable);
}
@Override
public void onComplete() {
System.out.printf(" onComplete (tid=%d)%n",
Thread.currentThread().threadId());
}
}
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
TimeUnit.SECONDS.sleep(1);
System.out.println("-- offer --");
publisher.offer("abc", null);
publisher.offer("123", null);
publisher.offer("XYZ", null);
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// 結果
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- offer --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
// 基準となる時刻
final long current = System.nanoTime();
// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(String item) {
System.out.println(" onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
public void request(long n) {
subscription.request(n);
}
}
final int capacity = 2;
System.out.println("capacity = " + capacity);
final var items = List.of("a1", "a2", "a3", "a4", "a5");
try (final var publisher = new SubmissionPublisher<String>(
ForkJoinPool.commonPool(), capacity)) {
final var subscriber = new MySubscriber();
publisher.subscribe(subscriber);
for (final var item : items) {
System.out.println("---- item = " + item + " ----");
System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());
final var ret = publisher.offer(item, (sub, i) -> {
System.out.printf(" onDrop item = %s (%.3f sec.)%n",
i, elapsedTime.getAsDouble());
return false;
});
System.out.printf("offer ret = %d (%.3f sec.)%n%n",
ret, elapsedTime.getAsDouble());
}
System.out.println("---- request ----");
subscriber.request(5);
TimeUnit.SECONDS.sleep(1);
}
// 結果
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.009 sec.)
//offer ret = 2 (0.009 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
// onDrop item = a3 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a4 ----
//offer (0.010 sec.)
// onDrop item = a4 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a5 ----
//offer (0.011 sec.)
// onDrop item = a5 (0.011 sec.)
//offer ret = -1 (0.011 sec.)
//
//---- request ----
// onNext : a1
// onNext : a2
class MySubscriber implements Flow.Subscriber<String> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(3);
}
@Override
public void onNext(String item) {
System.out.println("<- onNext : " + item);
try {
System.out.println(" - sleep ...");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("InterruptedException!");
} finally {
System.out.println("-> onNext end");
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
final int capacity = 1;
System.out.println("capacity = " + capacity);
try (final var publisher = new SubmissionPublisher<String>(
ForkJoinPool.commonPool(), capacity)) {
publisher.subscribe(new MySubscriber());
publisher.offer("abc", (subscriber, item) -> {
System.out.println("** onDrop item = " + item);
return true;
});
publisher.offer("123", (subscriber, item) -> {
System.out.println("** onDrop item = " + item);
return true;
});
publisher.offer("XYZ", (subscriber, item) -> {
System.out.println("** onDrop item = " + item);
try {
System.out.println("** sleep ...");
TimeUnit.SECONDS.sleep(3);
System.out.println("** sleep end");
} catch (InterruptedException e) {
System.out.println("InterruptedException!");
}
return true;
});
TimeUnit.SECONDS.sleep(5);
}
// 結果
// ↓
//capacity = 1
//<- onNext : abc
// - sleep ...
//** onDrop item = XYZ
//** sleep ...
//-> onNext end
//<- onNext : 123
// - sleep ...
//-> onNext end
//** sleep end
//<- onNext : XYZ
// - sleep ...
//-> onNext end
int submit (T item)
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf(" onSubscribe (tid=%d)%n",
Thread.currentThread().threadId());
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.printf(" onNext (tid=%d) : %s%n",
Thread.currentThread().threadId(), item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.printf(" onError (tid=%d) : %s%n",
Thread.currentThread().threadId(), throwable);
}
@Override
public void onComplete() {
System.out.printf(" onComplete (tid=%d)%n",
Thread.currentThread().threadId());
}
}
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// 結果
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
class MySubscriber implements Flow.Subscriber<String> {
private final String name;
MySubscriber(String name) {
this.name = name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(2);
}
@Override
public void onNext(String item) {
System.out.println(name + " : onNext : " + item);
try {
System.out.println(name + " : sleep ...");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("InterruptedException!");
} finally {
System.out.println(name + " : onNext end");
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
try (final var publisher = new SubmissionPublisher<String>()) {
publisher.subscribe(new MySubscriber("A"));
publisher.subscribe(new MySubscriber("B"));
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
final var ret1 = publisher.submit("abc");
final var ret2 = publisher.submit("XYZ");
System.out.println("ret1 = " + ret1);
System.out.println("ret2 = " + ret2);
System.out.println("-- submit end --");
TimeUnit.SECONDS.sleep(3);
}
// 結果
// ↓
//-- submit --
//ret1 = 1
//ret2 = 2
//-- submit end --
//A : onNext : abc
//B : onNext : abc
//A : sleep ...
//B : sleep ...
//A : onNext end
//B : onNext end
//A : onNext : XYZ
//B : onNext : XYZ
//A : sleep ...
//B : sleep ...
//A : onNext end
//B : onNext end
void subscribe (Flow.Subscriber<? super T> subscriber)
このメソッドの使用例は、submit(T item) にまとめて記載しました。
そちらのAPI使用例をご参照ください。