Contents
Overview
Connecting streams to Oracle AQ with Reactive Messaging couldn’t be easier. This connector extends Helidon’s JMS connector with Oracle’s AQ-specific API.
Maven Coordinates
To enable AQ Connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.messaging.aq</groupId>
<artifactId>helidon-messaging-aq</artifactId>
</dependency>content_copy
Configuration
Connector name: helidon-aq
Configuration options
| Key | Kind | Type | Default Value | Description |
|---|---|---|---|---|
acknowledge-mode | VALUE | i.h.m.c.j.AcknowledgeMode | AUTO_ACKNOWLEDGE | JMS acknowledgement mode |
client-id | VALUE | String | Client identifier for JMS connection | |
data-source | VALUE | String | Mapping to javax.sql.DataSource DataSource supplied with io.helidon.messaging.connectors.aq.AqConnector.AqConnectorBuilder#dataSource(String, javax.sql.DataSource) AqConnectorBuilder.dataSource() | |
destination | VALUE | String | Queue or topic name | |
durable | VALUE | Boolean | false | Indicates whether the consumer should be created as durable (only relevant for topic destinations) |
message-selector | VALUE | String | JMS API message selector expression based on a subset of the SQL92 | |
named-factory | VALUE | String | Select jakarta.jms.ConnectionFactory ConnectionFactory in case factory is injected as a named bean or configured with name | |
non-local | VALUE | Boolean | false | When set to true, messages published by this connection, or any connection with the same client identifier, will not be delivered to this durable subscription |
password | VALUE | String | Password used for creating JMS connection | |
period-executions | VALUE | Long | 100 | Period for executing poll cycles in millis |
poll-timeout | VALUE | Long | 50 | Timeout for polling for next message in every poll cycle in millis |
queue | VALUE | String | Use supplied destination name and Type#QUEUE QUEUE as type | |
session-group-id | VALUE | String | When multiple channels share same session-group-id, they share same JMS session | |
subscriber-name | VALUE | String | Subscriber name used to identify a durable subscription | |
topic | VALUE | String | Use supplied destination name and Type#TOPIC TOPIC as type | |
transacted | VALUE | Boolean | false | Indicates whether the session will use a local transaction |
type | VALUE | i.h.m.c.j.Type | QUEUE | Specify if connection is io.helidon.messaging.connectors.jms.Type#QUEUE queue or io.helidon.messaging.connectors.jms.Type#TOPIC topic |
username | VALUE | String | User name used for creating JMS connection |
Configured JMS Factory
The simplest possible usage is leaving construction of AQjmsConnectionFactory to the connector.
Example of connector config:
mp:
messaging:
connector:
helidon-aq:
transacted: false
acknowledge-mode: CLIENT_ACKNOWLEDGE
url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
user: gandalf
password: mellon
outgoing.to-aq:
connector: helidon-aq
destination: TESTQUEUE
type: queue
incoming.from-aq:
connector: helidon-aq
destination: TESTQUEUE
type: queuecontent_copy
Its also possible and preferable to refer to configured datasource, in our example Oracle UCP datasource:
Example of connector config with Oracle UCP datasource:
javax:
sql:
DataSource:
aq-test-ds:
connectionFactoryClassName: oracle.jdbc.pool.OracleDataSource
URL: jdbc:oracle:thin:@exampledb_high?TNS_ADMIN=/home/gandalf/wallets/Wallet_EXAMPLEDB
user: gandalf
password: SuperSecretPassword1234
mp:
messaging:
connector:
helidon-aq:
transacted: false
acknowledge-mode: CLIENT_ACKNOWLEDGE
data-source: aq-test-ds
outgoing.toJms:
connector: helidon-aq
destination: TESTQUEUE
type: queue
incoming.fromJms:
connector: helidon-aq
destination: TESTQUEUE
type: queuecontent_copy
Injected JMS factory
If you need more advanced configurations, connector can work with injected AQjmsConnectionFactory:
Inject:
@Produces
@ApplicationScoped
@Named("aq-orderdb-factory")
public AQjmsConnectionFactory connectionFactory() throws JMSException {
AQjmsQueueConnectionFactory fact = new AQjmsQueueConnectionFactory();
fact.setJdbcURL(config.get("jdbc.url").asString().get());
fact.setUsername(config.get("jdbc.user").asString().get());
fact.setPassword(config.get("jdbc.pass").asString().get());
return fact;
}content_copy
Config:
jdbc:
url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
user: gandalf
pass: mellon
mp:
messaging:
connector:
helidon-aq:
named-factory: aq-orderdb-factory
outgoing.to-aq:
connector: helidon-aq
session-group-id: order-connection-1
destination: TESTQUEUE
type: queue
incoming.from-aq:
connector: helidon-aq
session-group-id: order-connection-1
destination: TESTQUEUE
type: queuecontent_copy
Usage
Consuming
Consuming one by one unwrapped value:
@Incoming("from-aq")
public void consumeAq(String msg) {
System.out.println("Oracle AQ says: " + msg);
}content_copy
Consuming one by one, manual ack:
@Incoming("from-aq")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeAq(AqMessage<String> msg) {
// direct commit
//msg.getDbConnection().commit();
System.out.println("Oracle AQ says: " + msg.getPayload());
// ack commits only in non-transacted mode
return msg.ack();
}content_copy
Producing
Producing to AQ:
@Outgoing("to-aq")
public PublisherBuilder<String> produceToAq() {
return ReactiveStreams.of("test1", "test2");
}content_copy