Java : SubmissionPublisher (Reactive Streams) with Examples
SubmissionPublisher (Java SE 21 & JDK 21) with Examples.
You will find code examples for most SubmissionPublisher methods.
Summary
Please see also : Reactive Streams
/**
 * Subscriber that pulls items one at a time and logs every callback together
 * with the id of the thread that invoked it.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Subscription handed over in onSubscribe; used to ask for the next item. */
    private Flow.Subscription upstream;

    /** Logs the call, remembers the subscription and requests the first item. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onSubscribe (tid=%d)%n", tid);
        upstream = subscription;
        upstream.request(1);
    }

    /** Logs the received item, then requests exactly one more. */
    @Override
    public void onNext(String item) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onNext (tid=%d) : %s%n", tid, item);
        upstream.request(1);
    }

    /** Logs the failure; nothing further is requested. */
    @Override
    public void onError(Throwable throwable) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onError (tid=%d) : %s%n", tid, throwable);
    }

    /** Logs completion. */
    @Override
    public void onComplete() {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onComplete (tid=%d)%n", tid);
    }
}
// Overview demo: subscribe one MySubscriber, publish three strings, then close.
// Printing the main thread id lets the transcript show that the subscriber
// callbacks run on a different thread (tid=32 vs. tid=1 in the recorded run).
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
// try-with-resources closes the publisher when the block exits.
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
// Pause so the asynchronous onSubscribe log line appears before the next banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
// Pause so all three onNext log lines appear before the close banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// Result
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
Constructors
SubmissionPublisher ()
// No-arg constructor: print the executor type and buffer capacity the
// publisher ends up with (expected values shown in the trailing comments).
try (var publisher = new SubmissionPublisher<String>()) {
    final String executorName = publisher.getExecutor().getClass().getSimpleName();
    System.out.println(executorName); // ForkJoinPool
    System.out.println(publisher.getMaxBufferCapacity()); // 256
}
SubmissionPublisher (Executor executor, int maxBufferCapacity)
// Explicit executor and buffer capacity. Both resources share one
// try-with-resources header; they are closed in reverse order, so the
// publisher is closed before the executor.
try (var executor = Executors.newFixedThreadPool(10);
     var publisher = new SubmissionPublisher<String>(executor, 1024)) {
    final var usedExecutor = publisher.getExecutor();
    System.out.println(usedExecutor.getClass().getSimpleName()); // ThreadPoolExecutor
    System.out.println(publisher.getMaxBufferCapacity()); // 1024
}
SubmissionPublisher (Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
See also SubmissionPublisher(Executor executor, int maxBufferCapacity) for the parameters other than handler.
// Handler demo: the third constructor argument is a callback that receives
// the subscriber and the throwable. In the recorded run it is invoked after
// onNext throws, and before the subscriber's own onError.
try (final var publisher = new SubmissionPublisher<String>(
ForkJoinPool.commonPool(), 256, (subscriber, throwable) -> {
System.out.println("handler : " + throwable);
})
) {
System.out.println("-- subscribe --");
publisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
// Deliberate failure: this is what triggers the handler in this demo.
throw new IllegalStateException("Error!");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
// Pause so the asynchronous onSubscribe output appears before the submit banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
// Pause so the onNext / handler / onError output is printed before closing.
TimeUnit.SECONDS.sleep(1);
}
// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//handler : java.lang.IllegalStateException: Error!
//onError : java.lang.IllegalStateException: Error!
Methods
void close ()
// close() demo: leaving the try-with-resources block closes the publisher.
// The subscriber never requests items, so only onSubscribe and onComplete
// show up in the recorded output.
try (var publisher = new SubmissionPublisher<String>()) {
    final Flow.Subscriber<String> loggingSubscriber = new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    };
    publisher.subscribe(loggingSubscriber);
}
// Result
// ↓
//onSubscribe
//onComplete
An example without a try-with-resources statement.
// Same close() demo, but with an explicit try/finally instead of
// try-with-resources: close() is called by hand in the finally clause.
final var publisher = new SubmissionPublisher<String>();
try {
    final Flow.Subscriber<String> loggingSubscriber = new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    };
    publisher.subscribe(loggingSubscriber);
} finally {
    publisher.close();
}
// Result
// ↓
//onSubscribe
//onComplete
void closeExceptionally (Throwable error)
// closeExceptionally() demo: the subscriber receives onError (not onComplete)
// and getClosedException() afterwards reports the same exception.
try (final var publisher = new SubmissionPublisher<String>()) {
publisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String item) {
System.out.println("onNext : " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
publisher.closeExceptionally(new IllegalStateException("Error!"));
// Pause so the asynchronous onError output is printed before the next line.
TimeUnit.SECONDS.sleep(1);
final var ret = publisher.getClosedException();
System.out.println("closedException : " + ret);
TimeUnit.SECONDS.sleep(1);
}
// Result
// ↓
//onSubscribe
//onError : java.lang.IllegalStateException: Error!
//closedException : java.lang.IllegalStateException: Error!
CompletableFuture<Void> consume (Consumer<? super T> consumer)
// consume() demo: every submitted item is passed to the given consumer, and
// the returned CompletableFuture is used to chain a follow-up action.
try (final var publisher = new SubmissionPublisher<String>()) {
final var future = publisher.consume((v) -> {
System.out.println(" consume : " + v);
});
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
// In the recorded run this action only fires after the publisher is closed
// (" thenRun" is printed after "-- close --").
future.thenRun(() -> {
System.out.println(" thenRun");
});
// Pause so the three consume lines are printed before the close banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// Result
// ↓
//-- submit --
// consume : abc
// consume : 123
// consume : XYZ
//-- close --
// thenRun
int estimateMaximumLag ()
/**
 * Subscriber that never requests items on its own: demand is signalled only
 * when the caller invokes {@link #request(long)}. Every callback is logged.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Subscription received in onSubscribe; target of request(long). */
    private Flow.Subscription upstream;

    /** Logs the call and stores the subscription without requesting anything. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(" onSubscribe");
        upstream = subscription;
    }

    /** Logs the received item. */
    @Override
    public void onNext(String item) {
        final String line = " onNext : " + item;
        System.out.println(line);
    }

    /** Logs the failure. */
    @Override
    public void onError(Throwable throwable) {
        final String line = " onError : " + throwable;
        System.out.println(line);
    }

    /** Logs completion. */
    @Override
    public void onComplete() {
        System.out.println(" onComplete");
    }

    /**
     * Forwards a demand of {@code n} items to the stored subscription.
     *
     * @param n number of items to request
     */
    public void request(long n) {
        upstream.request(n);
    }
}
// estimateMaximumLag() / estimateMinimumDemand() demo. Three items are
// submitted before the subscriber requests anything; demand is then added
// in steps (1, 2, 3) and both estimates are printed after each step.
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
final var subscriber = new MySubscriber();
publisher.subscribe(subscriber);
// Pause so the asynchronous onSubscribe output appears first.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit 3 --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
// Nothing requested yet: the recorded run shows lag = 3, demand = -3.
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- request 1 --");
subscriber.request(1);
// Each pause gives the requested items time to be delivered before measuring.
TimeUnit.SECONDS.sleep(1);
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- request 2 --");
subscriber.request(2);
TimeUnit.SECONDS.sleep(1);
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- request 3 --");
// All three items were already delivered, so this demand stays outstanding
// (demand = 3 in the recorded run).
subscriber.request(3);
TimeUnit.SECONDS.sleep(1);
System.out.println("lag = " + publisher.estimateMaximumLag());
System.out.println("demand = " + publisher.estimateMinimumDemand());
System.out.println("-- close --");
}
// Result
// ↓
//-- subscribe --
// onSubscribe
//-- submit 3 --
//lag = 3
//demand = -3
//-- request 1 --
// onNext : abc
//lag = 2
//demand = -2
//-- request 2 --
// onNext : 123
// onNext : XYZ
//lag = 0
//demand = 0
//-- request 3 --
//lag = 0
//demand = 3
//-- close --
// onComplete
long estimateMinimumDemand ()
Please see estimateMaximumLag().
Throwable getClosedException ()
Please see closeExceptionally(Throwable error).
Executor getExecutor ()
Please see SubmissionPublisher(Executor executor, int maxBufferCapacity).
int getMaxBufferCapacity ()
Please see SubmissionPublisher(Executor executor, int maxBufferCapacity).
int getNumberOfSubscribers ()
/**
 * Named subscriber whose callbacks all do nothing; it exists only so that
 * listing a publisher's subscribers prints something recognisable.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Label reported by toString(). */
    private final String name;

    /**
     * @param name label used to identify this subscriber in output
     */
    MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) { /* intentionally empty */ }

    @Override
    public void onNext(String item) { /* intentionally empty */ }

    @Override
    public void onError(Throwable throwable) { /* intentionally empty */ }

    @Override
    public void onComplete() { /* intentionally empty */ }

    /** @return the text {@code "name = "} followed by the label */
    @Override
    public String toString() {
        return "name = %s".formatted(name);
    }
}
// getNumberOfSubscribers() / hasSubscribers() / getSubscribers() demo:
// report the publisher's state before any subscription and after each of
// three subscriptions, then list the subscribers.
try (var publisher = new SubmissionPublisher<String>()) {
    // Prints hasSubscribers() followed by getNumberOfSubscribers().
    final Runnable report = () -> {
        System.out.println(publisher.hasSubscribers());
        System.out.println(publisher.getNumberOfSubscribers());
    };

    report.run(); // false, 0
    for (final String name : new String[] {"abc", "123", "XYZ"}) {
        publisher.subscribe(new MySubscriber(name));
        report.run(); // true, then 1 / 2 / 3 on successive iterations
    }

    System.out.println("-- subscribers --");
    publisher.getSubscribers().forEach(System.out::println);
    // Result
    // ↓
    //-- subscribers --
    //name = abc
    //name = 123
    //name = XYZ
}
List<Flow.Subscriber<? super T>> getSubscribers ()
Please see getNumberOfSubscribers().
boolean hasSubscribers ()
Please see getNumberOfSubscribers().
boolean isClosed ()
// isClosed() demo: the publisher is declared outside the try so that it can
// still be queried after try-with-resources has closed it.
final var publisher = new SubmissionPublisher<String>();
try (publisher) {
    final boolean closedInside = publisher.isClosed();
    System.out.println(closedInside); // false
}
final boolean closedAfterwards = publisher.isClosed();
System.out.println(closedAfterwards); // true
boolean isSubscribed (Flow.Subscriber<? super T> subscriber)
/**
 * Subscriber whose callbacks all do nothing; used purely as an identity
 * that can be subscribed to a publisher.
 */
