Maven Coordinates

To enable MicroProfile Reactive Messaging, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
   <groupId>io.helidon.microprofile.messaging</groupId>
   <artifactId>helidon-microprofile-messaging</artifactId>
</dependency>

To include health checks for Messaging, add the following dependency:

<dependency>
   <groupId>io.helidon.microprofile.messaging</groupId>
   <artifactId>helidon-microprofile-messaging-health</artifactId>
</dependency>

Reactive Messaging

MicroProfile Reactive Messaging uses CDI beans to produce, consume, or process messages over Reactive Streams. A messaging bean is expected to be in either ApplicationScoped or Dependent scope. Messages are handled by methods annotated with @Incoming and @Outgoing, and their invocation is always driven by the messaging core, either at assembly time or for every message coming from the stream.

Messaging methods are not meant to be invoked directly!

Terms definition

  • messaging method - bean method invoked by the messaging specification

  • connector - Reactive Messaging connector

  • channel - named pair of producer and consumer; both sides can be either a messaging method or a connector

The bean can have methods annotated by @Incoming, @Outgoing or both.

Consuming methods with @Incoming annotation

The annotation has one required attribute, value, which defines the channel name.

A messaging method annotated this way can function in two ways:

  • consume every message coming from the stream connected to the channel

  • prepare reactive stream’s subscriber and connect it to the channel

Example consuming every message from channel example-channel-2:
@Incoming("example-channel-2")
public void printMessage(String msg) {
    System.out.println("Just received message: " + msg);
}
Example preparing reactive stream subscriber for channel example-channel-2:
@Incoming("example-channel-2")
public Subscriber<String> printMessage() {
    return ReactiveStreams.<String>builder()
                .forEach(msg -> System.out.println("Just received message: " + msg))
                .build();
}

Producing methods with @Outgoing annotation

The annotation has one required attribute, value, which defines the channel name.

A messaging method annotated this way can function in two ways:

  • produce exactly one message to the stream connected to the channel

  • prepare reactive stream’s publisher and connect it to the channel

Example producing exactly one message to channel example-channel-1:
@Outgoing("example-channel-1")
public String produceMessage() {
    return "foo";
}
Example preparing reactive stream publisher publishing three messages to the channel example-channel-1:
@Outgoing("example-channel-1")
public Publisher<String> produceMessages() {
    return ReactiveStreams.of("foo", "bar", "baz").buildRs();
}

Processing methods with @Incoming and @Outgoing annotation

Such methods act as processors, consuming messages from one channel and producing to another.

A messaging method annotated this way can function in multiple ways:

  • process every message

  • prepare reactive stream’s processor and connect it between the channels

  • for every message, prepare a new publisher (equivalent to the flatMap operator)

Example processing every message from channel example-channel-1 to channel example-channel-2:
@Incoming("example-channel-1")
@Outgoing("example-channel-2")
public String processMessage(String msg) {
    return msg.toUpperCase();
}
Example preparing processor stream to be connected between channels example-channel-1 and example-channel-2:
@Incoming("example-channel-1")
@Outgoing("example-channel-2")
public Processor<String, String> processMessage() {
    return ReactiveStreams.<String>builder()
                .map(String::toUpperCase)
                .buildRs();
}
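
The processor mode can be hard to picture without the surrounding stream. The following is a minimal sketch of the same producer, processor, consumer shape using only the JDK's java.util.concurrent.Flow API, which mirrors the Reactive Streams interfaces. It is an analogy, not Helidon's implementation: ChannelPipelineSketch, MapProcessor, and run are hypothetical names, and the wiring that the messaging core performs from channel names is done by hand here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical sketch: plain JDK Flow API, no MicroProfile dependencies.
class ChannelPipelineSketch {

    // Processor stage: subscribes upstream, transforms each item, republishes downstream.
    static class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<T, R> mapper;
        private Flow.Subscription subscription;

        MapProcessor(Function<T, R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // forward the transformed item
            subscription.request(1);    // ask for the next one
        }

        @Override
        public void onError(Throwable error) {
            closeExceptionally(error);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    // Pushes the messages through an upper-casing processor and collects the results.
    static List<String> run(String... messages) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        MapProcessor<String, String> upper = new MapProcessor<>(String::toUpperCase);

        // Wire the consumer first, then connect the processor to the source.
        CompletableFuture<Void> done = upper.consume(received::add);
        source.subscribe(upper);

        for (String message : messages) {
            source.submit(message);
        }
        source.close(); // signals onComplete through the pipeline
        done.join();    // wait until the consumer has seen everything
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("foo", "bar", "baz"));
    }
}
```

In a messaging bean none of this plumbing is written by hand: the method only returns the processor, and the messaging core connects it between the two named channels.
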
Example processing every message from channel example-channel-1 as a stream to be flattened to channel example-channel-2:
@Incoming("example-channel-1")
@Outgoing("example-channel-2")
public Publisher<String> processMessage(String msg) {
    return ReactiveStreams.of(msg.toUpperCase(), msg.toLowerCase()).buildRs();
}
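
To make the flatMap equivalence concrete, here is a minimal sketch of the same flattening using java.util.stream instead of a reactive stream. FlattenSketch and flatten are hypothetical names used only for illustration. Each incoming message yields a small stream of two messages, and the results are merged in order into a single outgoing sequence.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: flatMap semantics shown with java.util.stream.
class FlattenSketch {

    // Every incoming message produces two outgoing messages, flattened into one list.
    static List<String> flatten(List<String> incoming) {
        return incoming.stream()
                .flatMap(msg -> Stream.of(msg.toUpperCase(), msg.toLowerCase()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [FOO, foo, BAR, bar]
        System.out.println(flatten(List.of("Foo", "Bar")));
    }
}
```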

Health check

Messaging in Helidon has built-in health probes for liveness and readiness. To activate them, add the health check dependency.

  • Liveness - channel is considered UP until cancel or onError signal is intercepted on it.

  • Readiness - channel is considered DOWN until onSubscribe signal is intercepted on it.
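
The two rules above amount to a small per-channel state machine. The following sketch models it in plain Java, assuming a hypothetical ChannelProbeSketch class. It is not Helidon's probe implementation, only an illustration of which signal flips which state.

```java
// Hypothetical sketch of the per-channel probe rules; not Helidon's implementation.
class ChannelProbeSketch {
    private boolean subscribed; // set once onSubscribe has been seen on the channel
    private boolean terminated; // set once cancel or onError has been seen

    void onSubscribe() {
        subscribed = true;
    }

    void onError(Throwable error) {
        terminated = true;
    }

    void onCancel() {
        terminated = true;
    }

    // Liveness: UP until a cancel or onError signal is intercepted.
    String liveness() {
        return terminated ? "DOWN" : "UP";
    }

    // Readiness: DOWN until an onSubscribe signal is intercepted.
    String readiness() {
        return subscribed ? "UP" : "DOWN";
    }

    public static void main(String[] args) {
        ChannelProbeSketch probe = new ChannelProbeSketch();
        System.out.println(probe.liveness() + " / " + probe.readiness());
        probe.onSubscribe();
        System.out.println(probe.liveness() + " / " + probe.readiness());
    }
}
```

A channel that has just been assembled is therefore live but not yet ready, and it becomes ready as soon as its subscriber is connected.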

If you check your health endpoints /health/live and /health/ready, you will find that every messaging channel has its own probe.

{
    "name": "messaging",
    "state": "UP",
    "status": "UP",
    "data": {
        "my-channel-1": "UP",
        "my-channel-2": "UP"
    }
}