Overview
The JMS connector connects Reactive Messaging streams to JMS queues and topics.
Maven Coordinates
To enable the JMS Connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
    <groupId>io.helidon.messaging.jms</groupId>
    <artifactId>helidon-messaging-jms</artifactId>
</dependency>
Configuration
Connector name: helidon-jms
Configuration options
| Key | Kind | Type | Default Value | Description |
|---|---|---|---|---|
| acknowledge-mode | VALUE | i.h.m.c.j.AcknowledgeMode | AUTO_ACKNOWLEDGE | JMS acknowledgement mode |
| destination | VALUE | String | | Queue or topic name |
| jndi-initial-context-properties | MAP | String | | Environment properties used for creating the initial context, such as java.naming.factory.initial and java.naming.provider.url |
| jndi-initial-factory | VALUE | String | | JNDI initial factory |
| jndi-jms-factory | VALUE | String | | JNDI name of the JMS factory |
| jndi-provider-url | VALUE | String | | JNDI provider URL |
| message-selector | VALUE | String | | JMS API message selector expression, based on a subset of SQL92 |
| named-factory | VALUE | String | | Selects one of the jakarta.jms.ConnectionFactory instances configured manually through JmsConnector.JmsConnectorBuilder#connectionFactory(String, jakarta.jms.ConnectionFactory) |
| password | VALUE | String | | Password used for creating the JMS connection |
| period-executions | VALUE | Long | 100 | Period between poll cycles, in milliseconds |
| poll-timeout | VALUE | Long | 50 | Timeout for polling for the next message in each poll cycle, in milliseconds |
| queue | VALUE | String | | Use the supplied destination name with Type#QUEUE as the type |
| session-group-id | VALUE | String | | Channels that share the same session-group-id share the same JMS session |
| topic | VALUE | String | | Use the supplied destination name with Type#TOPIC as the type |
| transacted | VALUE | Boolean | false | Indicates whether the session uses a local transaction |
| type | VALUE | i.h.m.c.j.Type | QUEUE | Specifies whether the connection is a queue (Type#QUEUE) or a topic (Type#TOPIC) |
| username | VALUE | String | | User name used for creating the JMS connection |
Besides the configuration options above, custom attributes can be passed through configuration.
Custom Attributes Examples
| Attribute | Description |
|---|---|
| jndi.destination | JNDI destination identifier. |
| jndi.env-properties | Environment properties used for creating the initial context, such as java.naming.factory.initial, java.naming.provider.url … |
| producer.someproperty | A property with the producer prefix is set on the producer instance. For example, WLS Unit-of-Order WLMessageProducer.setUnitOfOrder("unit-1") can be configured as producer.unit-of-order=unit-1. |
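The producer prefix convention above implies a translation from a kebab-case key such as unit-of-order to a setter such as setUnitOfOrder. The following is a minimal sketch of one way such a translation could work, using plain reflection. It is not the connector's actual implementation; ProducerPropertySketch and FakeProducer are hypothetical names introduced only for illustration, and only String-typed setters are assumed.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Helidon: shows one way a kebab-case
// config key could be translated to a JavaBean-style setter call.
final class ProducerPropertySketch {

    // Converts a key such as "unit-of-order" to "setUnitOfOrder".
    static String setterName(String kebabKey) {
        return Arrays.stream(kebabKey.split("-"))
                .filter(part -> !part.isEmpty())
                .map(part -> Character.toUpperCase(part.charAt(0)) + part.substring(1))
                .collect(Collectors.joining("", "set", ""));
    }

    // Invokes the matching public single-String-argument setter on the target.
    static void apply(Object target, String kebabKey, String value) {
        try {
            Method setter = target.getClass().getMethod(setterName(kebabKey), String.class);
            setter.invoke(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No usable setter for key: " + kebabKey, e);
        }
    }

    // Stand-in for a vendor-specific producer such as WLMessageProducer.
    public static final class FakeProducer {
        private String unitOfOrder;

        public void setUnitOfOrder(String unitOfOrder) {
            this.unitOfOrder = unitOfOrder;
        }

        public String getUnitOfOrder() {
            return unitOfOrder;
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        // Mirrors the configuration entry producer.unit-of-order=unit-1
        apply(producer, "unit-of-order", "unit-1");
        System.out.println(producer.getUnitOfOrder());
    }
}
```

A key without a matching setter ends in an IllegalArgumentException in this sketch; how the real connector treats unknown producer properties is not stated here.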
Configured JMS factory
The simplest usage is to look up the JMS ConnectionFactory in the naming context.
Example of connector configuration:
mp.messaging:

  incoming.from-jms:
    connector: helidon-jms
    destination: messaging-test-queue-1
    type: queue

  outgoing.to-jms:
    connector: helidon-jms
    destination: messaging-test-queue-1
    type: queue

  connector:
    helidon-jms:
      user: Gandalf
      password: mellon
      jndi:
        jms-factory: ConnectionFactory
        env-properties:
          java.naming:
            factory.initial: org.apache.activemq.jndi.ActiveMQInitialContextFactory
            provider.url: tcp://localhost:61616
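The nested env-properties keys in the configuration above correspond to the dotted JNDI property names java.naming.factory.initial and java.naming.provider.url. As a rough illustration of that correspondence, the sketch below flattens a nested map into the Hashtable form that javax.naming.InitialContext accepts. JndiEnvSketch is a hypothetical name, and the code only assumes that nested keys are joined with dots; it is not taken from the connector.

```java
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.naming.Context;

// Hypothetical illustration: nested configuration keys joined with dots
// produce the standard JNDI environment property names.
final class JndiEnvSketch {

    // Recursively joins nested keys with "." and collects the leaf values.
    static void flatten(String prefix, Map<String, ?> node, Hashtable<String, Object> out) {
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flatten(key, child, out);
            } else {
                out.put(key, value);
            }
        }
    }

    // Builds the environment that matches the env-properties example.
    static Hashtable<String, Object> exampleEnv() {
        Map<String, Object> javaNaming = new LinkedHashMap<>();
        javaNaming.put("factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        javaNaming.put("provider.url", "tcp://localhost:61616");

        Map<String, Object> envProperties = new LinkedHashMap<>();
        envProperties.put("java.naming", javaNaming);

        Hashtable<String, Object> env = new Hashtable<>();
        flatten("", envProperties, env);
        return env;
    }

    public static void main(String[] args) {
        Hashtable<String, Object> env = exampleEnv();
        // Context.INITIAL_CONTEXT_FACTORY is "java.naming.factory.initial"
        System.out.println(env.get(Context.INITIAL_CONTEXT_FACTORY));
        // Context.PROVIDER_URL is "java.naming.provider.url"
        System.out.println(env.get(Context.PROVIDER_URL));
    }
}
```

With the ActiveMQ client on the classpath, a table like this could be passed to new InitialContext(env); the sketch stops short of that so it runs without extra dependencies.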
Injected JMS factory
If you need a more advanced setup, the connector can work with an injected factory instance.
Inject:
@Produces
@ApplicationScoped
@Named("active-mq-factory")
public ConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory(config.get("jms.url").asString().get());
}
Config:
jms:
  url: tcp://127.0.0.1:61616

mp:
  messaging:
    connector:
      helidon-jms:
        named-factory: active-mq-factory

    outgoing.to-jms:
      connector: helidon-jms
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue

    incoming.from-jms:
      connector: helidon-jms
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue
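Both channels in the configuration above use the same session-group-id, so they share one JMS session. The sharing rule can be pictured as a lookup keyed by group id, as in the sketch below. This is only an illustration of the semantics under the assumption that a session is created once per distinct id; SessionGroupSketch is a hypothetical class, and a plain Object stands in for a JMS session.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry: channels with the same session-group-id receive
// the same session instance, created on first request.
final class SessionGroupSketch<S> {

    private final Map<String, S> sessionsByGroup = new ConcurrentHashMap<>();
    private final Function<String, S> sessionFactory;

    SessionGroupSketch(Function<String, S> sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    // Returns the existing session for the group, or creates one atomically.
    S sessionFor(String sessionGroupId) {
        return sessionsByGroup.computeIfAbsent(sessionGroupId, sessionFactory);
    }

    public static void main(String[] args) {
        // A plain Object stands in for a JMS session here.
        SessionGroupSketch<Object> registry = new SessionGroupSketch<>(id -> new Object());
        Object outgoing = registry.sessionFor("order-connection-1");
        Object incoming = registry.sessionFor("order-connection-1");
        System.out.println(outgoing == incoming);
    }
}
```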
Usage
Consuming
Consuming unwrapped values one by one:
@Incoming("from-jms")
public void consumeJms(String msg) {
    System.out.println("JMS says: " + msg);
}
Consuming one by one with manual acknowledgement:
@Incoming("from-jms")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeJms(JmsMessage<String> msg) {
    System.out.println("JMS says: " + msg.getPayload());
    return msg.ack();
}
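With manual acknowledgement, the returned CompletionStage decides when the message counts as handled, so the ack can be deferred until asynchronous processing has finished. The sketch below shows that composition with plain java.util.concurrent types. Ackable is a hypothetical stand-in for a message type offering getPayload() and ack(), and processThenAck is an illustrative helper, not a Helidon API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ManualAckSketch {

    // Hypothetical stand-in for a message that supports manual acknowledgement.
    interface Ackable<T> {
        T getPayload();

        CompletionStage<Void> ack();
    }

    // Runs the asynchronous processing step first; ack() is invoked only
    // when that step completes normally. A failure skips the ack and is
    // propagated through the returned stage.
    static <T> CompletionStage<Void> processThenAck(Ackable<T> msg,
                                                    Function<T, CompletionStage<Void>> processor) {
        return processor.apply(msg.getPayload()).thenCompose(ignored -> msg.ack());
    }

    // Test double that counts how many times ack() was called.
    static <T> Ackable<T> counting(T payload, AtomicInteger acks) {
        return new Ackable<T>() {
            @Override
            public T getPayload() {
                return payload;
            }

            @Override
            public CompletionStage<Void> ack() {
                acks.incrementAndGet();
                return CompletableFuture.completedFuture(null);
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger acks = new AtomicInteger();
        processThenAck(counting("hello", acks), payload -> {
            System.out.println("Processing: " + payload);
            return CompletableFuture.completedFuture(null);
        }).toCompletableFuture().join();
        System.out.println("acks = " + acks.get());
    }
}
```

In a method annotated with @Incoming, the same thenCompose chain could end in msg.ack() on the incoming message, so a failed processing step leaves the message unacknowledged.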
Producing
Example of producing to JMS:
@Outgoing("to-jms")
public PublisherBuilder<String> produceToJms() {
    return ReactiveStreams.of("test1", "test2");
}
Example of more advanced producing to JMS:
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
    return ReactiveStreams.of("test1", "test2")
            .map(s -> JmsMessage.builder(s)
                    .correlationId(UUID.randomUUID().toString())
                    .property("stringProp", "cool property")
                    .property("byteProp", 4)
                    .property("intProp", 5)
                    .onAck(() -> CompletableFuture.completedStage(null)
                            .thenRun(() -> System.out.println("Acked!")))
                    .build());
}
Example of even more advanced producing to JMS with a custom mapper:
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
    return ReactiveStreams.of("test1", "test2")
            .map(s -> JmsMessage.builder(s)
                    .customMapper((p, session) -> {
                        TextMessage textMessage = session.createTextMessage(p);
                        textMessage.setStringProperty("custom-mapped-property", "XXX" + p);
                        return textMessage;
                    })
                    .build()
            );
}