Introduction
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking backpressure. It covers efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. Its main purpose is to govern the exchange of stream data across an asynchronous boundary. Backpressure is an integral part of this model because it keeps the queues that mediate between threads bounded. Care has been taken to require that all aspects of a Reactive Streams implementation be fully non-blocking and asynchronous.
Reactive Streams is a standard and specification for stream-oriented libraries on the JVM that:
- process a potentially unbounded number of elements in sequence,
- pass elements asynchronously between components,
- apply backpressure in a non-blocking way.
The specification for Reactive Streams is composed of the following parts:
- The API specifies the types to implement Reactive Streams and achieve interoperability between different implementations.
- The Technology Compatibility Kit (TCK) is a standard test suite to test implementation conformance.
Implementations are free to implement additional features which are not covered by the specification as long as they comply with the API requirements and pass the TCK tests.
API Components
The API consists of the following components, which Reactive Streams implementations are required to provide:
- Publisher
- Subscriber
- Subscription
- Processor
A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand signaled by its Subscriber(s).
The following protocol gives the possible invocation sequences for methods on the Subscriber in response to a call to Publisher.subscribe(Subscriber):
onSubscribe onNext* (onError | onComplete)?
This means that onSubscribe is always signaled first, followed by a potentially unbounded number of onNext signals (as requested by the Subscriber), followed by an onError signal if a failure occurs or an onComplete signal if no more elements are available, all provided the Subscription is not cancelled.
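The sequence above can be observed by recording the signals a subscriber receives. The following is a minimal sketch, assuming the JDK's java.util.concurrent.Flow interfaces (Java 9+) and its SubmissionPublisher as the publisher. SignalRecorder and its record method are hypothetical names introduced only for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: records the name of every signal it receives, in order.
final class SignalRecorder implements Flow.Subscriber<Integer> {
    final List<String> signals = new CopyOnWriteArrayList<>();
    final CountDownLatch terminated = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        signals.add("onSubscribe");
        subscription.request(1); // signal demand for the first element
    }

    @Override
    public void onNext(Integer item) {
        signals.add("onNext(" + item + ")");
        subscription.request(1); // signal demand for the next element
    }

    @Override
    public void onError(Throwable throwable) {
        signals.add("onError");
        terminated.countDown();
    }

    @Override
    public void onComplete() {
        signals.add("onComplete");
        terminated.countDown();
    }

    // Publishes 1, 2, 3 and returns the signals observed by the recorder.
    static List<String> record() {
        SignalRecorder recorder = new SignalRecorder();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(recorder);
            List.of(1, 2, 3).forEach(publisher::submit);
        } // close() issues onComplete after the buffered items are delivered
        try {
            // Delivery happens on another thread, so wait for the terminal signal.
            recorder.terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return recorder.signals;
    }

    public static void main(String[] args) {
        System.out.println(record());
    }
}
```

The recorded list starts with onSubscribe, continues with one onNext per element, and ends with a single terminal signal, which matches the pattern onSubscribe onNext* (onError | onComplete)?.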
Publisher
A Publisher<T> is responsible for publishing elements of type T and provides a subscribe method through which subscribers connect to it.
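    import java.util.Iterator;
    import java.util.concurrent.Flow;
    import java.util.stream.IntStream;

    // Simplified illustration: emits everything at once and ignores demand, so it is not spec-compliant.
    public class MyPublisher implements Flow.Publisher<Integer> {
        private final Iterator<Integer> iterator;
        MyPublisher(int count) {
            this.iterator = IntStream.rangeClosed(1, count).iterator();
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // onSubscribe must be signaled before any other signal.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { } // demand is ignored here
                @Override public void cancel() { }        // cancellation is ignored here
            });
            iterator.forEachRemaining(subscriber::onNext);
            subscriber.onComplete();
        }
    }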
Subscriber
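A Subscriber<T> receives the elements published by a Publisher through onNext, along with the onSubscribe, onError, and onComplete signals.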
    import java.util.concurrent.Flow;

    public class MySubscriber<T> implements Flow.Subscriber<T> {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // No demand is signaled yet; see the Subscription section.
        }

        @Override
        public void onNext(T item) {
            System.out.println("item = [" + item + "]");
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }
Subscription
A Subscription represents the link between one Publisher and one Subscriber. The Subscriber applies backpressure by calling request to signal how many elements it is ready to receive, and terminates the link by calling cancel.
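    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Flow;

    public class MySubscriber<T> implements Flow.Subscriber<T> {
        private Flow.Subscription subscription;
        public List<T> consumedElements = new LinkedList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first element
        }

        @Override
        public void onNext(T item) {
            System.out.println("item = [" + item + "]");
            consumedElements.add(item);
            subscription.request(1); // ask for the next element only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }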
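The subscriber above only shows the requesting side. On the publishing side, a Subscription has to track the outstanding demand and never emit more elements than were requested. The following is a minimal, single-threaded sketch of that bookkeeping. RangeSubscription and CollectingSubscriber are hypothetical classes written for this illustration; they are not part of any library, and the sketch makes no claim to pass the TCK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded Subscription over the integers from..to (inclusive).
final class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int to;
    private int next;
    private long demand;      // elements requested but not yet emitted
    private boolean emitting; // true while the emission loop is running
    private boolean done;     // true after cancel or a terminal signal

    RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int from, int to) {
        this.subscriber = subscriber;
        this.next = from;
        this.to = to;
    }

    @Override
    public void request(long n) {
        if (done) return;
        if (n <= 0) {
            // Non-positive requests are answered with an error signal.
            done = true;
            subscriber.onError(new IllegalArgumentException("request must be positive"));
            return;
        }
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
        if (emitting) return; // re-entrant call from onNext: the running loop sees the new demand
        emitting = true;
        while (!done && demand > 0 && next <= to) {
            demand--;
            subscriber.onNext(next++);
        }
        emitting = false;
        if (!done && next > to) {
            done = true;
            subscriber.onComplete();
        }
    }

    @Override
    public void cancel() {
        done = true; // stops any further emission
    }
}

// Hypothetical subscriber that stores what it receives and never requests on its own.
final class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> items = new ArrayList<>();
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription subscription) { }
    @Override public void onNext(Integer item) { items.add(item); }
    @Override public void onError(Throwable throwable) { }
    @Override public void onComplete() { completed = true; }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        RangeSubscription subscription = new RangeSubscription(subscriber, 1, 5);
        subscriber.onSubscribe(subscription);
        subscription.request(2);  // only two elements may be emitted now
        System.out.println(subscriber.items + " completed=" + subscriber.completed);
        subscription.request(10); // the remaining elements arrive, then onComplete
        System.out.println(subscriber.items + " completed=" + subscriber.completed);
    }
}
```

The emitting flag keeps a request call made from inside onNext from starting a nested loop, which avoids unbounded recursion when a subscriber requests one element at a time.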
Processor
A Processor combines the capabilities of a Publisher and a Subscriber in a single interface: it subscribes to an upstream Publisher and publishes to its own downstream Subscribers.
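    import java.util.concurrent.Flow;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;

    public class MyProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<T, R> function;
        private Flow.Subscription subscription;

        public MyProcessor(Function<T, R> function) {
            super();
            this.function = function;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first upstream element
        }

        @Override
        public void onNext(T item) {
            submit(function.apply(item)); // transform and publish downstream
            subscription.request(1);      // then ask upstream for the next element
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            closeExceptionally(t); // propagate the failure to downstream subscribers
        }

        @Override
        public void onComplete() {
            close(); // propagate completion to downstream subscribers
        }
    }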
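A processor of this kind sits between a source and a sink. The following is a hedged sketch of such a pipeline, assuming the JDK's SubmissionPublisher as the source. MappingProcessor is a compact stand-in for the MyProcessor class above, repeated so the example compiles on its own, and ProcessorPipelineDemo and its run method are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Stand-in for MyProcessor: applies a function to each element and republishes the result.
final class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<T, R> function;
    private Flow.Subscription subscription;

    MappingProcessor(Function<T, R> function) { this.function = function; }

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
    @Override public void onNext(T item) { submit(function.apply(item)); subscription.request(1); }
    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

final class ProcessorPipelineDemo {
    // Sends the input through source -> processor -> sink and returns what the sink received.
    static List<String> run(List<Integer> input) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> sink = new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(String item) { results.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        };

        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        MappingProcessor<Integer, String> processor = new MappingProcessor<>(i -> "item-" + i);

        // Wire downstream first so the processor has a subscriber before any element arrives.
        processor.subscribe(sink);
        source.subscribe(processor);

        input.forEach(source::submit);
        source.close(); // completion travels through the processor to the sink

        try {
            // Signals are delivered on other threads, so wait for the sink to terminate.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3)));
    }
}
```

Subscribing the sink before the source is a deliberate choice here: SubmissionPublisher drops submitted items when it has no subscribers, so wiring the pipeline from the downstream end avoids losing early elements.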