Maven Coordinates
To enable MicroProfile Reactive Messaging, add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
    <groupId>io.helidon.microprofile.messaging</groupId>
    <artifactId>helidon-microprofile-messaging</artifactId>
</dependency>
To include health checks for Messaging, add the following dependency:
<dependency>
    <groupId>io.helidon.microprofile.messaging</groupId>
    <artifactId>helidon-microprofile-messaging-health</artifactId>
</dependency>
Reactive Messaging
MicroProfile Reactive Messaging uses CDI beans to produce, consume or process messages over Reactive Streams. A messaging bean is expected to be in either ApplicationScoped or Dependent scope. Messages are handled by methods annotated with @Incoming and @Outgoing, and their invocation is always driven by the messaging core, either at assembly time or for every message coming from the stream.
Messaging methods are not meant to be invoked directly!
| Term | Definition |
|---|---|
| messaging method | bean method invoked by the messaging specification |
| connector | Reactive Messaging connector |
| channel | named pair of producer and consumer; either side can be a messaging method or a connector |
The bean can have methods annotated by @Incoming, @Outgoing or both.
Consuming methods with @Incoming annotation
The annotation has one required attribute, value, which defines the channel name.
A method annotated this way can work in two ways:
consume every message coming from the stream connected to the channel
prepare reactive stream’s subscriber and connect it to the channel
Example consuming every message from channel example-channel-2:
@Incoming("example-channel-2")
public void printMessage(String msg) {
    System.out.println("Just received message: " + msg);
}
Example preparing a reactive stream subscriber for channel example-channel-2:
@Incoming("example-channel-2")
public Subscriber<String> printMessage() {
    return ReactiveStreams.<String>builder()
            .forEach(msg -> System.out.println("Just received message: " + msg))
            .build();
}
Producing methods with @Outgoing annotation
The annotation has one required attribute, value, which defines the channel name.
A method annotated this way can work in two ways:
produce exactly one message to the stream connected to the channel
prepare reactive stream’s publisher and connect it to the channel
Example producing exactly one message to channel example-channel-1:
@Outgoing("example-channel-1")
public String produceMessage() {
    return "foo";
}
Example preparing a reactive stream publisher that emits three messages to channel example-channel-1:
@Outgoing("example-channel-1")
public Publisher<String> produceMessages() {
    return ReactiveStreams.of("foo", "bar", "baz").buildRs();
}
Processing methods with @Incoming and @Outgoing annotation
Such methods act as processors, consuming messages from one channel and producing to another.
A method annotated this way can work in multiple ways:
process every message
prepare reactive stream’s processor and connect it between the channels
prepare a new publisher for every message (equivalent to the flatMap operator)
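The last variant in the list above behaves like a flatMap: each incoming message yields its own publisher, and the items it emits are flattened into the outgoing channel. As a rough analogy only, the sketch below uses java.util.stream from the JDK instead of the Reactive Messaging API. The class FlatMapAnalogy and its methods are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy only: java.util.stream stands in for a reactive stream.
class FlatMapAnalogy {

    // Hypothetical helper: turns one message into a small stream of two messages,
    // playing the role of the publisher prepared for every incoming message.
    static Stream<String> expand(String msg) {
        return Stream.of(msg.toUpperCase(), msg.toLowerCase());
    }

    // Flattens the per-message streams into one output sequence, preserving order.
    static List<String> process(List<String> incoming) {
        return incoming.stream()
                .flatMap(FlatMapAnalogy::expand)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [FOO, foo, BAR, bar]
        System.out.println(process(List.of("Foo", "Bar")));
    }
}
```

Two incoming messages produce four outgoing ones, which is the difference from the plain per-message processor that maps one message to exactly one result.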
Example processing every message from channel example-channel-1 to channel example-channel-2:
@Incoming("example-channel-1")
@Outgoing("example-channel-2")
public String processMessage(String msg) {
    return msg.toUpperCase();
}
Example preparing a processor stream to be connected between channels example-channel-1 and example-channel-2:
@Incoming("example-channel-1")
@Outgoing("example-channel-2")
public Processor<String, String> processMessage() {
    return ReactiveStreams.<String>builder()
            .map(String::toUpperCase)
            .buildRs();
}
Example processing every message from channel example-channel-1 as a stream to be flattened to channel example-channel-2:
@Incoming("example-channel-1")
@Outgoing("example-channel-2")
public Publisher<String> processMessage(String msg) {
    return ReactiveStreams.of(msg.toUpperCase(), msg.toLowerCase()).buildRs();
}
Health check
Messaging in Helidon has built-in health probes for liveness and readiness. To activate them, add the health check dependency.
Liveness - a channel is considered UP until a cancel or onError signal is intercepted on it.
Readiness - a channel is considered DOWN until an onSubscribe signal is intercepted on it.
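The two rules above can be modeled as a small state holder. The following is a minimal sketch in plain Java, not Helidon's implementation: the class ChannelProbe and its methods are hypothetical names, and it assumes readiness stays UP once onSubscribe has been seen, which the rules above do not specify.

```java
// Hypothetical model of the liveness/readiness rules; not Helidon's implementation.
class ChannelProbe {
    private boolean subscribed; // true once an onSubscribe signal was seen
    private boolean terminated; // true once a cancel or onError signal was seen

    void onSubscribe() { subscribed = true; }
    void onCancel() { terminated = true; }
    void onError(Throwable error) { terminated = true; }

    // Liveness: UP until cancel or onError is intercepted.
    String liveness() { return terminated ? "DOWN" : "UP"; }

    // Readiness: DOWN until onSubscribe is intercepted (assumed to stay UP afterwards).
    String readiness() { return subscribed ? "UP" : "DOWN"; }

    public static void main(String[] args) {
        ChannelProbe probe = new ChannelProbe();
        System.out.println(probe.liveness() + " " + probe.readiness()); // UP DOWN
        probe.onSubscribe();
        System.out.println(probe.liveness() + " " + probe.readiness()); // UP UP
        probe.onError(new IllegalStateException("boom"));
        System.out.println(probe.liveness() + " " + probe.readiness()); // DOWN UP
    }
}
```

A freshly assembled channel is therefore live but not yet ready, and it only stops being live after the stream is cancelled or fails.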
If you check the health endpoints /health/live and /health/ready, you will see that every messaging channel has its own probe.
{
    "name": "messaging",
    "state": "UP",
    "status": "UP",
    "data": {
        "my-channel-1": "UP",
        "my-channel-2": "UP"
    }
}