Java : Flow.Publisher (Reactive Streams) con ejemplos

Flow.Publisher (Java SE 21 & JDK 21) en Java con ejemplos.
Encontrará ejemplos de código en la mayoría de los métodos de Flow.Publisher<T>.

Nota :


Summary

Un productor de artículos (y mensajes de control relacionados) recibidos por los Suscriptores. Cada Flow.Subscriber actual recibe los mismos elementos (a través del método onNext) en el mismo orden, a menos que se produzcan caídas o errores. (Traducción automática)

Class diagram

Please see also : Reactive Streams

class MySubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf("  onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.printf("  onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("  onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("  onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

try (final var submission = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    final Flow.Publisher<String> publisher = submission;
    publisher.subscribe(new MySubscriber());

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    submission.submit("abc");
    submission.submit("123");
    submission.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//main (tid=1)
//-- subscribe --
//  onSubscribe (tid=32)
//-- submit --
//  onNext (tid=32) : abc
//  onNext (tid=32) : 123
//  onNext (tid=32) : XYZ
//-- close --
//  onComplete (tid=32)

Methods

void subscribe (Flow.Subscriber<? super T> subscriber)

Agrega el suscriptor dado si es posible. (Traducción automática)

class MySubscriber implements Flow.Subscriber<String> {
    private final String name;

    MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(name + " : onSubscribe");
        subscription.request(2);
    }

    @Override
    public void onNext(String item) {
        System.out.println(name + " : onNext item = " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(name + " : onError");
    }

    @Override
    public void onComplete() {
        System.out.println(name + " : onComplete");
    }

    @Override
    public String toString() {
        return "name = " + name;
    }
}

try (final var submission = new SubmissionPublisher<String>()) {

    System.out.println("-- subscribe --");

    final Flow.Publisher<String> publisher = submission;
    publisher.subscribe(new MySubscriber("A"));
    publisher.subscribe(new MySubscriber("B"));
    publisher.subscribe(new MySubscriber("C"));

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- submit --");

    submission.submit("abc");
    submission.submit("XYZ");

    TimeUnit.SECONDS.sleep(1);
    System.out.println("-- close --");
}

// Result
// ↓
//-- subscribe --
//A : onSubscribe
//B : onSubscribe
//C : onSubscribe
//-- submit --
//A : onNext item = abc
//B : onNext item = abc
//C : onNext item = abc
//A : onNext item = XYZ
//B : onNext item = XYZ
//C : onNext item = XYZ
//-- close --
//A : onComplete
//B : onComplete
//C : onComplete

Related posts

  • Ejemplos de API
To top of page