Contents
Overview
Helidon implements MicroProfile Reactive Streams Operators specification which defines reactive operators and provides a standardized tool for manipulation with Reactive Streams. You can use MicroProfile Reactive Streams Operators when you want to maintain source-level portability between different implementations.
Maven Coordinates
To enable Reactive Streams add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.microprofile.reactive-streams</groupId>
<artifactId>helidon-microprofile-reactive-streams</artifactId>
</dependency>content_copy
Usage
The MicroProfile Reactive Streams Operators specification provides a set of operators within stages, as well as the builders used to prepare graphs of stages from which streams can be built.
Example of simple closed graph usage:
AtomicInteger sum = new AtomicInteger();
ReactiveStreams.of("1", "2", "3", "4", "5")
.limit(3)
.map(Integer::parseInt)
.forEach(sum::addAndGet)
.run()
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get()));
> Sum: 6content_copy
Operators(Stages)
| fromIterable | Create new PublisherBuilder from supplied Iterable |
| of | Create new PublisherBuilder emitting supplied elements |
| ofNullable | Empty stream if supplied item is null |
| iterate | Create infinite stream with every next item created by supplied operator from previous item |
| generate | Create infinite stream with every item created by invocation of supplier |
| empty | Create new PublisherBuilder emitting as a first thing complete signal |
| failed | Create new PublisherBuilder emitting as a first thing error signal |
| concat | Concat two streams |
| coupled | Two parallel streams sharing cancel, onError and onComplete signals |
| limit | Limit the size of the stream, when limit is reached completes |
| peek | Invoke consumer for every item passing this operator |
| filter | Drop item when expression result to false |
| map | Transform items |
| flatMap | Flatten supplied stream to current stream |
| flatMapIterable | Flatten supplied iterable to current stream |
| flatMapCompletionStage | Map elements to completion stage and wait for each to be completed, keeps the order |
| flatMapRSPublisher | Map elements to Publishers and flatten this sub streams to original stream |
| takeWhile | Let items pass until expression is true, first time its false completes |
| dropWhile | Drop items until expression is true, first time its false let everything pass |
| skip | Drop first n items |
| distinct | Let pass only distinct items |
| via | Connect supplied processor to current stream return supplied processor |
| onError | Invoke supplied consumer when onError signal received |
| onErrorResume | Emit one last supplied item when onError signal received |
| onErrorResumeWith | When onError signal received continue emitting from supplied publisher builder |
| onErrorResumeWithRsPublisher | When onError signal received continue emitting from supplied publisher |
| onComplete | Invoke supplied runnable when onComplete signal received |
| onTerminate | Invoke supplied runnable when onComplete or onError signal received |
| ifEmpty | Executes given java.lang.Runnable when stream is finished without value(empty stream). |
| to | Connect this stream to supplied subscriber |
| toList | Collect all intercepted items to List |
| collect | Collect all intercepted items with provided collector |
| forEach | Invoke supplied Consumer for each intercepted item |
| ignore | Ignore all onNext signals, wait for onComplete |
| reduce | Reduction with provided expression |
| cancel | Cancel stream immediately |
| findFirst | Return first intercepted element |
Graphs
Graphs are pre-prepared stream builders with stages, which can be combined to close graph with methods via and to.
Combining the graphs and running the stream:
// Assembly of stream, nothing is streamed yet
PublisherBuilder<String> publisherStage =
ReactiveStreams.of("foo", "bar")
.map(String::trim);
ProcessorBuilder<String, String> processorStage =
ReactiveStreams.<String>builder()
.map(String::toUpperCase);
SubscriberBuilder<String, Void> subscriberStage =
ReactiveStreams.<String>builder()
.map(s -> "Item received: " + s)
.forEach(System.out::println);
// Execution of pre-prepared stream
publisherStage
.via(processorStage)
.to(subscriberStage).run();
> Item received: FOO
> Item received: BARcontent_copy