Reactive Streams



Table of Contents

Introduction

Reactive Streams is an initiative to give non-blocking back pressure a standard for asynchronous stream processing. This includes work to runtime environments (JVM and JavaScript) as well as network protocols. Reactive Streams' main purpose is to control the sharing of stream data across an asynchronous boundary. Backpressure is an integral part of this model to allow a bounding of the queues that mediate between threads. Care has been taken to require all aspects of a ReactiveStreams implementation to be entirely non-blocking and synchronous behavior.

Reactive Streams is a standard and specification for JVM's Stream-oriented libraries

  • process a potentially unbounded number of elements in sequence
  • asynchronously passing elements between components,
  • the backpressure mechanism is non-blocking

The specification for Reactive Streams is composed of the following parts:
  • The API sets out the styles for implementing Reactive Streams and achieving interoperability between various implementations.
  • The Technology Compatibility Kit (TCK) is a standard test suite to test implementation conformance.

Implementations are free to implement additional features which are not covered by the specification as long as they comply with the API requirements and pass the TCK tests.

API Components

The API consists of the following components which Reactive Stream implementations require to provide:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

A Publisher is a supplier of a theoretically infinite number of sequenced elements, publishing them as requested by its Subscriber(s).

The following protocol gives the possible invocation sequences for methods on the Subscriber in response to a call to Publisher.subscribe(Subscriber):

onSubscribe onNext* (onError | onComplete)?

This means that onSubscribe is always signaled, followed by a potentially infinite number of onNext signals (as requested by Subscriber) followed by a onError signal if a malfunction occurs, or a onComplete signal if no more elements are available — all so long as the Subscription is not cancelled.

Publisher

a Publisher<T> is responsible for publishing elements of type T and provides a subscribe method for subscribers to connect to it

public class MyPublisher implements Publisher<Integer> {
    private final Iterator<Integer> iterator;
    MyPublisher(int count) {
        this.iterator = IntStream.rangeClosed(1, count).iterator();
    }
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        iterator.forEachRemaining(subscriber::onNext);
        subscriber.onComplete();
    }
}

Subscriber

a Subscriber connects to a Publisher, receives a confirmation via onSubscribe, then receive data via the onNext callbacks and additional signals via onError and onComplete.

public class MySubscriber<T> implements Subscriber<T> {
    @Override
    public void onSubscribe(Subscription subscription) {}

    @Override
    public void onNext(Integer item) {
        System.out.println("item = [" + item + "]");
    }

    @Override
    public void onError(Throwable throwable) {}

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
}		

Subscription

a Subscription represents a link between a Publisher and a Subscriber, and allows for backpressuring the publisher with request or terminating the link with cancel.

public class MySubscriber<T> implements Subscriber<T> {
    public List<T> consumedElements = new LinkedList<>();
 
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("item = [" + item + "]");
    }

    @Override
    public void onError(Throwable throwable) {}

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
}		

Processor

a Processor combines the capabilities of a Publisher and a Subscriber in a single interface.

public class MyProcessor<T, R> extends SubmissionPublisher<R> 
			implements Flow.Processor<T, R> {
    private Function<T, R> function;
    private Flow.Subscription subscription;
    public MyProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(T item) {
        submit(function.apply(item));
        subscription.request(1);
    }
    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }
    @Override
    public void onComplete() {
        close();
    }
}