Contents
Overview
Asynchronous messaging is a commonly used form of communication in the world of microservices. While it is possible to start building your reactive streams directly by combining operators and connecting them to reactive APIs, with Helidon SE Reactive Messaging, you can now use prepared tools for repetitive use case scenarios .
Maven Coordinates
To enable Reactive Messaging, add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.messaging</groupId>
<artifactId>helidon-messaging</artifactId>
</dependency>Usage
Connecting your streams to external services usually requires a lot of boilerplate code for configuration handling, backpressure propagation, acknowledgement and more.
In Helidon there is a system of connectors, emitters and means to orchestrate these tasks called Reactive Messaging. It’s basically an API for connecting and configuring connectors and emitters with your reactive streams through Channels.
Reactive Messaging relates to MicroProfile Reactive Messaging as the making of connectors and configuring them can be a repetitive task that ultimately leads to the same results. Helidon SE Reactive Messaging supports the very same configuration format for connectors as its MicroProfile counterpart does. Also, MP Connectors are reusable in Helidon SE Messaging with some limitations such as there is no CDI in Helidon SE. All Messaging connectors in Helidon are made to be universally usable by Helidon MP and SE.
Channel
A channel is a named pair of Publisher and Subscriber. Channels can be connected together by processors. Registering a Publisher or Subscriber for a channel can be done by Messaging API, or configured implicitly using registered connectors to generate the Publisher or Subscriber.
Channel<String> channel1 = Channel.create("channel1");
Messaging.builder()
.publisher(channel1, Multi.just("message 1", "message 2")
.map(Message::of))
.listener(channel1, s -> System.out.println("Intecepted message " + s))
.build()
.start();Processor
Processor is a typical reactive processor acting as a Subscriber to upstream and as a Publisher to downstream. In terms of reactive messaging, it is able to connect two channels to one reactive stream.
Channel<String> firstChannel = Channel.create("first-channel");
Channel<String> secondChannel = Channel.create("second-channel");
Messaging.builder()
.publisher(secondChannel, Multi.just("test1", "test2", "test3")
.map(Message::of))
.processor(secondChannel, firstChannel, ReactiveStreams.<Message<String>>builder()
.map(Message::getPayload)
.map(String::toUpperCase)
.map(Message::of))
.subscriber(firstChannel, ReactiveStreams.<Message<String>>builder()
.peek(Message::ack)
.map(Message::getPayload)
.forEach(s -> System.out.println("Consuming message " + s)))
.build()
.start();
// >Consuming message TEST1
// >Consuming message TEST2
// >Consuming message TEST3Message
Reactive Messaging in Helidon SE uses the same concept of message wrapping as MicroProfile messaging. The only notable difference is that SE Messaging does almost no implicit or automatic acknowledgement due to no magic philosophy of Helidon SE.
The only exception to this are the variants of the methods Messaging.Builder#listener and Messaging.Builder#processor configured with consumer or function parameters which will conveniently unwrap the payload for you. Once the payload is automatically unwrapped, it is not possible to do a manual acknowledgement, therefore an implicit acknowledgement is executed before the callback.
Connectors
Connectors are used to connect channels to external sources. To make the creation and usage of connectors as easy and versatile as possible, Helidon SE Messaging uses the same API for connectors that MicroProfile Reactive Messaging does. This allows connectors to be used in both flavors of Helidon with one limitation which is that the connector has to be able to work without CDI.
Examples of versatile connectors in Helidon include the following:
Messaging Connector
A connector for Reactive Messaging is a factory that produces Publishers and Subscribers for Channels in Reactive Messaging. Messaging connector is just an implementation of IncomingConnectorFactory, OutgoingConnectorFactory or both.
example-connector:@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
return ReactiveStreams.of("foo", "bar")
.map(Message::of);
}
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.map(Message::getPayload)
.forEach(o -> System.out.println("Connector says: " + o));
}
}mp.messaging.outgoing.to-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.connector: example-connectorMessaging.builder()
.config(config)
.connector(new ExampleConnector())
.publisher(Channel.create("to-connector-channel"),
ReactiveStreams.of("fee", "fie")
.map(Message::of))
.build()
.start();
// > Connector says: fee
// > Connector says: fieMessaging.builder()
.config(config)
.connector(new ExampleConnector())
.subscriber(Channel.create("from-connector-channel"),
ReactiveStreams.<Message<String>>builder()
.peek(Message::ack)
.map(Message::getPayload)
.forEach(s -> System.out.println("Consuming: " + s)))
.build()
.start();
// > Consuming: foo
// > Consuming: barConfiguration for Messaging Connector
A messaging connector in Helidon SE can be configured explicitly by API or implicitly by config following the notation of MicroProfile Reactive Messaging.
Configuration that is supplied to connector by the Messaging implementation must include two mandatory attributes:
channel-namewhich is the name of the channel that has the connector configured as Publisher or Subscriber, orChannel.create('name-of-channel')in case of explicit configuration ormp.messaging.incoming.name-of-channel.connector: connector-namein case of implicit configconnectorname of the connector@Connector("connector-name")
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
String firstPropValue = config.getValue("first-test-prop", String.class);
String secondPropValue = config.getValue("second-test-prop", String.class);
return ReactiveStreams.of(firstPropValue, secondPropValue)
.map(Message::of);
}
}- Config context is merged from channel and connector contexts
Explicit Config for Messaging Connector
An explicit config for channel’s publisher is possible with Channel.Builder#publisherConfig(Config config) and for a subscriber with the Channel.Builder#subscriberConfig(Config config). The supplied Helidon Config is merged with the mandatory attributes and any implicit configuration found. The resulting configuration is then served to the Connector.
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> fromKafka = Channel.<String>builder()
.name("from-kafka")
.publisherConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.groupId("example-group-" + session.getId())
.topic(topic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build())
.build();
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();- Prepare channel for connecting kafka connector with specific publisher configuration → listener,
- Channel → connector mapping is automatic when using
KafkaConnector.configBuilder() - Prepare Kafka connector, can be used by any channel
Implicit Config for Messaging Connector
Implicit config without any hard-coding is possible with Helidon Config following notation of MicroProfile Reactive Messaging.
mp.messaging.incoming.from-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.first-test-prop: foo
mp.messaging.connector.example-connector.second-test-prop: bar- Channel → Connector mapping
- Channel configuration properties
- Connector configuration properties
Messaging.builder()
.config(config)
.connector(new ExampleConnector())
.listener(Channel.create("from-connector-channel"),
s -> System.out.println("Consuming: " + s))
.build()
.start();
// > Consuming: foo
// > Consuming: barRe-usability in MP Messaging
As the API is the same for MicroProfile Reactive Messaging connectors, all that is needed to make connector work in both ways is annotating it with @ApplicationScoped. Such connector is treated as a bean in Helidon MP.
For specific information about creating messaging connectors for Helidon MP visit MicroProfile Reactive Messaging.
Kafka Connector
<dependency>
<groupId>io.helidon.messaging.kafka</groupId>
<artifactId>helidon-messaging-kafka</artifactId>
</dependency>Reactive Kafka Connector
Connecting streams to Kafka with Reactive Messaging couldn’t be easier.
Explicit Config with Config Builder for Kafka Connector
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> fromKafka = Channel.<String>builder()
.name("from-kafka")
.publisherConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.groupId("example-group-" + session.getId())
.topic(topic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build())
.build();
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();- Prepare a channel for connecting kafka connector with specific publisher configuration → listener
- Channel → connector mapping is automatic when using KafkaConnector.configBuilder()
- Prepare Kafka connector, can be used by any channel
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> toKafka = Channel.<String>builder()
.subscriberConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.topic(topic)
.keySerializer(StringSerializer.class)
.valueSerializer(StringSerializer.class)
.build())
.build();
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
.connector(kafkaConnector)
.build()
.start();- Prepare a channel for connecting kafka connector with specific publisher configuration → listener
- Channel → connector mapping is automatic when using KafkaConnector.configBuilder()
- Prepare Kafka connector, can be used by any channel
Implicit Helidon Config for Kafka Connector
mp.messaging:
incoming.from-kafka:
connector: helidon-kafka
topic: messaging-test-topic-1
auto.offset.reset: latest
enable.auto.commit: true
group.id: example-group-id
outgoing.to-kafka:
connector: helidon-kafka
topic: messaging-test-topic-1
connector:
helidon-kafka:
bootstrap.servers: localhost:9092
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer- Kafka client consumer’s property auto.offset.reset configuration for
from-kafkachannel only - Kafka client’s property bootstrap.servers configuration for all channels using the connector
Channel<String> fromKafka = Channel.create("from-kafka");
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.config(config)
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();- Prepare Kafka connector, can be used by any channel
Channel<String> toKafka = Channel.create("to-kafka");
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.config(config)
.publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
.connector(kafkaConnector)
.build()
.start();- Prepare Kafka connector, can be used by any channel
Don’t forget to check out the examples with pre-configured Kafka docker image, for easy testing:
JMS Connector
<dependency>
<groupId>io.helidon.messaging.jms</groupId>
<artifactId>helidon-messaging-jms</artifactId>
</dependency>Reactive JMS Connector
Connecting streams to JMS with Reactive Messaging couldn’t be easier.
Explicit Config with Config Builder for JMS Connector
Channel<String> fromJms = Channel.<String>builder()
.name("from-jms")
.publisherConfig(JmsConnector.configBuilder()
.jndiInitialFactory(ActiveMQInitialContextFactory.class)
.jndiProviderUrl("tcp://127.0.0.1:61616")
.type(Type.QUEUE)
.destination("se-example-queue-1")
.build())
.build();
JmsConnector jmsConnector = JmsConnector.create();
Messaging messaging = Messaging.builder()
.connector(jmsConnector)
.listener(fromJms, payload -> {
System.out.println("Jms says: " + payload);
})
.build()
.start();- Prepare a channel for connecting jms connector with specific publisher configuration → listener
- Channel → connector mapping is automatic when using JmsConnector.configBuilder()
- Prepare JMS connector, can be used by any channel
Channel<String> toJms = Channel.<String>builder()
.subscriberConfig(JmsConnector.configBuilder()
.jndiInitialFactory(ActiveMQInitialContextFactory.class)
.jndiProviderUrl("tcp://127.0.0.1:61616")
.type(Type.QUEUE)
.destination("se-example-queue-1")
.build()
).build();
JmsConnector jmsConnector = JmsConnector.create();
Messaging messaging = Messaging.builder()
.publisher(toJms, Multi.just("test1", "test2").map(Message::of))
.connector(jmsConnector)
.build()
.start();- Prepare a channel for connecting jms connector with specific publisher configuration → listener
- Channel → connector mapping is automatic when using JmsConnector.configBuilder()
- Prepare JMS connector, can be used by any channel
Implicit Helidon Config for JMS Connector
mp.messaging:
incoming.from-jms:
connector: helidon-jms
destination: se-example-queue-1
session-group-id: session-group-1
type: queue
outgoing.to-jms:
connector: helidon-jms
destination: se-example-queue-1
type: queue
connector:
helidon-jms:
jndi:
jms-factory: ConnectionFactory
env-properties:
java.naming.factory.initial: org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url: tcp://127.0.0.1:61616Channel<String> fromJms = Channel.create("from-jms");
JmsConnector jmsConnector = JmsConnector.create();
Messaging messaging = Messaging.builder()
.config(config)
.connector(jmsConnector)
.listener(fromJms, payload -> {
System.out.println("Jms says: " + payload);
})
.build()
.start();- Prepare JMS connector, can be used by any channel
Channel<String> toJms = Channel.create("to-jms");
JmsConnector jmsConnector = JmsConnector.create();
Messaging messaging = Messaging.builder()
.config(config)
.publisher(toJms, Multi.just("test1", "test2").map(Message::of))
.connector(jmsConnector)
.build()
.start();- Prepare JMS connector, can be used by any channel
Don’t forget to check out the examples with pre-configured ActiveMQ docker image, for easy testing:
AQ Connector
<dependency>
<groupId>io.helidon.messaging.aq</groupId>
<artifactId>helidon-messaging-aq</artifactId>
</dependency>Reactive Oracle AQ Connector
Sending and Receiving
PoolDataSource pds = PoolDataSourceFactory.getPoolDataSource();
pds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
pds.setURL(jdbcUrl);
pds.setUser("frank");
pds.setPassword("frank");
AqConnector connector = AqConnector.builder()
.dataSource("test-ds", pds)
.build();
Channel<String> toAq = Channel.<String>builder()
.name("toAq")
.subscriberConfig(AqConnector.configBuilder()
.queue("example_queue_1")
.dataSource("test-ds")
.build())
.build();
Channel<String> fromAq = Channel.<String>builder()
.name("fromAq")
.publisherConfig(AqConnector.configBuilder()
.queue("example_queue_1")
.dataSource("test-ds")
.build())
.build();
Messaging.builder()
.connector(connector)
.publisher(toAq,
Multi.just("Hello", "world", "from", "Oracle", "DB!")
.map(Message::of))
.listener(fromAq, s -> System.out.println("Message received: " + s))
.build()
.start();- Prepare Oracle UCP
- Setup AQ connector and provide datasource with an identifier
test-ds - Setup channel for sending messages to queue
example_queue_1with datasourcetest-ds - Setup channel for receiving messages from queue
example_queue_1with datasourcetest-ds - Register connector and channels
- Add a publisher for several test messages to publish them to
example_queue_1immediately - Subscribe callback for any message coming from
example_queue_1