Maven Coordinates

To enable the AQ connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
    <groupId>io.helidon.messaging.aq</groupId>
    <artifactId>helidon-messaging-aq</artifactId>
</dependency>

Reactive Oracle Advanced Queueing Connector

This connector connects Reactive Messaging streams to Oracle AQ. It extends Helidon’s JMS connector with the Oracle AQ-specific API.

Config

Connector name: helidon-aq

Attributes
datasource: name of the datasource bean used to connect Oracle DB with AQ
url: JDBC connection string used to connect Oracle DB with AQ (forbidden when datasource is specified)
username: User name used to connect Oracle DB with AQ (forbidden when datasource is specified)
password: Password to connect Oracle DB with AQ (forbidden when datasource is specified)
type: Possible values are: queue, topic
destination: Queue or topic name
acknowledge-mode: Possible values are: AUTO_ACKNOWLEDGE - session automatically acknowledges a client’s receipt of a message; CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually; DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages. Default value: AUTO_ACKNOWLEDGE
transacted: Indicates whether the session will use a local transaction. Default value: false
message-selector: JMS API message selector expression based on a subset of SQL92. The expression can only access headers and properties, not the payload.
client-id: Client identifier for the JMS connection.
durable: True for creating a durable consumer (only for topic). Default value: false
subscriber-name: Subscriber name for a durable consumer, used to identify the subscription.
non-local: If true, any messages published to the topic using this session’s connection, or any other connection with the same client identifier, will not be added to the durable subscription. Default value: false
named-factory: Select in case the factory is injected as a named bean or configured with a name.
poll-timeout: Timeout for polling for the next message in every poll cycle, in milliseconds. Default value: 50
period-executions: Period for executing poll cycles, in milliseconds. Default value: 100
session-group-id: When multiple channels share the same session-group-id, they share the same JMS session and the same JDBC connection as well.
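
The attribute list above implies a few rules that can be checked before the connector starts: url, username and password are forbidden when datasource is specified, type accepts only queue or topic, and durable applies only to topics. The following is a minimal sketch of such a check in plain Java. AqConfigCheck and validate are hypothetical names for illustration and are not part of Helidon, the flat map is only a stand-in for the connector configuration, and the key spelling follows the attribute list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of Helidon: checks a flat map of connector attributes. */
public class AqConfigCheck {

    // Attributes the list marks as forbidden once a datasource is specified.
    private static final List<String> DIRECT_CONNECTION_KEYS = List.of("url", "username", "password");

    // Documented values of the 'type' attribute.
    private static final Set<String> TYPES = Set.of("queue", "topic");

    /** Returns human-readable problems; an empty list means the map passed these checks. */
    public static List<String> validate(Map<String, String> attrs) {
        List<String> problems = new ArrayList<>();

        if (attrs.containsKey("datasource")) {
            for (String key : DIRECT_CONNECTION_KEYS) {
                if (attrs.containsKey(key)) {
                    problems.add("'" + key + "' is forbidden when 'datasource' is specified");
                }
            }
        }

        String type = attrs.get("type");
        if (type != null && !TYPES.contains(type)) {
            problems.add("'type' must be queue or topic, got: " + type);
        }

        // A durable consumer is documented as topic-only, so flag it on an explicit queue.
        if ("true".equalsIgnoreCase(attrs.get("durable")) && "queue".equals(type)) {
            problems.add("'durable' only applies when 'type' is topic");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = Map.of(
                "datasource", "aq-test-ds",
                "url", "jdbc:oracle:thin:@example",
                "type", "queue");
        validate(attrs).forEach(System.out::println);
    }
}
```

Returning a list of problems instead of throwing on the first one lets a caller report every misconfigured attribute at once.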

Configured JMS factory

The simplest usage is to leave construction of the AQjmsConnectionFactory to the connector.

Example of connector config:
mp:
  messaging:

    connector:
      helidon-aq:
        transacted: false
        acknowledge-mode: CLIENT_ACKNOWLEDGE
        url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
        user: gandalf
        password: mellon

    outgoing.to-aq:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue

    incoming.from-aq:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue

It’s also possible, and preferable, to refer to a configured datasource, in this example an Oracle UCP datasource:

Example of connector config with Oracle UCP datasource:
javax:
  sql:
    DataSource:
      aq-test-ds:
        connectionFactoryClassName: oracle.jdbc.pool.OracleDataSource
        URL: jdbc:oracle:thin:@exampledb_high?TNS_ADMIN=/home/gandalf/wallets/Wallet_EXAMPLEDB
        user: gandalf
        password: SuperSecretPassword1234

mp:
  messaging:
    connector:
      helidon-aq:
        transacted: false
        acknowledge-mode: CLIENT_ACKNOWLEDGE
        data-source: aq-test-ds
    outgoing.toJms:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue
    incoming.fromJms:
      connector: helidon-aq
      destination: TESTQUEUE
      type: queue

Injected JMS factory

If you need a more advanced setup, the connector can work with an injected AQjmsConnectionFactory.

Inject:
    @Produces
    @ApplicationScoped
    @Named("aq-orderdb-factory")
    public AQjmsConnectionFactory connectionFactory() throws JMSException {
        AQjmsQueueConnectionFactory fact = new AQjmsQueueConnectionFactory();
        fact.setJdbcURL(config.get("jdbc.url").asString().get());
        fact.setUsername(config.get("jdbc.user").asString().get());
        fact.setPassword(config.get("jdbc.pass").asString().get());
        return fact;
    }
Config:
jdbc:
  url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
  user: gandalf
  pass: mellon

mp:
  messaging:
    connector:
      helidon-aq:
        named-factory: aq-orderdb-factory

    outgoing.to-aq:
      connector: helidon-aq
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue

    incoming.from-aq:
      connector: helidon-aq
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue
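
Both channels in the config above set session-group-id to order-connection-1, so according to the attribute description they share one JMS session and one JDBC connection. The idea can be pictured as a lookup keyed by the group ID. The following plain-Java model is only a sketch of that idea and not Helidon's implementation: SessionGroups, SessionEntry and sessionFor are hypothetical names, and giving a channel without a group ID its own session is an assumption made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model, not Helidon code: channels with the same group ID get the same session object. */
public class SessionGroups {

    /** Stand-in for a JMS session together with its JDBC connection. */
    public static final class SessionEntry {
        private final int id;

        SessionEntry(int id) {
            this.id = id;
        }

        public int id() {
            return id;
        }
    }

    private final Map<String, SessionEntry> byGroup = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    /**
     * Returns the shared entry for a group ID, creating it on first use.
     * Assumption for this sketch: a null ID means the channel gets its own entry.
     */
    public SessionEntry sessionFor(String sessionGroupId) {
        if (sessionGroupId == null) {
            return new SessionEntry(created.incrementAndGet());
        }
        return byGroup.computeIfAbsent(sessionGroupId,
                key -> new SessionEntry(created.incrementAndGet()));
    }

    /** Number of distinct entries created so far. */
    public int sessionsCreated() {
        return created.get();
    }

    public static void main(String[] args) {
        SessionGroups groups = new SessionGroups();
        SessionEntry outgoing = groups.sessionFor("order-connection-1"); // channel to-aq
        SessionEntry incoming = groups.sessionFor("order-connection-1"); // channel from-aq
        System.out.println("same session: " + (outgoing == incoming));
        System.out.println("sessions created: " + groups.sessionsCreated());
    }
}
```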

Consuming

Consuming unwrapped values one by one:
@Incoming("from-aq")
public void consumeAq(String msg) {
    System.out.println("Oracle AQ says: " + msg);
}
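
The poll-timeout and period-executions attributes suggest that incoming messages are fetched in periodic poll cycles, each waiting a bounded time for the next message. The following plain-Java sketch shows one way such a loop could look, using the documented defaults of 50 and 100 milliseconds. It is an illustration under assumptions, not the connector's code: PollCycleSketch and drain are hypothetical names, a BlockingQueue stands in for the AQ destination, and taking at most one message per cycle is a simplification.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a periodic poll loop; a BlockingQueue stands in for the AQ destination. */
public class PollCycleSketch {

    static final long POLL_TIMEOUT_MS = 50;       // documented default of poll-timeout
    static final long PERIOD_EXECUTIONS_MS = 100; // documented default of period-executions

    /** Runs poll cycles until 'expected' messages arrived or 'maxWaitMs' elapsed, then returns them. */
    public static List<String> drain(BlockingQueue<String> source, int expected, long maxWaitMs) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(expected);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // One poll cycle per period; each cycle waits at most POLL_TIMEOUT_MS for a message.
        scheduler.scheduleAtFixedRate(() -> {
            try {
                String msg = source.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    received.add(msg);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, PERIOD_EXECUTIONS_MS, TimeUnit.MILLISECONDS);

        try {
            done.await(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("test1", "test2"));
        for (String msg : drain(queue, 2, 5_000)) {
            System.out.println("Oracle AQ says: " + msg);
        }
    }
}
```

In this model, a shorter period lowers latency at the cost of more frequent polling, while the timeout bounds how long a single cycle can block.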
Consuming one by one with manual acknowledgment:
@Incoming("from-aq")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<?> consumeAq(AqMessage<String> msg) {
    // direct commit
    //msg.getDbConnection().commit();
    System.out.println("Oracle AQ says: " + msg.getPayload());
    // ack commits only in non-transacted mode
    return msg.ack();
}

Producing

Producing to AQ:
@Outgoing("to-aq")
public PublisherBuilder<String> produceToAq() {
    return ReactiveStreams.of("test1", "test2");
}