Overview

The AQ connector connects Reactive Messaging streams to Oracle Advanced Queuing (AQ). It extends Helidon’s JMS connector with Oracle’s AQ-specific API.

Maven Coordinates

To enable the AQ connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
    <groupId>io.helidon.messaging.aq</groupId>
    <artifactId>helidon-messaging-aq</artifactId>
</dependency>

Configuration

Connector name: helidon-aq

Configuration options

| Key | Kind | Type | Default Value | Description |
| --- | --- | --- | --- | --- |
| acknowledge-mode | VALUE | i.h.m.c.j.AcknowledgeMode | AUTO_ACKNOWLEDGE | JMS acknowledgement mode |
| client-id | VALUE | String | | Client identifier for the JMS connection |
| data-source | VALUE | String | | Mapping to a javax.sql.DataSource supplied with io.helidon.messaging.connectors.aq.AqConnector.AqConnectorBuilder#dataSource(String, javax.sql.DataSource) |
| destination | VALUE | String | | Queue or topic name |
| durable | VALUE | Boolean | false | Indicates whether the consumer should be created as durable (only relevant for topic destinations) |
| message-selector | VALUE | String | | JMS API message selector expression based on a subset of the SQL92 conditional expression syntax |
| named-factory | VALUE | String | | Selects the jakarta.jms.ConnectionFactory when the factory is injected as a named bean or configured with a name |
| non-local | VALUE | Boolean | false | When set to true, messages published by this connection, or any connection with the same client identifier, will not be delivered to this durable subscription |
| password | VALUE | String | | Password used for creating the JMS connection |
| period-executions | VALUE | Long | 100 | Period for executing poll cycles, in milliseconds |
| poll-timeout | VALUE | Long | 50 | Timeout for polling for the next message in every poll cycle, in milliseconds |
| queue | VALUE | String | | Use the supplied destination name and Type#QUEUE as type |
| session-group-id | VALUE | String | | When multiple channels share the same session-group-id, they share the same JMS session |
| subscriber-name | VALUE | String | | Subscriber name used to identify a durable subscription |
| topic | VALUE | String | | Use the supplied destination name and Type#TOPIC as type |
| transacted | VALUE | Boolean | false | Indicates whether the session will use a local transaction |
| type | VALUE | i.h.m.c.j.Type | QUEUE | Specifies whether the connection is a queue (io.helidon.messaging.connectors.jms.Type#QUEUE) or a topic (io.helidon.messaging.connectors.jms.Type#TOPIC) |
| username | VALUE | String | | User name used for creating the JMS connection |
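
The relationship between period-executions and poll-timeout can be easier to see in code. The following is a minimal sketch, not the connector's implementation: it assumes that one poll with a timeout is attempted per cycle, and it uses a plain in-memory BlockingQueue as a stand-in for the AQ destination. The class name PollCycleSketch and the drain method are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: a stand-in polling loop, NOT the connector's code.
class PollCycleSketch {

    // Default values copied from the configuration table
    static final long PERIOD_EXECUTIONS_MS = 100;
    static final long POLL_TIMEOUT_MS = 50;

    /**
     * Starts a poll cycle every periodMillis; each cycle waits at most
     * pollTimeoutMillis for one message. Stops once `expected` messages
     * have arrived or after a 5 second safety timeout.
     */
    static List<String> drain(BlockingQueue<String> source, int expected,
                              long periodMillis, long pollTimeoutMillis)
            throws InterruptedException {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(expected);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // One poll per cycle, bounded by the poll timeout
                String msg = source.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    received.add(msg);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
        synchronized (received) {
            return new ArrayList<>(received);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
        System.out.println(drain(queue, 3, PERIOD_EXECUTIONS_MS, POLL_TIMEOUT_MS));
    }
}
```

Under this reading, a shorter period lowers delivery latency at the cost of more frequent polling, while the poll timeout bounds how long a single cycle can block on an empty destination.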

Configured JMS Factory

The simplest approach is to let the connector construct the AQjmsConnectionFactory.

Example of connector config:
mp:
  messaging:

    connector:
      helidon-aq:
        transacted: false
        acknowledge-mode: CLIENT_ACKNOWLEDGE
        url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
        user: gandalf
        password: mellon

    outgoing.to-aq:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue

    incoming.from-aq:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue

It’s also possible, and preferable, to refer to a configured data source, in this example an Oracle UCP data source:

Example of connector config with Oracle UCP datasource:
javax:
  sql:
    DataSource:
      aq-test-ds:
        connectionFactoryClassName: oracle.jdbc.pool.OracleDataSource
        URL: jdbc:oracle:thin:@exampledb_high?TNS_ADMIN=/home/gandalf/wallets/Wallet_EXAMPLEDB
        user: gandalf
        password: SuperSecretPassword1234

mp:
  messaging:
    connector:
      helidon-aq:
        transacted: false
        acknowledge-mode: CLIENT_ACKNOWLEDGE
        data-source: aq-test-ds
    outgoing.toJms:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue
    incoming.fromJms:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue

Injected JMS factory

If you need a more advanced configuration, the connector can work with an injected AQjmsConnectionFactory:

Inject:
@Produces
@ApplicationScoped
@Named("aq-orderdb-factory")
public AQjmsConnectionFactory connectionFactory() throws JMSException {
    // 'config' is assumed to be an injected io.helidon.config.Config instance
    AQjmsQueueConnectionFactory fact = new AQjmsQueueConnectionFactory();
    // Connection details are read from the jdbc.* keys of the configuration
    fact.setJdbcURL(config.get("jdbc.url").asString().get());
    fact.setUsername(config.get("jdbc.user").asString().get());
    fact.setPassword(config.get("jdbc.pass").asString().get());
    return fact;
}
Config:
jdbc:
  url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
  user: gandalf
  pass: mellon

mp:
  messaging:
    connector:
      helidon-aq:
        named-factory: aq-orderdb-factory

    outgoing.to-aq:
      connector: helidon-aq
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue

    incoming.from-aq:
      connector: helidon-aq
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue
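
Both channels in the config above use the same session-group-id, so according to the options table they share one JMS session. The sharing rule itself can be pictured as a lookup keyed by the group id. This is only a conceptual sketch: SessionGroupSketch and StubSession are hypothetical names, StubSession is a placeholder rather than a real JMS session, and the connector's actual bookkeeping may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Conceptual illustration only: NOT the connector's implementation.
class SessionGroupSketch {

    /** Hypothetical placeholder for a JMS session; it only carries an id. */
    static final class StubSession {
        final int id;

        StubSession(int id) {
            this.id = id;
        }
    }

    private final Map<String, StubSession> sessions = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    /** Returns the session for the group, creating it on first use. */
    StubSession sessionFor(String sessionGroupId) {
        return sessions.computeIfAbsent(
                sessionGroupId,
                id -> new StubSession(created.incrementAndGet()));
    }

    /** Number of sessions created so far. */
    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        SessionGroupSketch registry = new SessionGroupSketch();
        StubSession outgoing = registry.sessionFor("order-connection-1");
        StubSession incoming = registry.sessionFor("order-connection-1");
        // Same group id, so both channels see the same session instance
        System.out.println(outgoing == incoming);
    }
}
```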

Usage

Consuming

Consuming messages one by one as unwrapped values:
@Incoming("from-aq")
public void consumeAq(String msg) {
    System.out.println("Oracle AQ says: " + msg);
}
Consuming messages one by one with manual acknowledgment:
@Incoming("from-aq")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeAq(AqMessage<String> msg) {
    // direct commit
    //msg.getDbConnection().commit();
    System.out.println("Oracle AQ says: " + msg.getPayload());
    // ack commits only in non-transacted mode
    return msg.ack();
}
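
With manual acknowledgment, a common pattern is to acknowledge only after processing has finished, so that a failure leaves the message unacknowledged. The sketch below shows that ordering with plain CompletionStage chaining. StubMessage is a hypothetical stand-in for AqMessage that only mimics getPayload() and ack(); the processing step (upper-casing the payload) is likewise an assumption made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: StubMessage is a stand-in, not the connector's AqMessage.
class ManualAckSketch {

    static final class StubMessage {
        private final String payload;
        final AtomicBoolean acked = new AtomicBoolean(false);

        StubMessage(String payload) {
            this.payload = payload;
        }

        String getPayload() {
            return payload;
        }

        CompletionStage<Void> ack() {
            acked.set(true);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Processes the payload asynchronously and acks only on success. */
    static CompletionStage<Void> consume(StubMessage msg, StringBuilder sink) {
        return CompletableFuture
                .supplyAsync(() -> msg.getPayload().toUpperCase())
                .thenAccept(sink::append)
                // Reached only if the previous stages completed normally
                .thenCompose(ignored -> msg.ack());
    }

    public static void main(String[] args) throws Exception {
        StubMessage msg = new StubMessage("hello");
        StringBuilder sink = new StringBuilder();
        consume(msg, sink).toCompletableFuture().get();
        System.out.println(sink + " acked=" + msg.acked.get());
    }
}
```

If a processing stage completes exceptionally, thenCompose is skipped and ack() is never called, which is the behavior this ordering is meant to guarantee.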

Producing

Producing to AQ:
@Outgoing("to-aq")
public PublisherBuilder<String> produceToAq() {
    return ReactiveStreams.of("test1", "test2");
}