Java : SubmissionPublisher (Reactive Streams) with Examples

SubmissionPublisher (Java SE 21 & JDK 21) with examples.
This page provides code examples for most SubmissionPublisher methods.


Summary

A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered.


See also: Reactive Streams

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

Constructors

SubmissionPublisher ()

Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task), with maximum buffer capacity of Flow.defaultBufferSize(), and no handler for Subscriber exceptions in method onNext.

try (final var publisher = new SubmissionPublisher<String>()) {

    final var executor = publisher.getExecutor();
    System.out.println(executor.getClass().getSimpleName()); // ForkJoinPool

    final var capacity = publisher.getMaxBufferCapacity();
    System.out.println(capacity); // 256
}

SubmissionPublisher (Executor executor, int maxBufferCapacity)

Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and no handler for Subscriber exceptions in method onNext.

try (final var executor = Executors.newFixedThreadPool(10)) {
    try (final var publisher = new SubmissionPublisher<String>(executor, 1024)) {

        final var executorName = publisher.getExecutor().getClass().getSimpleName();
        System.out.println(executorName); // ThreadPoolExecutor

        final var capacity = publisher.getMaxBufferCapacity();
        System.out.println(capacity); // 1024
    }
}

SubmissionPublisher (Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)

Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and, if non-null, the given handler invoked when any Subscriber throws an exception in method onNext.

For the parameters other than handler, please see SubmissionPublisher(Executor executor, int maxBufferCapacity).

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), 256, (subscriber, throwable) -> {
    System.out.println("handler : " + throwable);
})
) {
    System.out.println("-- subscribe --");

    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
            throw new IllegalStateException("Error!");
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit --");
    publisher.submit("abc");

    TimeUnit.SECONDS.sleep(1);
}

// Result
// ↓
//-- subscribe --
//onSubscribe
//-- submit --
//onNext : abc
//handler : java.lang.IllegalStateException: Error!
//onError : java.lang.IllegalStateException: Error!

Methods

void close ()

Unless already closed, issues onComplete signals to current subscribers, and disallows subsequent attempts to publish.

try (final var publisher = new SubmissionPublisher<String>()) {
    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
}

// Result
// ↓
//onSubscribe
//onComplete

An example without a try-with-resources statement.

final var publisher = new SubmissionPublisher<String>();
try {
    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
} finally {
    publisher.close();
}

// Result
// ↓
//onSubscribe
//onComplete

void closeExceptionally (Throwable error)

Unless already closed, issues onError signals to current subscribers with the given error, and disallows subsequent attempts to publish.

try (final var publisher = new SubmissionPublisher<String>()) {
    publisher.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNext : " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });

    publisher.closeExceptionally(new IllegalStateException("Error!"));

    TimeUnit.SECONDS.sleep(1);

    final var ret = publisher.getClosedException();
    System.out.println("closedException : " + ret);

    TimeUnit.SECONDS.sleep(1);
}

// Result
// ↓
//onSubscribe
//onError : java.lang.IllegalStateException: Error!
//closedException : java.lang.IllegalStateException: Error!

CompletableFuture<Void> consume (Consumer<? super T> consumer)

Processes all published items using the given Consumer function.

try (final var publisher = new SubmissionPublisher<String>()) {

    final var future = publisher.consume((v) -> {
        System.out.println("  consume : " + v);
    });

    System.out.println("-- submit --");
    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    future.thenRun(() -> {
        System.out.println("  thenRun");
    });

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//-- submit --
//  consume : abc
//  consume : 123
//  consume : XYZ
//-- close --
//  thenRun
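
According to the API documentation, the CompletableFuture returned by consume completes normally once the publisher signals onComplete. The following minimal sketch relies on that to wait for delivery with join() instead of sleeping. The class name ConsumeJoinDemo and the helper collect are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;

class ConsumeJoinDemo {

    // Hypothetical helper: publishes the items and returns what the consumer received.
    static List<String> collect(List<String> items) {
        final List<String> received = Collections.synchronizedList(new ArrayList<>());

        final var publisher = new SubmissionPublisher<String>();
        final var future = publisher.consume(received::add);

        try (publisher) {
            items.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items are delivered

        future.join(); // blocks until the consumer has processed every item
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("abc", "123", "XYZ"))); // [abc, 123, XYZ]
    }
}
```

Joining the future only after the publisher is closed matters here: before close, no onComplete signal has been issued, so join() would not return.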

int estimateMaximumLag ()

Returns an estimate of the maximum number of items produced but not yet consumed among all current subscribers.

class MySubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("  onSubscribe");
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("  onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("  onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("  onComplete");
    }

    public void request(long n) {
        subscription.request(n);
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    TimeUnit.SECONDS.sleep(1);

    System.out.println("-- submit 3 --");
    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- request 1 --");
    subscriber.request(1);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- request 2 --");
    subscriber.request(2);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- request 3 --");
    subscriber.request(3);
    TimeUnit.SECONDS.sleep(1);

    System.out.println("lag = " + publisher.estimateMaximumLag());
    System.out.println("demand = " + publisher.estimateMinimumDemand());

    System.out.println("-- close --");
}

// Result
// ↓
//-- subscribe --
//  onSubscribe
//-- submit 3 --
//lag = 3
//demand = -3
//-- request 1 --
//  onNext : abc
//lag = 2
//demand = -2
//-- request 2 --
//  onNext : 123
//  onNext : XYZ
//lag = 0
//demand = 0
//-- request 3 --
//lag = 0
//demand = 3
//-- close --
//  onComplete

long estimateMinimumDemand ()

Returns an estimate of the minimum number of items requested (via request) but not yet produced, among all current subscribers.

Please see estimateMaximumLag().
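
The "minimum among all current subscribers" part is easier to see with more than one subscriber. The sketch below is an illustration only: MinimumDemandDemo, FixedDemandSubscriber and minimumDemand are hypothetical names, and the value is an estimate by definition. Nothing is submitted, so the lag stays at zero and the estimate reflects the outstanding requests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class MinimumDemandDemo {

    // Hypothetical subscriber: requests a fixed number of items and ignores them.
    static final class FixedDemandSubscriber implements Flow.Subscriber<String> {
        private final long demand;
        private final CountDownLatch subscribed;

        FixedDemandSubscriber(long demand, CountDownLatch subscribed) {
            this.demand = demand;
            this.subscribed = subscribed;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(demand);
            subscribed.countDown(); // signal that the request has been issued
        }

        @Override public void onNext(String item) { }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    // Subscribes one subscriber per demand value and returns the publisher's estimate.
    static long minimumDemand(long... demands) {
        final var subscribed = new CountDownLatch(demands.length);
        try (final var publisher = new SubmissionPublisher<String>()) {
            for (final long demand : demands) {
                publisher.subscribe(new FixedDemandSubscriber(demand, subscribed));
            }
            // onSubscribe runs asynchronously, so wait for every request first.
            if (!subscribed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("onSubscribe was not called in time");
            }
            return publisher.estimateMinimumDemand();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // One subscriber requested 5 items, the other 2.
        System.out.println("demand = " + minimumDemand(5, 2));
    }
}
```

With requests of 5 and 2 and no submitted items, the expected estimate is the smaller request. With no subscribers at all, the method is documented to return zero.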

Throwable getClosedException ()

Returns the exception associated with closeExceptionally, or null if not closed or if closed normally.

Please see closeExceptionally(Throwable error).
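
As a compact illustration of the three cases in the description (not closed, closed normally, closed exceptionally), here is a minimal sketch. The class and method names are hypothetical.

```java
import java.util.concurrent.SubmissionPublisher;

class ClosedExceptionDemo {

    // Closes a fresh publisher normally and returns its closed exception.
    static Throwable afterNormalClose() {
        final var publisher = new SubmissionPublisher<String>();
        publisher.close();
        return publisher.getClosedException();
    }

    // Closes a fresh publisher with the given error and returns its closed exception.
    static Throwable afterExceptionalClose(Throwable error) {
        final var publisher = new SubmissionPublisher<String>();
        publisher.closeExceptionally(error);
        return publisher.getClosedException();
    }

    public static void main(String[] args) {
        try (final var publisher = new SubmissionPublisher<String>()) {
            System.out.println(publisher.getClosedException()); // null (not closed)
        }
        System.out.println(afterNormalClose()); // null (closed normally)
        System.out.println(afterExceptionalClose(new IllegalStateException("Error!")));
    }
}
```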

Executor getExecutor ()

Returns the Executor used for asynchronous delivery.

Please see SubmissionPublisher(Executor executor, int maxBufferCapacity).

int getMaxBufferCapacity ()

Returns the maximum per-subscriber buffer capacity.

Please see SubmissionPublisher(Executor executor, int maxBufferCapacity).
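
The constructor documentation notes that the enforced capacity may be rounded up to the nearest power of two, and that a non-positive capacity is rejected with an IllegalArgumentException. The sketch below only prints the enforced value rather than assuming a specific number. BufferCapacityDemo and enforcedCapacity are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

class BufferCapacityDemo {

    // Returns the capacity the publisher actually enforces for the requested value.
    static int enforcedCapacity(int requested) {
        try (final var publisher = new SubmissionPublisher<String>(
                ForkJoinPool.commonPool(), requested)) {
            return publisher.getMaxBufferCapacity();
        }
    }

    public static void main(String[] args) {
        // For a small request such as 100, the enforced value is at least 100.
        System.out.println("enforced = " + enforcedCapacity(100));

        try {
            enforcedCapacity(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected : " + e);
        }
    }
}
```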

int getNumberOfSubscribers ()

Returns the number of current subscribers.

class MySubscriber implements Flow.Subscriber<String> {
    private final String name;

    MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
    }

    @Override
    public void onNext(String item) {
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }

    @Override
    public String toString() {
        return "name = " + name;
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println(publisher.hasSubscribers()); // false
    System.out.println(publisher.getNumberOfSubscribers()); // 0

    publisher.subscribe(new MySubscriber("abc"));

    System.out.println(publisher.hasSubscribers()); // true
    System.out.println(publisher.getNumberOfSubscribers()); // 1

    publisher.subscribe(new MySubscriber("123"));

    System.out.println(publisher.hasSubscribers()); // true
    System.out.println(publisher.getNumberOfSubscribers()); // 2

    publisher.subscribe(new MySubscriber("XYZ"));

    System.out.println(publisher.hasSubscribers()); // true
    System.out.println(publisher.getNumberOfSubscribers()); // 3

    System.out.println("-- subscribers --");
    for (final var subscriber : publisher.getSubscribers()) {
        System.out.println(subscriber);
    }

    // Result
    // ↓
    //-- subscribers --
    //name = abc
    //name = 123
    //name = XYZ
}

List<Flow.Subscriber<? super T>> getSubscribers ()

Returns a list of current subscribers for monitoring and tracking purposes, not for invoking Flow.Subscriber methods on the subscribers.

Please see getNumberOfSubscribers().

boolean hasSubscribers ()

Returns true if this publisher has any subscribers.

Please see getNumberOfSubscribers().

boolean isClosed ()

Returns true if this publisher is not accepting submissions.

final var publisher = new SubmissionPublisher<String>();
try (publisher) {
    System.out.println(publisher.isClosed()); // false
}
System.out.println(publisher.isClosed()); // true
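
Once isClosed returns true, submit is documented to throw an IllegalStateException. A minimal sketch of that behavior follows. SubmitAfterCloseDemo and submitIsRejected are hypothetical names.

```java
import java.util.concurrent.SubmissionPublisher;

class SubmitAfterCloseDemo {

    // Returns true if submitting to a closed publisher fails with IllegalStateException.
    static boolean submitIsRejected() {
        final var publisher = new SubmissionPublisher<String>();
        publisher.close();

        try {
            publisher.submit("late");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitIsRejected()); // true
    }
}
```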

boolean isSubscribed (Flow.Subscriber<? super T> subscriber)

Returns true if the given Subscriber is currently subscribed.

class MySubscriber implements Flow.Subscriber<String> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
    }

    @Override
    public void onNext(String item) {
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    final var subscriberA = new MySubscriber();
    final var subscriberB = new MySubscriber();

    System.out.println(publisher.isSubscribed(subscriberA)); // false
    System.out.println(publisher.isSubscribed(subscriberB)); // false

    publisher.subscribe(subscriberA);

    System.out.println(publisher.isSubscribed(subscriberA)); // true
    System.out.println(publisher.isSubscribed(subscriberB)); // false

    publisher.subscribe(subscriberB);

    System.out.println(publisher.isSubscribed(subscriberA)); // true
    System.out.println(publisher.isSubscribed(subscriberB)); // true
}

int offer (T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method, blocking while resources for any subscription are unavailable, up to the specified timeout or until the caller thread is interrupted, at which point the given handler (if non-null) is invoked, and if it returns true, retried once.

For the parameters other than timeout and unit, please see offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop).

final long current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("  onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }

    public void request(long n) {
        subscription.request(n);
    }
}

final int capacity = 2;
System.out.println("capacity = " + capacity);

final var items = List.of("a1", "a2", "a3", "a4", "a5");

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), capacity)) {

    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    for (final var item : items) {
        System.out.println("---- item = " + item + " ----");
        System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());

        final var ret = publisher.offer(item, 1, TimeUnit.SECONDS, (sub, i) -> {
            System.out.printf("  onDrop item = %s (%.3f sec.)%n",
                    i, elapsedTime.getAsDouble());
            return false;
        });
        System.out.printf("offer ret = %d (%.3f sec.)%n%n",
                ret, elapsedTime.getAsDouble());
    }

    System.out.println("---- request ----");
    subscriber.request(5);

    TimeUnit.SECONDS.sleep(1);
}

// Result
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.010 sec.)
//offer ret = 2 (0.010 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
//  onDrop item = a3 (1.023 sec.)
//offer ret = -1 (1.023 sec.)
//
//---- item = a4 ----
//offer (1.023 sec.)
//  onDrop item = a4 (2.034 sec.)
//offer ret = -1 (2.034 sec.)
//
//---- item = a5 ----
//offer (2.034 sec.)
//  onDrop item = a5 (3.039 sec.)
//offer ret = -1 (3.039 sec.)
//
//---- request ----
//  onNext : a1
//  onNext : a2

int offer (T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method.

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- offer --");

    publisher.offer("abc", null);
    publisher.offer("123", null);
    publisher.offer("XYZ", null);

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- offer --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

An example in which the buffer (capacity = 2) fills up because the subscriber requests nothing until the end, so later items are dropped immediately.

final long current = System.nanoTime();
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(String item) {
        System.out.println("  onNext : " + item);
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }

    public void request(long n) {
        subscription.request(n);
    }
}

final int capacity = 2;
System.out.println("capacity = " + capacity);

final var items = List.of("a1", "a2", "a3", "a4", "a5");

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), capacity)) {

    final var subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    for (final var item : items) {
        System.out.println("---- item = " + item + " ----");
        System.out.printf("offer (%.3f sec.)%n", elapsedTime.getAsDouble());

        final var ret = publisher.offer(item, (sub, i) -> {
            System.out.printf("  onDrop item = %s (%.3f sec.)%n",
                    i, elapsedTime.getAsDouble());
            return false;
        });
        System.out.printf("offer ret = %d (%.3f sec.)%n%n",
                ret, elapsedTime.getAsDouble());
    }

    System.out.println("---- request ----");
    subscriber.request(5);

    TimeUnit.SECONDS.sleep(1);
}

// Result
// ↓
//capacity = 2
//---- item = a1 ----
//offer (0.008 sec.)
//offer ret = 1 (0.009 sec.)
//
//---- item = a2 ----
//offer (0.009 sec.)
//offer ret = 2 (0.009 sec.)
//
//---- item = a3 ----
//offer (0.010 sec.)
//  onDrop item = a3 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a4 ----
//offer (0.010 sec.)
//  onDrop item = a4 (0.010 sec.)
//offer ret = -1 (0.010 sec.)
//
//---- item = a5 ----
//offer (0.011 sec.)
//  onDrop item = a5 (0.011 sec.)
//offer ret = -1 (0.011 sec.)
//
//---- request ----
//  onNext : a1
//  onNext : a2

An example in which onDrop returns true, so the offer is retried once.

class MySubscriber implements Flow.Subscriber<String> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(3);
    }

    @Override
    public void onNext(String item) {
        System.out.println("<- onNext : " + item);

        try {
            System.out.println(" - sleep ...");
            TimeUnit.SECONDS.sleep(1);

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        } finally {
            System.out.println("-> onNext end");
        }
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }
}

final int capacity = 1;
System.out.println("capacity = " + capacity);

try (final var publisher = new SubmissionPublisher<String>(
        ForkJoinPool.commonPool(), capacity)) {

    publisher.subscribe(new MySubscriber());

    publisher.offer("abc", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        return true;
    });

    publisher.offer("123", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        return true;
    });

    publisher.offer("XYZ", (subscriber, item) -> {
        System.out.println("** onDrop item = " + item);
        try {
            System.out.println("** sleep ...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("** sleep end");

        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }

        return true;
    });

    TimeUnit.SECONDS.sleep(5);
}

// Result
// ↓
//capacity = 1
//<- onNext : abc
// - sleep ...
//** onDrop item = XYZ
//** sleep ...
//-> onNext end
//<- onNext : 123
// - sleep ...
//-> onNext end
//** sleep end
//<- onNext : XYZ
// - sleep ...
//-> onNext end

int submit (T item)

Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable.

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var publisher = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    publisher.submit("abc");
    publisher.submit("123");
    publisher.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

An example with two subscribers. The return value of submit is the estimated maximum lag among subscribers.

class MySubscriber implements Flow.Subscriber<String> {

    private final String name;

    MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(2);
    }

    @Override
    public void onNext(String item) {
        System.out.println(name + " : onNext : " + item);
        try {
            System.out.println(name + " :   sleep ...");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        } finally {
            System.out.println(name + " : onNext end");
        }
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
    }
}

try (final var publisher = new SubmissionPublisher<String>()) {

    publisher.subscribe(new MySubscriber("A"));
    publisher.subscribe(new MySubscriber("B"));

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    final var ret1 = publisher.submit("abc");
    final var ret2 = publisher.submit("XYZ");

    System.out.println("ret1 = " + ret1);
    System.out.println("ret2 = " + ret2);

    System.out.println("-- submit end --");
    TimeUnit.SECONDS.sleep(3);
}

// Result
// ↓
//-- submit --
//ret1 = 1
//ret2 = 2
//-- submit end --
//A : onNext : abc
//B : onNext : abc
//A :   sleep ...
//B :   sleep ...
//A : onNext end
//B : onNext end
//A : onNext : XYZ
//B : onNext : XYZ
//A :   sleep ...
//B :   sleep ...
//A : onNext end
//B : onNext end

void subscribe (Flow.Subscriber<? super T> subscriber)

Adds the given Subscriber unless already subscribed.

Please see submit(T item).
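
The "unless already subscribed" case is worth a short illustration. The API documentation states that if the Subscriber is already subscribed, its onError method is invoked on the existing subscription with an IllegalStateException. The sketch below captures that error in a CompletableFuture. DuplicateSubscribeDemo, RecordingSubscriber and subscribeTwice are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class DuplicateSubscribeDemo {

    // Hypothetical subscriber that records the first error it receives.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final CompletableFuture<Throwable> error = new CompletableFuture<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }

        @Override public void onNext(String item) { }

        @Override
        public void onError(Throwable throwable) {
            error.complete(throwable);
        }

        @Override public void onComplete() { }
    }

    // Subscribes the same instance twice and returns the error passed to onError.
    static Throwable subscribeTwice() {
        try (final var publisher = new SubmissionPublisher<String>()) {
            final var subscriber = new RecordingSubscriber();
            publisher.subscribe(subscriber);
            publisher.subscribe(subscriber); // duplicate: reported through onError

            // Fail instead of hanging if no error arrives within 5 seconds.
            return subscriber.error.orTimeout(5, TimeUnit.SECONDS).join();
        }
    }

    public static void main(String[] args) {
        System.out.println("onError : " + subscribeTwice());
    }
}
```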

