Maven Coordinates

To enable the Reactive Engine, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
    <groupId>io.helidon.common</groupId>
    <artifactId>helidon-common-reactive</artifactId>
</dependency>

Reactive Engine

Helidon has its own set of reactive operators that have no dependencies outside of the Helidon ecosystem. These operators can be used with java.util.concurrent.Flow based reactive streams. A stream-processing operator chain can be constructed with io.helidon.common.reactive.Multi, or with io.helidon.common.reactive.Single for streams that emit a single value.

Example of Multi usage:
AtomicInteger sum = new AtomicInteger();

Multi.just("1", "2", "3", "4", "5")
        .limit(3)
        .map(Integer::parseInt)
        .forEach(sum::addAndGet);

System.out.println("Sum: " + sum.get());

> Sum: 6
Example of Single usage:
Single.just("1")
        .map(Integer::parseInt)
        .map(i -> i + 5)
        .toStage()
        .whenComplete((i, t) -> System.out.println("Result: " + i));

> Result: 6

Operators

defer: Call the given supplier function for each individual downstream Subscriber to return a Flow.Publisher to subscribe to.
map: Map this Multi instance to a new Multi of another type using the given Mapper.
defaultIfEmpty: Signal the default item if the upstream is empty.
switchIfEmpty: Switch to the other publisher if the upstream is empty.
peek: Invoke the provided consumer for every item in the stream.
distinct: Filter out all duplicates.
filter: Filter stream items with the provided predicate.
takeWhile: Take the longest prefix of elements from this stream that satisfy the given predicate. As long as the predicate returns true, items from upstream are sent downstream; when the predicate returns false, the stream is completed.
dropWhile: Drop the longest prefix of elements from this stream that satisfy the given predicate. As long as the predicate returns true, items from upstream are dropped instead of being sent downstream; the predicate is never called again after it returns false for the first time.
limit: Limit the stream to allow only the specified number of items to pass.
skip: Skip the first n items; all the others are emitted.
flatMap: Transform each upstream item with the supplied function into a Flow.Publisher, subscribe to them and then flatten their items into a single sequence of items emitted to the downstream.
flatMap: Transform each upstream item with the supplied function and flatten the resulting Flow.Publisher to downstream while limiting the maximum number of concurrent inner Flow.Publisher instances and their in-flight item count, optionally aggregating and delaying all errors until all sources terminate.
flatMapCompletionStage: Transform each upstream item with the supplied function and flatten the resulting CompletionStage results to downstream.
flatMapIterable: Transform each upstream item with the supplied function and flatten the resulting Iterable to the downstream.
flatMapOptional: Transform each upstream item with the supplied function and flatten the resulting Optional to the downstream as an item if present.
observeOn: Re-emit the upstream’s signals to the downstream on the given executor’s thread using a default buffer size of 32 and errors skipping ahead of items.
observeOn: Re-emit the upstream’s signals to the downstream on the given executor’s thread.
forEach: Terminal stage; invokes the provided consumer for every item in the stream.
collectList: Collect the items of this Multi instance into a Single of List.
collect: Collect the items of this Multi instance into a Single.
collect: Collect the items of this Multi into a collection provided via a Supplier and mutated by a BiConsumer callback.
collectStream: Collect upstream items using the callbacks of a java.util.stream.Collector.
reduce: Combine subsequent items via a callback function and emit the final value result as a Single.
reduce: Combine every upstream item with an accumulator value to produce a new accumulator value and emit the final accumulator value as a Single.
first: Get the first item of this Multi instance as a Single.
from: Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.
from: Create a Multi instance wrapped around the given publisher.
from: Create a Multi instance that publishes the given iterable.
from: Create a Multi instance that publishes the given Stream.
just: Create a Multi instance that publishes the given items to a single subscriber.
singleton: Create a Multi that emits a pre-existing item and then completes.
error: Create a Multi instance that reports the given exception to its subscriber(s). The exception is reported by invoking Subscriber#onError(java.lang.Throwable) when Publisher#subscribe(Subscriber) is called.
empty: Get a Multi instance that completes immediately.
never: Get a Multi instance that never completes.
concat: Concatenate streams into one.
onTerminate: Execute the given java.lang.Runnable when any of the signals onComplete, onCancel or onError is received.
ifEmpty: Execute the given java.lang.Runnable when the stream finishes without a value (empty stream).
onComplete: Execute the given java.lang.Runnable when an onComplete signal is received.
onError: Execute the given java.util.function.Consumer when an onError signal is received.
onCancel: Execute the given java.lang.Runnable when a cancel signal is received.
takeUntil: Relay upstream items until the other source signals an item or completes.
range: Emit a range of ever increasing integers.
rangeLong: Emit a range of ever increasing longs.
timer: Signal 0L and complete the sequence after the given time has elapsed.
interval: Signal 0L, 1L and so on periodically to the downstream.
interval: Signal 0L after an initial delay, then 1L, 2L and so on periodically to the downstream.
timeout: Signal a TimeoutException if the upstream doesn’t signal the next item, error or completion within the specified time.
timeout: Switch to a fallback source if the upstream doesn’t signal the next item, error or completion within the specified time.
onErrorResume: Resume the stream with a single item, provided by the given java.util.function.Function and submitted as onNext, when an onError signal is received.
onErrorResumeWith: Resume the stream from the supplied publisher if an onError signal is intercepted.
retry: Retry a failing upstream at most the given number of times before giving up.
retry: Retry a failing upstream if the predicate returns true.
retryWhen: Retry a failing upstream when the given function returns a publisher that signals an item.
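
As a rough illustration of how several of the operators listed above can be combined, the sketch below chains range, filter, map, skip and collectList, and then shows onErrorResume and defer with retry handling a failing upstream. The class name OperatorSketch and its method names are made up for this example. It assumes the helidon-common-reactive dependency is on the classpath and that the sources used here emit synchronously, so blocking with join() is only a convenience for the demonstration.

```java
import io.helidon.common.reactive.Multi;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Helidon.
public class OperatorSketch {

    // Transformation chain: range -> filter -> map -> skip -> collectList.
    static List<Integer> evenSquares() {
        return Multi.range(1, 6)              // 1, 2, 3, 4, 5, 6
                .filter(i -> i % 2 == 0)      // 2, 4, 6
                .map(i -> i * i)              // 4, 16, 36
                .skip(1)                      // 16, 36
                .collectList()                // Single<List<Integer>>
                .toStage()
                .toCompletableFuture()
                .join();                      // block only for the demo
    }

    // onErrorResume replaces the error signal with a single fallback item.
    static List<String> withFallback() {
        return Multi.<String>error(new IllegalStateException("boom"))
                .onErrorResume(t -> "fallback: " + t.getMessage())
                .collectList()
                .toStage()
                .toCompletableFuture()
                .join();
    }

    // defer builds a fresh publisher per subscription, so every retry
    // re-runs the supplier; the first two attempts fail, the third succeeds.
    static List<String> retried() {
        AtomicInteger attempts = new AtomicInteger();
        return Multi.defer(() -> attempts.incrementAndGet() < 3
                        ? Multi.<String>error(new IllegalStateException("not yet"))
                        : Multi.just("ok"))
                .retry(5)                     // at most 5 retries
                .collectList()
                .toStage()
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(evenSquares());
        System.out.println(withFallback());
        System.out.println(retried());
    }
}
```

In application code the blocking join() would normally be replaced by a non-blocking callback such as whenComplete on the returned stage.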

Operator chains composition

When part of an operator chain needs to be prepared in advance, the compose and to operators can be used.

Combining operator chains:
import io.helidon.common.reactive.Multi;
import java.util.function.Function;

// Assembly of stream, nothing is streamed yet
Multi<String> publisherStage =
        Multi.just("foo", "bar")
                .map(String::trim);

// Reusable stage prepared in advance; typed for String items
Function<Multi<String>, Multi<String>> processorStage =
        upstream ->
                upstream.map(String::toUpperCase);

// Execution of pre-prepared stream
publisherStage
        .compose(processorStage)
        .map(s -> "Item received: " + s)
        .forEach(System.out::println);

> Item received: FOO
> Item received: BAR
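
The to operator can be sketched in a similar way. Where compose returns another Multi, to hands the whole Multi to a converter function and returns whatever that function produces, which makes it suitable for a reusable terminal stage. The class name ToOperatorSketch and the constant UPPER_CASE_LIST below are hypothetical, and the snippet assumes the helidon-common-reactive dependency is available.

```java
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;

import java.util.List;
import java.util.function.Function;

// Hypothetical demo class; not part of Helidon.
public class ToOperatorSketch {

    // Stage prepared in advance: converts a Multi<String> into a Single<List<String>>.
    static final Function<Multi<String>, Single<List<String>>> UPPER_CASE_LIST =
            upstream -> upstream.map(String::toUpperCase).collectList();

    static List<String> run() {
        return Multi.just("foo", "bar")
                .to(UPPER_CASE_LIST)      // result type is chosen by the converter
                .toStage()
                .toCompletableFuture()
                .join();                  // block only for the demo
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```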