Maven Coordinates
To enable JMS Connector add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.messaging.jms</groupId>
<artifactId>helidon-messaging-jms</artifactId>
</dependency>content_copy
Reactive JMS Connector
Connecting streams to JMS with Reactive Messaging couldn’t be easier.
Config
Connector name: helidon-jms
Attributes
username | User name used to connect JMS session |
password | Password to connect JMS session |
type | Possible values are: queue, topic |
destination | Queue or topic name |
acknowledge-mode | Possible values are: AUTO_ACKNOWLEDGE- session automatically acknowledges a client’s receipt of a message, CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually, DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages. Default value: AUTO_ACKNOWLEDGE |
transacted | Indicates whether the session will use a local transaction. Default value: false |
message-selector | JMS API message selector expression based on a subset of the SQL92. Expression can only access headers and properties, not the payload. |
client-id | Client identifier for JMS connection. |
durable | True for creating durable consumer (only for topic). Default value: false |
subscriber-name | Subscriber name for durable consumer used to identify subscription. |
non-local | If true then any messages published to the topic using this session’s connection, or any other connection with the same client identifier, will not be added to the durable subscription. Default value: false |
named-factory | Select in case factory is injected as a named bean or configured with name. |
poll-timeout | Timeout for polling for next message in every poll cycle in millis. Default value: 50 |
period-executions | Period for executing poll cycles in millis. Default value: 100 |
session-group-id | When multiple channels share same session-group-id, they share same JMS session and same JDBC connection as well. |
jndi.jms-factory | JNDI name of JMS factory. |
jndi.env-properties | Environment properties used for creating initial context java.naming.factory.initial, java.naming.provider.url … |
producer.someproperty | property with producer prefix is set to producer instance (for example WLS Unit-of-Order WLMessageProducer.setUnitOfOrder("unit-1") can be configured as producer.unit-of-order=unit-1) |
Configured JMS factory
The simplest possible usage is looking up JMS ConnectionFactory in the naming context.
Example of connector config:
mp.messaging:
incoming.from-jms:
connector: helidon-jms
destination: messaging-test-queue-1
type: queue
outgoing.to-jms:
connector: helidon-jms
destination: messaging-test-queue-1
type: queue
connector:
helidon-jms:
user: Gandalf
password: mellon
jndi:
jms-factory: ConnectionFactory
env-properties:
java.naming:
factory.initial: org.apache.activemq.jndi.ActiveMQInitialContextFactory
provider.url: tcp://localhost:61616content_copy
Injected JMS factory
In case you need more advanced setup, connector can work with injected factory instance.
Inject:
@Produces
@ApplicationScoped
@Named("active-mq-factory")
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(config.get("jms.url").asString().get());
}content_copy
Config:
jms:
url: tcp://127.0.0.1:61616
mp:
messaging:
connector:
helidon-jms:
named-factory: active-mq-factory
outgoing.to-jms:
connector: helidon-jms
session-group-id: order-connection-1
destination: TESTQUEUE
type: queue
incoming.from-jms:
connector: helidon-jms
session-group-id: order-connection-1
destination: TESTQUEUE
type: queuecontent_copy
Consuming
Consuming one by one unwrapped value:
@Incoming("from-jms")
public void consumeJms(String msg) {
System.out.println("JMS says: " + msg);
}content_copy
Consuming one by one, manual ack:
@Incoming("from-jms")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<?> consumeJms(JmsMessage<String> msg) {
System.out.println("JMS says: " + msg.getPayload());
return msg.ack();
}content_copy
Producing
Example of producing to JMS:
@Outgoing("to-jms")
public PublisherBuilder<String> produceToJms() {
return ReactiveStreams.of("test1", "test2");
}content_copy
Example of more advanced producing to JMS:
@Outgoing("to-jms")
public PublisherBuilder<String> produceToJms() {
return ReactiveStreams.of("test1", "test2")
.map(s -> JmsMessage.builder(s)
.correlationId(UUID.randomUUID().toString())
.property("stringProp", "cool property")
.property("byteProp", 4)
.property("intProp", 5)
.onAck(() -> System.out.println("Acked!"))
.build());
}content_copy
Example of even more advanced producing to JMS with custom mapper:
@Outgoing("to-jms")
public PublisherBuilder<String> produceToJms() {
return ReactiveStreams.of("test1", "test2")
.map(s -> JmsMessage.builder(s)
.customMapper((p, session) -> {
TextMessage textMessage = session.createTextMessage(p);
textMessage.setStringProperty("custom-mapped-property", "XXX" + p);
return textMessage;
})
.build()
);
}content_copy