Project Reactor

Table of Contents

Core Features

The main artefact of the Reactor project is reactor-core, a reactive library that focuses on the specification of Reactive Streams and targets Java 8.

Reactor introduces composable reactive types which implement Publisher but which also provide a rich operator vocabulary: Flux and Mono. A Flux object is a reactive sequence of 0 .. N items whereas a Mono object is a single-value-or-empty (0 .. 1) result.

This distinction carries a bit of semantic information into the type, suggesting the asynchronous processing's rough cardinality. For example, an HTTP request only produces one response, so doing a count operation doesn't make much sense. Expressing the result of such an HTTP request as a Mono<HttpResponse> is therefore makes more sense than expressing it as a Flux<HttpResponse>, because it only offers operators applicable to a background of zero items or one object.

Operators that change the maximum cardinality of the processing also switch to the relevant type. For instance, the count operator exists in Flux, but it returns a Mono<Long>.

Flux, an Asynchronous Sequence of 0-N Items

A Flux<T> is a standard Publisher<T>, describing an asynchronous sequence of items emitted from 0 to N, preferably terminated by either a completion signal or an error. As in the Reactive Streams spec, these three signal types translate into calls to the onNext, onComplete, and onError methods of a downstream subscriber.

With this broad range of potential signals Flux is the reactive form of general usage. Notice that all events, including terminating ones, are optional: no onNext event, but a onComplete event is an empty finite series, but delete the onComplete series and you have an infinite empty sequence (not particularly useful, except for cancelation testing). Infinite sequences are likewise not necessarily empty. For instance, Flux.interval(Duration) produces an infinite Flux<Long> and emits regular ticks from a clock.

The following image shows how a Flux transforms items:

Mono, an Asynchronous 0-1 Result

A Mono<T> is a specialized Publisher<T> which emits a maximum of one item and then terminates (optionally) with a onComplete signal or onError.

This only provides a subset of the operators available for a Flux, and some operators (notably those combining the Mono with another Publisher) turn to a Flux. Mono#concatWith(Publisher), for instance, returns a Flux while Mono#then(Mono) returns another Mono.

Note that you can use a Mono to represent asynchronous processes with no value that have only the concept of completion (similar to a Runnable). You may use an empty Mono<Void> to construct one.

Simple Ways to Create a Flux or Mono and Subscribe to It

The easiest way to get started with Flux and Mono is to use one of the numerous factory methods found in their respective classes.

For instance, to create a sequence of String, you can either enumerate them or put them in a collection and create the Flux from it, as follows:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Other examples of factory methods include the following:

Mono<String> noData = Mono.empty(); 
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); 

Line1: Notice the factory method honors the generic type even though it has no value.
Line3: The first parameter is the start of the range, while the second parameter is the number of items to produce.

Flux and Mono use Java 8 lambdas when it comes to subscribing. You have a wide range of variants of .subscribe() that take lambdas for different callbacks combinations as shown in the following method signatures:

subscribe(Consumer<? super T> consumer); 
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 

Line1: Subscribe and trigger the sequence.
Line2: Do something with each produced value.
Line3: Deal with values but also react to an error.
Line5: Deal with values and errors but also run some code when the sequence successfully completes.
Line8: Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call.