Maven Coordinates
To enable Reactive Messaging add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.messaging</groupId>
<artifactId>helidon-messaging</artifactId>
</dependency>Reactive Messaging
Asynchronous messaging is a commonly used form of communication in the world of microservices. While its possible to start building your reactive streams directly by combining operators and connecting them to reactive APIs, with Helidon SE Reactive Messaging, you can now use prepared tools for repetitive use case scenarios .
For example connecting your streams to external services usually requires a lot of boiler-plate code for configuration handling, backpressure propagation, acknowledgement and more.
For such tasks there is a system of connectors, emitters and means to orchestrate them in Helidon, called Reactive Messaging. It’s basically an API for connecting and configuring Connectors and Emitters with your reactive streams thru so called Channels.
You may wonder how Reactive Messaging relates to MicroProfile Reactive Messaging. As the making of connectors or even configuring them can be repetitive task leading to the same results, Helidon SE Reactive Messaging supports very same configuration format for connectors as its MicroProfile counterpart does. Also, MP Connectors are reusable in Helidon SE Messaging with some limitation(there is no CDI in Helidon SE). All Messaging connectors in Helidon are made to be universally usable by Helidon MP and SE.
Channel
Channel is a named pair of Publisher and Subscriber. Channels can be connected together by processors. Registering of Publisher or Subscriber for a channel can be done by Messaging API, or configured implicitly for using registered connector for generating such Publisher or Subscriber.
Channel<String> channel1 = Channel.create("channel1");
Messaging.builder()
.publisher(channel1, Multi.just("message 1", "message 2")
.map(Message::of))
.listener(channel1, s -> System.out.println("Intecepted message " + s))
.build()
.start();Processor
Processor is a typical reactive processor acting as a Subscriber to upstream and as a Publisher to downstream. In terms of reactive messaging it is able to connect two channels to one reactive stream.
Channel<String> firstChannel = Channel.create("first-channel");
Channel<String> secondChannel = Channel.create("second-channel");
Messaging.builder()
.publisher(secondChannel, ReactiveStreams.of("test1", "test2", "test3")
.map(Message::of))
.processor(secondChannel, firstChannel, ReactiveStreams.<Message<String>>builder()
.map(Message::getPayload)
.map(String::toUpperCase)
.map(Message::of)
)
.subscriber(firstChannel, ReactiveStreams.<Message<String>>builder()
.peek(Message::ack)
.map(Message::getPayload)
.forEach(s -> System.out.println("Consuming message " + s)))
.build()
.start();
>Consuming message TEST1
>Consuming message TEST2
>Consuming message TEST3Message
Reactive Messaging in Helidon SE uses the same concept of message wrapping as MicroProfile messaging. The only notable difference is that SE Messaging does almost no implicit or automatic acknowledgement due to no magic philosophy of Helidon SE.
Only exception to this are variants of methods Messaging.Builder#listener and Messaging.Builder#processor with consumer or function params, conveniently unwrapping payload for you. After such implicit unwrapping is not possible to do a manual acknowledgement, therefore implicit ack before callback is executed is necessary.
Connector
Connector concept is a way for connecting channels to external sources. To make creation and usage of connectors as easy and versatile as possible, Helidon SE Messaging uses same API for connectors like MicroProfile Reactive Messaging does. This allows connectors to be usable in both flavors of Helidon with one limitation which is that connector has to be able to work without CDI.
Example of such a versatile connectors in Helidon: