Maven Coordinates
To enable Reactive Engine, add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
    <groupId>io.helidon.common</groupId>
    <artifactId>helidon-common-reactive</artifactId>
</dependency>
Reactive Engine
Helidon has its own set of reactive operators that have no dependencies outside of the Helidon ecosystem. These operators can be used with java.util.concurrent.Flow based reactive streams. A stream processing operator chain can be constructed with io.helidon.common.reactive.Multi, or with io.helidon.common.reactive.Single for streams that carry a single value.
Example of Multi usage:
AtomicInteger sum = new AtomicInteger();
Multi.just("1", "2", "3", "4", "5")
        .limit(3)
        .map(Integer::parseInt)
        .forEach(sum::addAndGet);
System.out.println("Sum: " + sum.get());
> Sum: 6
Example of Single usage:
Single.just("1")
        .map(Integer::parseInt)
        .map(i -> i + 5)
        .toStage()
        .whenComplete((i, t) -> System.out.println("Result: " + i));
> Result: 6
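For comparison, the Multi example above can be written against the plain JDK java.util.concurrent.Flow API that these operators work with. The following is a minimal sketch that uses only the JDK; the class name PlainFlowSum and the method sumFirst are illustrative and not part of Helidon. It makes explicit the request and cancel bookkeeping that an operator chain with limit, map and forEach hides from the caller.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative JDK-only sketch; not a Helidon class.
class PlainFlowSum {

    /** Parses and sums at most {@code limit} items (limit must be positive). */
    static int sumFirst(List<String> items, int limit) {
        AtomicInteger sum = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        // SubmissionPublisher is the JDK's own Flow.Publisher implementation.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;
                private int seen;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // ask for the first item
                }

                @Override
                public void onNext(String item) {
                    sum.addAndGet(Integer.parseInt(item)); // "map" + "forEach"
                    if (++seen == limit) {
                        subscription.cancel();             // "limit"
                        done.countDown();
                    } else {
                        subscription.request(1);           // ask for the next item
                    }
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete to a subscriber that is still attached

        try {
            done.await(); // items are delivered asynchronously
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumFirst(List.of("1", "2", "3", "4", "5"), 3));
    }
}
```

Running main prints "Sum: 6", the same result as the Multi chain.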
Operators
| Operator | Description |
| --- | --- |
| defer | Call the given supplier function for each individual downstream Subscriber to return a Flow.Publisher to subscribe to. |
| map | Map this Multi instance to a new Multi of another type using the given Mapper. |
| defaultIfEmpty | Signals the default item if the upstream is empty. |
| switchIfEmpty | Switch to the other publisher if the upstream is empty. |
| peek | Invoke provided consumer for every item in stream. |
| distinct | Filter out all duplicates. |
| filter | Filter stream items with provided predicate. |
| takeWhile | Take the longest prefix of elements from this stream that satisfy the given predicate. As long as the predicate returns true, items from upstream are sent to downstream; when the predicate returns false, the stream is completed. |
| dropWhile | Drop the longest prefix of elements from this stream that satisfy the given predicate. As long as the predicate returns true, items from upstream are dropped instead of being sent to downstream. The predicate is never called again after it returns false for the first time. |
| limit | Limit stream to allow only specified number of items to pass. |
| skip | Skip first n items, all the others are emitted. |
| flatMap | Transform each upstream item with the supplied function into a Flow.Publisher, subscribe to them and then flatten their items into a single sequence of items emitted to the downstream. |
| flatMap | Transform each upstream item with the supplied function and flatten the resulting Flow.Publisher to downstream while limiting the maximum number of concurrent inner `Flow.Publisher`s and their in-flight item count, optionally aggregating and delaying all errors until all sources terminate. |
| flatMapCompletionStage | Transform each upstream item with the supplied function and flatten the resulting CompletionStage results to downstream. |
| flatMapIterable | Transform each upstream item with the supplied function and flatten the resulting Iterable to the downstream. |
| flatMapOptional | Transform each upstream item with the supplied function and flatten the resulting Optional to the downstream as item if present. |
| observeOn | Re-emit the upstream’s signals to the downstream on the given executor’s thread using a default buffer size of 32 and errors skipping ahead of items. |
| observeOn | Re-emit the upstream’s signals to the downstream on the given executor’s thread. |
| forEach | Terminal stage, invokes provided consumer for every item in the stream. |
| collectList | Collect the items of this Multi instance into a Single of List. |
| collect | Collect the items of this Multi instance into a Single. |
| collect | Collect the items of this Multi into a collection provided via a Supplier and mutated by a BiConsumer callback. |
| collectStream | Collects upstream items with the help of the callbacks of a java.util.stream.Collector. |
| reduce | Combine subsequent items via a callback function and emit the final value result as a Single. |
| reduce | Combine every upstream item with an accumulator value to produce a new accumulator value and emit the final accumulator value as a Single. |
| first | Get the first item of this Multi instance as a Single. |
| from | Wrap a CompletionStage into a Multi and signal its outcome non-blockingly. |
| from | Wrap a CompletionStage into a Multi and signal its outcome non-blockingly. |
| from | Create a Multi instance wrapped around the given publisher. |
| from | Create a Multi instance that publishes the given iterable. |
| from | Create a Multi instance that publishes the given Stream. |
| just | Create a Multi instance that publishes the given items to a single subscriber. |
| just | Create a Multi instance that publishes the given items to a single subscriber. |
| singleton | Create a Multi that emits a pre-existing item and then completes. |
| error | Create a Multi instance that reports the given exception to its subscriber(s). The exception is reported by invoking Subscriber#onError(java.lang.Throwable) when Publisher#subscribe(Subscriber) is called. |
| empty | Get a Multi instance that completes immediately. |
| never | Get a Multi instance that never completes. |
| concat | Concatenate streams into one. |
| onTerminate | Executes the given java.lang.Runnable when any of the signals onComplete, onCancel or onError is received. |
| ifEmpty | Executes the given java.lang.Runnable when the stream finishes without a value (empty stream). |
| onComplete | Executes given java.lang.Runnable when onComplete signal is received. |
| onError | Executes the given java.util.function.Consumer when an onError signal is received. |
| onCancel | Executes given java.lang.Runnable when a cancel signal is received. |
| takeUntil | Relay upstream items until the other source signals an item or completes. |
| range | Emits a range of ever increasing integers. |
| rangeLong | Emits a range of ever increasing longs. |
| timer | Signal 0L and complete the sequence after the given time elapsed. |
| interval | Signal 0L, 1L and so on periodically to the downstream. |
| interval | Signal 0L after an initial delay, then 1L, 2L and so on periodically to the downstream. |
| timeout | Signals a TimeoutException if the upstream doesn’t signal the next item, error or completion within the specified time. |
| timeout | Switches to a fallback source if the upstream doesn’t signal the next item, error or completion within the specified time. |
| onErrorResume | Uses the given java.util.function.Function to provide one item, submitted as onNext, when an onError signal is received. |
| onErrorResumeWith | Resume stream from supplied publisher if onError signal is intercepted. |
| retry | Retry a failing upstream at most the given number of times before giving up. |
| retry | Retry a failing upstream if the predicate returns true. |
| retryWhen | Retry a failing upstream when the given function returns a publisher that signals an item. |
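Several of the operators listed above can be chained in a single pipeline. The following is a minimal sketch, assuming the helidon-common-reactive dependency is on the classpath; the class OperatorSketch and its method names are hypothetical and exist only for illustration. The result is collected through toStage(), as in the Single example earlier in this section.

```java
import io.helidon.common.reactive.Multi;

import java.util.List;

// Hypothetical class, used only to illustrate operator chaining.
class OperatorSketch {

    /** Combines filter, skip, limit, map and collectList. */
    static List<Integer> evenSquares() {
        return Multi.just(1, 2, 3, 4, 5, 6, 7, 8)
                .filter(i -> i % 2 == 0)   // keeps 2, 4, 6, 8
                .skip(1)                   // drops the first item: 4, 6, 8
                .limit(2)                  // lets two items pass: 4, 6
                .map(i -> i * i)           // 16, 36
                .collectList()             // Single of List
                .toStage()
                .toCompletableFuture()
                .join();                   // blocks; for demonstration only
    }

    /** Replaces a failure with one fallback item using onErrorResume. */
    static List<Integer> parseWithFallback() {
        return Multi.just("1", "x", "3")
                .map(Integer::parseInt)    // "x" fails with NumberFormatException
                .onErrorResume(t -> -1)    // one fallback item, then completion
                .collectList()
                .toStage()
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(evenSquares());
        System.out.println(parseWithFallback());
    }
}
```

Note that in parseWithFallback the item "3" is never processed: the error ends the upstream, and onErrorResume supplies a single replacement item before the stream completes.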
Operator chains composition
When part of an operator chain needs to be prepared in advance, the compose and to operators are available.
Combining operator chains:
// Assembly of stream, nothing is streamed yet
Multi<String> publisherStage =
        Multi.just("foo", "bar")
                .map(String::trim);
Function<Multi<String>, Multi<String>> processorStage =
        upstream ->
                upstream.map(String::toUpperCase);
// Execution of pre-prepared stream
publisherStage
        .compose(processorStage)
        .map(s -> "Item received: " + s)
        .forEach(System.out::println);
> Item received: FOO
> Item received: BAR
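The to operator can be used in a similar way when the prepared stage should convert the Multi into some other type instead of another publisher. The following is a minimal sketch under that assumption; the class ToOperatorSketch and the constant TO_UPPER_LIST are hypothetical names, and the exact signature of to should be checked against the Javadoc of the Helidon version in use.

```java
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;

import java.util.List;
import java.util.function.Function;

// Hypothetical class, used only to illustrate the "to" operator.
class ToOperatorSketch {

    // Reusable terminal stage prepared in advance: Multi in, Single of List out.
    static final Function<Multi<String>, Single<List<String>>> TO_UPPER_LIST =
            upstream -> upstream
                    .map(String::toUpperCase)
                    .collectList();

    static List<String> run() {
        return Multi.just("foo", "bar")
                .to(TO_UPPER_LIST)         // applies the prepared conversion
                .toStage()
                .toCompletableFuture()
                .join();                   // blocks; for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Where compose keeps the chain within Multi, to hands the whole Multi to the function and returns whatever that function produces.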