Contents

Overview

Helidon implements MicroProfile Reactive Streams Operators specification which defines reactive operators and provides a standardized tool for manipulation with Reactive Streams. You can use MicroProfile Reactive Streams Operators when you want to maintain source-level portability between different implementations.

Maven Coordinates

To enable Reactive Streams, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
   <groupId>io.helidon.microprofile.reactive-streams</groupId>
   <artifactId>helidon-microprofile-reactive-streams</artifactId>
</dependency>
Copied

Usage

The MicroProfile Reactive Streams Operators specification provides a set of operators within stages, as well as the builders used to prepare graphs of stages from which streams can be built.

Example of simple closed graph usage:
AtomicInteger sum = new AtomicInteger();

ReactiveStreams.of("1", "2", "3", "4", "5")
        .limit(3)
        .map(Integer::parseInt)
        .forEach(sum::addAndGet)
        .run()
        .whenComplete((r, t) -> System.out.println("Sum: " + sum.get()));

// >Sum: 6
Copied
Operators(Stages)
fromIterableCreate new PublisherBuilder from supplied Iterable
ofCreate new PublisherBuilder emitting supplied elements
ofNullableEmpty stream if supplied item is null
iterateCreate infinite stream with every next item created by supplied operator from previous item
generateCreate infinite stream with every item created by invocation of supplier
emptyCreate new PublisherBuilder emitting as a first thing complete signal
failedCreate new PublisherBuilder emitting as a first thing error signal
concatConcat two streams
coupledTwo parallel streams sharing cancel, onError and onComplete signals
limitLimit the size of the stream, when limit is reached completes
peekInvoke consumer for every item passing this operator
filterDrop item when expression result to false
mapTransform items
flatMapFlatten supplied stream to current stream
flatMapIterableFlatten supplied iterable to current stream
flatMapCompletionStageMap elements to completion stage and wait for each to be completed, keeps the order
flatMapRSPublisherMap elements to Publishers and flatten this sub streams to original stream
takeWhileLet items pass until expression is true, first time its false completes
dropWhileDrop items until expression is true, first time its false let everything pass
skipDrop first n items
distinctLet pass only distinct items
viaConnect supplied processor to current stream return supplied processor
onErrorInvoke supplied consumer when onError signal received
onErrorResumeEmit one last supplied item when onError signal received
onErrorResumeWithWhen onError signal received continue emitting from supplied publisher builder
onErrorResumeWithRsPublisherWhen onError signal received continue emitting from supplied publisher
onCompleteInvoke supplied runnable when onComplete signal received
onTerminateInvoke supplied runnable when onComplete or onError signal received
ifEmptyExecutes given java.lang.Runnable when stream is finished without value(empty stream).
toConnect this stream to supplied subscriber
toListCollect all intercepted items to List
collectCollect all intercepted items with provided collector
forEachInvoke supplied Consumer for each intercepted item
ignoreIgnore all onNext signals, wait for onComplete
reduceReduction with provided expression
cancelCancel stream immediately
findFirstReturn first intercepted element

Graphs

Graphs are pre-prepared stream builders with stages, which can be combined to closed graph with methods via and to.

Combining the graphs and running the stream:
// Assembly of stream, nothing is streamed yet
PublisherBuilder<String> publisherStage =
        ReactiveStreams.of("foo", "bar")
                .map(String::trim);

ProcessorBuilder<String, String> processorStage =
        ReactiveStreams.<String>builder()
                .map(String::toUpperCase);

SubscriberBuilder<String, Void> subscriberStage =
        ReactiveStreams.<String>builder()
                .map(s -> "Item received: " + s)
                .forEach(System.out::println);

// Execution of pre-prepared stream
publisherStage
        .via(processorStage)
        .to(subscriberStage).run();

// >Item received:FOO
// >Item received: BAR
Copied

Reference