Message
The Reactive Messaging Message class wraps the data items (payloads) passed between messaging methods and connectors. Wrapping and unwrapping can be done explicitly with org.eclipse.microprofile.reactive.messaging.Message#of(T) and Message#getPayload, or implicitly by the messaging core.
@Outgoing("publisher-payload")
public PublisherBuilder<Integer> streamOfMessages() {
    return ReactiveStreams.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}

@Incoming("publisher-payload")
@Outgoing("wrapped-message")
public Message<String> rewrapMessageManually(Message<Integer> message) {
    return Message.of(Integer.toString(message.getPayload()));
}

@Incoming("wrapped-message")
public void consumeImplicitlyUnwrappedMessage(String value) {
    System.out.println("Consuming message: " + value);
}

Acknowledgement
A message carries a callback for acknowledging its reception. In messaging methods, a message can be acknowledged manually by calling org.eclipse.microprofile.reactive.messaging.Message#ack, or automatically by the messaging core according to an explicit or implicit acknowledgement strategy. An explicit strategy is configured with the @Acknowledgment annotation, whose single required attribute, value, takes a strategy from the enum org.eclipse.microprofile.reactive.messaging.Acknowledgment.Strategy. For the supported method signatures and the implicit (default) acknowledgement behavior, see the Message acknowledgement section of the specification.
| Annotation | Behavior |
| --- | --- |
| @Acknowledgment(Acknowledgment.Strategy.NONE) | No acknowledgment |
| @Acknowledgment(Acknowledgment.Strategy.MANUAL) | No automatic acknowledgment |
| @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) | Ack automatically before method invocation or processing |
| @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) | Ack automatically after method invocation or processing |
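The ordering described in the table can be illustrated with a small plain-JDK model. This is only a sketch under stated assumptions: SimpleMessage, Strategy, dispatch and run are hypothetical stand-ins written for this illustration, not part of MicroProfile Reactive Messaging, and the real messaging core is asynchronous and considerably more involved. The sketch only shows when the ack callback fires relative to the method invocation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class AckOrderSketch {

    /** Hypothetical stand-in for Message: a payload plus an ack callback. */
    static final class SimpleMessage<T> {
        private final T payload;
        private final Supplier<CompletionStage<Void>> ack;

        SimpleMessage(T payload, Supplier<CompletionStage<Void>> ack) {
            this.payload = payload;
            this.ack = ack;
        }

        T getPayload() {
            return payload;
        }

        CompletionStage<Void> ack() {
            return ack.get();
        }
    }

    /** Hypothetical stand-in for Acknowledgment.Strategy. */
    enum Strategy { NONE, MANUAL, PRE_PROCESSING, POST_PROCESSING }

    /** Invokes the method, acking before or after it depending on the strategy. */
    static <T> void dispatch(Strategy strategy, SimpleMessage<T> msg,
                             Consumer<SimpleMessage<T>> method) {
        if (strategy == Strategy.PRE_PROCESSING) {
            msg.ack();
        }
        method.accept(msg);
        if (strategy == Strategy.POST_PROCESSING) {
            msg.ack();
        }
        // NONE and MANUAL: this dispatcher never acks on its own;
        // with MANUAL the method itself is expected to call ack().
    }

    /** Sends one message through dispatch and returns the events in the order they happened. */
    static List<String> run(Strategy strategy, boolean methodAcks) {
        List<String> events = new ArrayList<>();
        SimpleMessage<String> msg = new SimpleMessage<>("This is Payload", () -> {
            events.add("acked");
            return CompletableFuture.completedFuture(null);
        });
        dispatch(strategy, msg, m -> {
            events.add("invoked");
            if (methodAcks) {
                m.ack();
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(Strategy.PRE_PROCESSING, false));  // [acked, invoked]
        System.out.println(run(Strategy.POST_PROCESSING, false)); // [invoked, acked]
        System.out.println(run(Strategy.MANUAL, true));           // [invoked, acked]
        System.out.println(run(Strategy.NONE, false));            // [invoked]
    }
}
```

In this model, MANUAL and NONE differ only in what the method does: the dispatcher stays out of the way in both cases, and the callback runs only if the method calls ack() itself.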
@Outgoing("consume-and-ack")
public Publisher<Message<String>> streamOfMessages() {
    return ReactiveStreams.of(Message.of("This is Payload", () -> {
        System.out.println("This particular message was acked!");
        return CompletableFuture.completedFuture(null);
    })).buildRs();
}

@Incoming("consume-and-ack")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public void receiveAndAckMessage(Message<String> msg) {
    msg.ack();
}

Calling ack() prints "This particular message was acked!" to System.out.
@Outgoing("consume-and-ack")
public Publisher<Message<String>> streamOfMessages() {
    return ReactiveStreams.of(Message.of("This is Payload", () -> {
        System.out.println("This particular message was acked!");
        return CompletableFuture.completedFuture(null);
    })).buildRs();
}

/**
 * Prints to the console:
 * > This particular message was acked!
 * > Method invocation!
 */
@Incoming("consume-and-ack")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void receiveAndAckMessage(Message<String> msg) {
    System.out.println("Method invocation!");
}

@Outgoing("consume-and-ack")
public Publisher<Message<String>> streamOfMessages() {
    return ReactiveStreams.of(Message.of("This is Payload", () -> {
        System.out.println("This particular message was acked!");
        return CompletableFuture.completedFuture(null);
    })).buildRs();
}

/**
 * Prints to the console:
 * > Method invocation!
 * > This particular message was acked!
 */
@Incoming("consume-and-ack")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public void receiveAndAckMessage(Message<String> msg) {
    System.out.println("Method invocation!");
}