Message

The Reactive Messaging Message class can be used to wrap or unwrap data items between methods and connectors. The message wrapping and unwrapping can be performed explicitly by using org.eclipse.microprofile.reactive.messaging.Message#of(T) or implicitly through the messaging core.

Example of explicit and implicit wrapping and unwrapping
@Outgoing("publisher-payload")
public PublisherBuilder<Integer> streamOfMessages() {
    return ReactiveStreams.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}

@Incoming("publisher-payload")
@Outgoing("wrapped-message")
public Message<String> rewrapMessageManually(Message<Integer> message) {
    return Message.of(Integer.toString(message.getPayload()));
}

@Incoming("wrapped-message")
public void consumeImplicitlyUnwrappedMessage(String value) {
    System.out.println("Consuming message: " + value);
}
Copied

Acknowledgement

Message carries a callback for reception acknowledgement, acknowledgement in messaging methods is possible manually by org.eclipse.microprofile.reactive.messaging.Message#ack or automatically according explicit or implicit acknowledgement strategy by messaging core. Explicit strategy configuration is possible with @Acknowledgment annotation which has one required attribute value that expects the strategy type from enum org.eclipse.microprofile.reactive.messaging.Acknowledgment.Strategy. More information about supported signatures and implicit automatic acknowledgement can be found in specification Message acknowledgement.

Acknowledgement strategies
@Acknowledgment(Acknowledgment.Strategy.NONE)No acknowledgment
@Acknowledgment(Acknowledgment.Strategy.MANUAL)No automatic acknowledgment
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)Ack automatically before method invocation or processing
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)Ack automatically after method invocation or processing
Example of manual acknowledgment
@Outgoing("consume-and-ack")
public PublisherBuilder<Integer> streamOfMessages() {
    return ReactiveStreams.of(Message.of("This is Payload", () -> {
            System.out.println("This particular message was acked!");
            return CompletableFuture.completedFuture(null);
        })).buildRs();
}

@Incoming("consume-and-ack")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public void receiveAndAckMessage(Message<String> msg) {
    msg.ack();
}
Copied
  • Calling ack() will print "This particular message was acked!" to System.out
Example of manual acknowledgment
@Outgoing("consume-and-ack")
public PublisherBuilder<Integer> streamOfMessages() {
    return ReactiveStreams.of(Message.of("This is Payload", () -> {
            System.out.println("This particular message was acked!");
            return CompletableFuture.completedFuture(null);
        })).buildRs();
}

@Incoming("consume-and-ack")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public void receiveAndAckMessage(Message<String> msg) {
    msg.ack();
}
Copied
  • Calling ack() will print "This particular message was acked!" to System.out
Example of explicit pre-process acknowledgment
@Outgoing("consume-and-ack")
public PublisherBuilder<Integer> streamOfMessages() {
    return ReactiveStreams.of(Message.of("This is Payload", () -> {
            System.out.println("This particular message was acked!");
            return CompletableFuture.completedFuture(null);
        })).buildRs();
}

/**
* Prints to the console:
* > This particular message was acked!
* > Method invocation!
*/
@Incoming("consume-and-ack")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void receiveAndAckMessage(Message<String> msg) {
    System.out.println("Method invocation!");
}
Copied
Example of explicit post-rocess acknowledgment
@Outgoing("consume-and-ack")
public PublisherBuilder<Integer> streamOfMessages() {
    return ReactiveStreams.of(Message.of("This is Payload", () -> {
            System.out.println("This particular message was acked!");
            return CompletableFuture.completedFuture(null);
        })).buildRs();
}

/**
* Prints to the console:
* > Method invocation!
* > This particular message was acked!
*/
@Incoming("consume-and-ack")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public void receiveAndAckMessage(Message<String> msg) {
    System.out.println("Method invocation!");
}
Copied