Reactive Streams Operators
Maven Coordinates
To enable Reactive Streams add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.microprofile.reactive-streams</groupId>
<artifactId>helidon-microprofile-reactive-streams</artifactId>
</dependency>content_copy
Reactive Streams Operators
Implementation of MicroProfile Reactive Streams Operators specification. A standardised tool for manipulation with Reactive Streams, provides set of operators as so called stages, and the builders to prepare graphs of stages for streams to be build from.
Example of simple closed graph usage:
AtomicInteger sum = new AtomicInteger();
ReactiveStreams.of("1", "2", "3", "4", "5")
.limit(3)
.map(Integer::parseInt)
.forEach(sum::addAndGet)
.run()
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get()));
> Sum: 6content_copy
Operators(Stages)
| fromIterable | Create new PublisherBuilder from supplied Iterable |
| of | Create new PublisherBuilder emitting supplied elements |
| ofNullable | Empty stream if supplied item is null |
| iterate | Create infinite stream with every next item created by supplied operator from previous item |
| generate | Create infinite stream with every item created by invocation of supplier |
| empty | Create new PublisherBuilder emitting as a first thing complete signal |
| failed | Create new PublisherBuilder emitting as a first thing error signal |
| concat | Concat two streams |
| coupled | Two parallel streams sharing cancel, onError and onComplete signals |
| limit | Limit the size of the stream, when limit is reached completes |
| peek | Invoke consumer for every item passing this operator |
| filter | Drop item when expression result to false |
| map | Transform items |
| flatMap | Flatten supplied stream to current stream |
| flatMapIterable | Flatten supplied iterable to current stream |
| flatMapCompletionStage | Map elements to completion stage and wait for each to be completed, keeps the order |
| flatMapRSPublisher | Map elements to Publishers and flatten this sub streams to original stream |
| takeWhile | Let items pass until expression is true, first time its false completes |
| dropWhile | Drop items until expression is true, first time its false let everything pass |
| skip | Drop first n items |
| distinct | Let pass only distinct items |
| via | Connect supplied processor to current stream return supplied processor |
| onError | Invoke supplied consumer when onError signal received |
| onErrorResume | Emit one last supplied item when onError signal received |
| onErrorResumeWith | When onError signal received continue emitting from supplied publisher builder |
| onErrorResumeWithRsPublisher | When onError signal received continue emitting from supplied publisher |
| onComplete | Invoke supplied runnable when onComplete signal received |
| onTerminate | Invoke supplied runnable when onComplete or onError signal received |
| ifEmpty | Executes given java.lang.Runnable when stream is finished without value(empty stream). |
| to | Connect this stream to supplied subscriber |
| toList | Collect all intercepted items to List |
| collect | Collect all intercepted items with provided collector |
| forEach | Invoke supplied Consumer for each intercepted item |
| ignore | Ignore all onNext signals, wait for onComplete |
| reduce | Reduction with provided expression |
| cancel | Cancel stream immediately |
| findFirst | Return first intercepted element |
Graphs
Graphs are pre-prepared stream builders with stages, which can be combined together to closed graph with methods via and to.
Combining the graphs and running the stream:
// Assembly of stream, nothing is streamed yet
PublisherBuilder<String> publisherStage =
ReactiveStreams.of("foo", "bar")
.map(String::trim);
ProcessorBuilder<String, String> processorStage =
ReactiveStreams.<String>builder()
.map(String::toUpperCase);
SubscriberBuilder<String, Void> subscriberStage =
ReactiveStreams.<String>builder()
.map(s -> "Item received: " + s)
.forEach(System.out::println);
// Execution of pre-prepared stream
publisherStage
.via(processorStage)
.to(subscriberStage).run();
> Item received: FOO
> Item received: BARcontent_copy