class MySubscriber implements Flow.Subscriber<String> {

    @Override
    public void onSubscribe(Flow.Subscription subscription) { /* intentionally empty */ }

    @Override
    public void onNext(String item) { /* intentionally empty */ }

    @Override
    public void onError(Throwable throwable) { /* intentionally empty */ }

    @Override
    public void onComplete() { /* intentionally empty */ }
}
// isSubscribed() demo: query both subscribers before any subscription,
// after subscribing A, and after subscribing B.
try (var publisher = new SubmissionPublisher<String>()) {
    final var subscriberA = new MySubscriber();
    final var subscriberB = new MySubscriber();

    // Prints isSubscribed() for A, then for B.
    final Runnable report = () -> {
        System.out.println(publisher.isSubscribed(subscriberA));
        System.out.println(publisher.isSubscribed(subscriberB));
    };

    report.run(); // false, false
    publisher.subscribe(subscriberA);
    report.run(); // true, false
    publisher.subscribe(subscriberB);
    report.run(); // true, true
}
int offer (T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
See also offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) for the parameters other than timeout and unit.
// Reference instant for the elapsed-time stamps printed by this example.
final long current = System.nanoTime();
// Supplies the seconds elapsed since `current`, as a double.
final DoubleSupplier elapsedTime = () -> {
    final long elapsedNanos = System.nanoTime() - current;
    return elapsedNanos / 1_000_000_000.0;
};
/**
 * Subscriber that requests nothing by itself; demand is issued only through
 * {@link #request(long)}. Received items are logged, other signals ignored.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Subscription received in onSubscribe; target of request(long). */
    private Flow.Subscription upstream;

    /** Stores the subscription without requesting anything. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
    }

    /** Logs the received item. */
    @Override
    public void onNext(String item) {
        final String line = " onNext : " + item;
        System.out.println(line);
    }

    @Override
    public void onError(Throwable throwable) { /* intentionally empty */ }

    @Override
    public void onComplete() { /* intentionally empty */ }

    /**
     * Forwards a demand of {@code n} items to the stored subscription.
     *
     * @param n number of items to request
     */
    public void request(long n) {
        upstream.request(n);
    }
}
// Timed offer() demo: a publisher with buffer capacity 2 is offered five
// items while the subscriber has requested nothing. Each offer is
// time-stamped before and after so any blocking is visible in the output.
final int capacity = 2;
System.out.println("capacity = " + capacity);
final var items = List.of("a1", "a2", "a3", "a4", "a5");
try (final var publisher = new SubmissionPublisher<String>(
ForkJoinPool.commonPool(), capacity)) {
final var subscriber = new MySubscriber();
publisher.subscribe(subscriber);
for (final var item : items) {
System.out.println("---- item = " + item + " ----");
System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());
// Waits up to one second for buffer space. In the recorded run, a3..a5 each
// wait about a second, then onDrop is called and offer returns -1.
final var ret = publisher.offer(item, 1, TimeUnit.SECONDS, (sub, i) -> {
System.out.printf(" onDrop item = %s (%.3f sec.)%n",
i, elapsedTime.getAsDouble());
// false: no retry is asked for; onDrop runs once per rejected item in the recorded run.
return false;
});
System.out.printf("offer ret = %d (%.3f sec.)%n%n",
ret, elapsedTime.getAsDouble());
}
System.out.println("---- request ----");
// Five items are requested, but only the two buffered ones (a1, a2) arrive.
subscriber.request(5);
TimeUnit.SECONDS.sleep(1);
}
// Result
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.010 sec.)
//offer ret = 2 (0.010 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
// onDrop item = a3 (1.023 sec.)
//offer ret = -1 (1.023 sec.)
//
//---- item = a4 ----
//offer (1.023 sec.)
// onDrop item = a4 (2.034 sec.)
//offer ret = -1 (2.034 sec.)
//
//---- item = a5 ----
//offer (2.034 sec.)
// onDrop item = a5 (3.039 sec.)
//offer ret = -1 (3.039 sec.)
//
//---- request ----
// onNext : a1
// onNext : a2
int offer (T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
/**
 * Subscriber that consumes one item per request and logs each callback
 * along with the id of the calling thread.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Subscription obtained in onSubscribe; used to request further items. */
    private Flow.Subscription upstream;

    /** Logs the call, keeps the subscription and asks for the first item. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onSubscribe (tid=%d)%n", tid);
        upstream = subscription;
        upstream.request(1);
    }

    /** Logs the item and asks for the next one. */
    @Override
    public void onNext(String item) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onNext (tid=%d) : %s%n", tid, item);
        upstream.request(1);
    }

    /** Logs the failure. */
    @Override
    public void onError(Throwable throwable) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onError (tid=%d) : %s%n", tid, throwable);
    }

    /** Logs completion. */
    @Override
    public void onComplete() {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onComplete (tid=%d)%n", tid);
    }
}
// Basic offer() demo: same flow as the submit() example, but items are
// published with offer() and no drop handler.
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
// Pause so the asynchronous onSubscribe output appears before the next banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- offer --");
// null is passed as the onDrop argument: no drop callback is supplied.
publisher.offer("abc", null);
publisher.offer("123", null);
publisher.offer("XYZ", null);
// Pause so all onNext lines are printed before the close banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// Result
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- offer --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
// Start-of-example timestamp, in nanoseconds.
final long current = System.nanoTime();
// Yields the number of seconds elapsed since `current`.
final DoubleSupplier elapsedTime = () -> {
    final long nanosSinceStart = System.nanoTime() - current;
    return nanosSinceStart / 1e9;
};
/**
 * Subscriber with caller-controlled demand: items are requested only via
 * {@link #request(long)}. Received items are logged; errors and completion
 * are ignored.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Subscription received in onSubscribe; target of request(long). */
    private Flow.Subscription upstream;

    /** Keeps the subscription; no demand is signalled here. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
    }

    /** Logs the received item. */
    @Override
    public void onNext(String item) {
        final String line = " onNext : " + item;
        System.out.println(line);
    }

    @Override
    public void onError(Throwable throwable) { /* intentionally empty */ }

    @Override
    public void onComplete() { /* intentionally empty */ }

    /**
     * Forwards a demand of {@code n} items to the stored subscription.
     *
     * @param n number of items to request
     */
    public void request(long n) {
        upstream.request(n);
    }
}
// Untimed offer() demo: same setup as the timed variant (capacity 2, five
// items, no demand yet), but this overload takes no timeout. The time
// stamps in the recorded run show that rejected offers return at once.
final int capacity = 2;
System.out.println("capacity = " + capacity);
final var items = List.of("a1", "a2", "a3", "a4", "a5");
try (final var publisher = new SubmissionPublisher<String>(
ForkJoinPool.commonPool(), capacity)) {
final var subscriber = new MySubscriber();
publisher.subscribe(subscriber);
for (final var item : items) {
System.out.println("---- item = " + item + " ----");
System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());
final var ret = publisher.offer(item, (sub, i) -> {
System.out.printf(" onDrop item = %s (%.3f sec.)%n",
i, elapsedTime.getAsDouble());
// false: no retry is asked for; in the recorded run offer then returns -1.
return false;
});
System.out.printf("offer ret = %d (%.3f sec.)%n%n",
ret, elapsedTime.getAsDouble());
}
System.out.println("---- request ----");
// Five items are requested, but only the two buffered ones (a1, a2) arrive.
subscriber.request(5);
TimeUnit.SECONDS.sleep(1);
}
// Result
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.009 sec.)
//offer ret = 2 (0.009 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
// onDrop item = a3 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a4 ----
//offer (0.010 sec.)
// onDrop item = a4 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a5 ----
//offer (0.011 sec.)
// onDrop item = a5 (0.011 sec.)
//offer ret = -1 (0.011 sec.)
//
//---- request ----
// onNext : a1
// onNext : a2
/**
 * Deliberately slow subscriber: requests three items up front and spends one
 * second inside every onNext call, logging when the call starts and ends.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Requests three items immediately; the subscription is not kept. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(3);
    }

    /**
     * Logs the item, sleeps for one second and logs the end of the call.
     * If the sleep is interrupted, the interruption is logged and the
     * thread's interrupt status is restored before returning.
     */
    @Override
    public void onNext(String item) {
        System.out.println("<- onNext : " + item);
        try {
            System.out.println(" - sleep ...");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
            // sleep() cleared the interrupt flag when it threw; set it again so
            // the code that owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("-> onNext end");
        }
    }

    @Override
    public void onError(Throwable throwable) { /* intentionally empty */ }

    @Override
    public void onComplete() { /* intentionally empty */ }
}
// offer() retry demo: capacity 1 with a slow subscriber. Every onDrop
// handler returns true; the handler for "XYZ" additionally sleeps for three
// seconds before returning. In the recorded run "XYZ" is delivered after
// that sleep ends.
final int capacity = 1;
System.out.println("capacity = " + capacity);
try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), capacity)) {
    publisher.subscribe(new MySubscriber());
    publisher.offer("abc", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        return true;
    });
    publisher.offer("123", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        return true;
    });
    publisher.offer("XYZ", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        try {
            System.out.println("** sleep ...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("** sleep end");
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
            // sleep() cleared the interrupt flag when it threw; restore it so
            // the thread calling offer() still sees that it was interrupted.
            Thread.currentThread().interrupt();
        }
        return true;
    });
    // Keep the publisher open long enough for all deliveries to be printed.
    TimeUnit.SECONDS.sleep(5);
}
// Result
// ↓
//capacity = 1
//<- onNext : abc
// - sleep ...
//** onDrop item = XYZ
//** sleep ...
//-> onNext end
//<- onNext : 123
// - sleep ...
//-> onNext end
//** sleep end
//<- onNext : XYZ
// - sleep ...
//-> onNext end
int submit (T item)
/**
 * One-item-at-a-time subscriber that logs each callback with the id of the
 * thread it was called on.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Subscription saved from onSubscribe; used to request the next item. */
    private Flow.Subscription upstream;

    /** Logs the call, saves the subscription and requests the first item. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onSubscribe (tid=%d)%n", tid);
        upstream = subscription;
        upstream.request(1);
    }

    /** Logs the item and requests one more. */
    @Override
    public void onNext(String item) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onNext (tid=%d) : %s%n", tid, item);
        upstream.request(1);
    }

    /** Logs the failure. */
    @Override
    public void onError(Throwable throwable) {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onError (tid=%d) : %s%n", tid, throwable);
    }

    /** Logs completion. */
    @Override
    public void onComplete() {
        final long tid = Thread.currentThread().threadId();
        System.out.printf(" onComplete (tid=%d)%n", tid);
    }
}
// submit() demo: subscribe one MySubscriber, submit three strings, close.
// The main thread id is printed so the transcript shows the callbacks
// running on a different thread (tid=32 vs. tid=1 in the recorded run).
System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());
try (final var publisher = new SubmissionPublisher<String>()) {
System.out.println("-- subscribe --");
publisher.subscribe(new MySubscriber());
// Pause so the asynchronous onSubscribe output appears before the next banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
publisher.submit("abc");
publisher.submit("123");
publisher.submit("XYZ");
// Pause so all onNext lines are printed before the close banner.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- close --");
}
// Result
// ↓
//main (tid=1)
//-- subscribe --
// onSubscribe (tid=32)
//-- submit --
// onNext (tid=32) : abc
// onNext (tid=32) : 123
// onNext (tid=32) : XYZ
//-- close --
// onComplete (tid=32)
/**
 * Named, deliberately slow subscriber: requests two items up front and
 * spends one second inside every onNext call, prefixing its log lines with
 * its name so output from several subscribers can be told apart.
 */
