Contents

Overview

Connecting streams to JMS with Reactive Messaging couldn’t be easier.

Maven Coordinates

To enable JMS Connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
    <groupId>io.helidon.messaging.jms</groupId>
    <artifactId>helidon-messaging-jms</artifactId>
</dependency>
Copied

Configuration

Connector name: helidon-jms

Configuration options

KeyKindTypeDefault ValueDescription
acknowledge-modeVALUEi.h.m.c.j.AcknowledgeModeAUTO_ACKNOWLEDGEJMS acknowledgement mode
destinationVALUEString Queue or topic name
jndi-initial-context-propertiesMAPString Environment properties used for creating initial context java.naming.factory.initial, java.naming.provider.url
jndi-initial-factoryVALUEString JNDI initial factory
jndi-jms-factoryVALUEString JNDI name of JMS factory
jndi-provider-urlVALUEString JNDI provider url
message-selectorVALUEString JMS API message selector expression based on a subset of the SQL92
named-factoryVALUEString To select from manually configured jakarta.jms.ConnectionFactory ConnectionFactories over JmsConnector.JmsConnectorBuilder#connectionFactory(String, jakarta.jms.ConnectionFactory) JmsConnectorBuilder#connectionFactory()
passwordVALUEString Password used for creating JMS connection
period-executionsVALUELong100Period for executing poll cycles in millis
poll-timeoutVALUELong50Timeout for polling for next message in every poll cycle in millis
queueVALUEString Use supplied destination name and Type#QUEUE QUEUE as type
session-group-idVALUEString When multiple channels share same session-group-id, they share same JMS session
topicVALUEString Use supplied destination name and Type#TOPIC TOPIC as type
transactedVALUEBooleanfalseIndicates whether the session will use a local transaction
typeVALUEi.h.m.c.j.TypeQUEUESpecify if connection is Type#QUEUE queue or Type#TOPIC topic
usernameVALUEString User name used for creating JMS connection

Besides the configuration options above, custom attributes can be passed over configuration.

Custom Attributes Examples
jndi.destinationJNDI destination identifier.
jndi.env-propertiesEnvironment properties used for creating initial context java.naming.factory.initial, java.naming.provider.url …​
producer.somepropertyproperty with producer prefix is set to producer instance (for example WLS Unit-of-Order WLMessageProducer.setUnitOfOrder("unit-1") can be configured as producer.unit-of-order=unit-1)

Configured JMS factory

The simplest possible usage is looking up JMS ConnectionFactory in the naming context.

Example of connector config:
mp.messaging:

  incoming.from-jms:
    connector: helidon-jms
    destination: messaging-test-queue-1
    type: queue

  outgoing.to-jms:
    connector: helidon-jms
    destination: messaging-test-queue-1
    type: queue

  connector:
    helidon-jms:
      user: Gandalf
      password: mellon
      jndi:
        jms-factory: ConnectionFactory
        env-properties:
          java.naming:
            factory.initial: org.apache.activemq.jndi.ActiveMQInitialContextFactory
            provider.url: tcp://localhost:61616
Copied

Injected JMS factory

In case you need more advanced setup, connector can work with injected factory instance.

Inject:
@Produces
@ApplicationScoped
@Named("active-mq-factory")
public ConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory(config.get("jms.url").asString().get());
}
Copied
Config:
jms:
  url: tcp://127.0.0.1:61616

mp:
  messaging:
    connector:
      helidon-jms:
        named-factory: active-mq-factory

    outgoing.to-jms:
      connector: helidon-jms
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue

    incoming.from-jms:
      connector: helidon-jms
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue
Copied

Usage

Consuming

Consuming one by one unwrapped value:
@Incoming("from-jms")
public void consumeJms(String msg) {
    System.out.println("JMS says: " + msg);
}
Copied
Consuming one by one, manual ack:
@Incoming("from-jms")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeJms(JmsMessage<String> msg) {
    System.out.println("JMS says: " + msg.getPayload());
    return msg.ack();
}
Copied

Producing

Example of producing to JMS:
@Outgoing("to-jms")
public PublisherBuilder<String> produceToJms() {
    return ReactiveStreams.of("test1", "test2");
}
Copied
Example of more advanced producing to JMS:
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
    return ReactiveStreams.of("test1", "test2")
            .map(s -> JmsMessage.builder(s)
                    .correlationId(UUID.randomUUID().toString())
                    .property("stringProp", "cool property")
                    .property("byteProp", 4)
                    .property("intProp", 5)
                    .onAck(() -> CompletableFuture.completedStage(null)
                            .thenRun(() -> System.out.println("Acked!")))
                    .build());
}
Copied
Example of even more advanced producing to JMS with custom mapper:
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
    return ReactiveStreams.of("test1", "test2")
            .map(s -> JmsMessage.builder(s)
                    .customMapper((p, session) -> {
                        TextMessage textMessage = session.createTextMessage(p);
                        textMessage.setStringProperty("custom-mapped-property", "XXX" + p);
                        return textMessage;
                    })
                    .build()
            );
}
Copied