class MySubscriber implements Flow.Subscriber<String> {

    /** Prefix for this subscriber's log lines. */
    private final String name;

    /**
     * @param name label used as the prefix of this subscriber's log lines
     */
    MySubscriber(String name) {
        this.name = name;
    }

    /** Requests two items immediately; the subscription is not kept. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(2);
    }

    /**
     * Logs the item, sleeps for one second and logs the end of the call.
     * If the sleep is interrupted, the interruption is logged and the
     * thread's interrupt status is restored before returning.
     */
    @Override
    public void onNext(String item) {
        System.out.println(name + " : onNext : " + item);
        try {
            System.out.println(name + " : sleep ...");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
            // sleep() cleared the interrupt flag when it threw; set it again so
            // the code that owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " : onNext end");
        }
    }

    @Override
    public void onError(Throwable throwable) { /* intentionally empty */ }

    @Override
    public void onComplete() { /* intentionally empty */ }
}
// submit() with two slow subscribers: both receive every item, and the
// return values of submit() are printed. In the recorded run both submit
// calls return before any onNext output appears.
try (final var publisher = new SubmissionPublisher<String>()) {
publisher.subscribe(new MySubscriber("A"));
publisher.subscribe(new MySubscriber("B"));
// Pause so both subscriptions are established before submitting.
TimeUnit.SECONDS.sleep(1);
System.out.println("-- submit --");
final var ret1 = publisher.submit("abc");
final var ret2 = publisher.submit("XYZ");
System.out.println("ret1 = " + ret1);
System.out.println("ret2 = " + ret2);
System.out.println("-- submit end --");
// Each subscriber sleeps one second per item; wait long enough for both items.
TimeUnit.SECONDS.sleep(3);
}
// Result
// ↓
//-- submit --
//ret1 = 1
//ret2 = 2
//-- submit end --
//A : onNext : abc
//B : onNext : abc
//A : sleep ...
//B : sleep ...
//A : onNext end
//B : onNext end
//A : onNext : XYZ
//B : onNext : XYZ
//A : sleep ...
//B : sleep ...
//A : onNext end
//B : onNext end
void subscribe (Flow.Subscriber<? super T> subscriber)
Please see submit(T item).
Related posts
- API Examples