Contents
Overview
Connecting streams to Oracle AQ with Reactive Messaging couldn’t be easier. This connector extends Helidon’s JMS connector with Oracle’s AQ-specific API.
Maven Coordinates
To enable AQ Connector add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.messaging.aq</groupId>
<artifactId>helidon-messaging-aq</artifactId>
</dependency>content_copy
Configuration
Connector name: helidon-aq
Attributes
datasource | name of the datasource bean used to connect Oracle DB with AQ |
url | jdbc connection string used to connect Oracle DB with AQ (forbidden when datasource is specified) |
username | User name used to connect Oracle DB with AQ (forbidden when datasource is specified) |
password | Password to connect Oracle DB with AQ (forbidden when datasource is specified) |
type | Possible values are: queue, topic |
destination | Queue or topic name |
acknowledge-mode | Possible values are: AUTO_ACKNOWLEDGE- session automatically acknowledges a client’s receipt of a message, CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually, DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages. Default value: AUTO_ACKNOWLEDGE |
transacted | Indicates whether the session will use a local transaction. Default value: false |
message-selector | JMS API message selector expression based on a subset of the SQL92. Expression can only access headers and properties, not the payload. |
client-id | Client identifier for JMS connection. |
durable | True for creating durable consumer (only for topic). Default value: false |
subscriber-name | Subscriber name for durable consumer used to identify subscription. |
non-local | If true then any messages published to the topic using this session’s connection, or any other connection with the same client identifier, will not be added to the durable subscription. Default value: false |
named-factory | Select in case factory is injected as a named bean or configured with name. |
poll-timeout | Timeout for polling for next message in every poll cycle in millis. Default value: 50 |
period-executions | Period for executing poll cycles in millis. Default value: 100 |
session-group-id | When multiple channels share same session-group-id, they share same JMS session and same JDBC connection as well. |
Configured JMS Factory
The simplest possible usage is leaving construction of AQjmsConnectionFactory to the connector.
Example of connector config:
mp:
messaging:
connector:
helidon-aq:
transacted: false
acknowledge-mode: CLIENT_ACKNOWLEDGE
url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
user: gandalf
password: mellon
outgoing.to-aq:
connector: helidon-aq
destination: TESTQUEUE
type: queue
incoming.from-aq:
connector: helidon-aq
destination: TESTQUEUE
type: queuecontent_copy
Its also possible and preferable to refer to configured datasource, in our example Oracle UCP datasource:
Example of connector config with Oracle UCP datasource:
javax:
sql:
DataSource:
aq-test-ds:
connectionFactoryClassName: oracle.jdbc.pool.OracleDataSource
URL: jdbc:oracle:thin:@exampledb_high?TNS_ADMIN=/home/gandalf/wallets/Wallet_EXAMPLEDB
user: gandalf
password: SuperSecretPassword1234
mp:
messaging:
connector:
helidon-aq:
transacted: false
acknowledge-mode: CLIENT_ACKNOWLEDGE
data-source: aq-test-ds
outgoing.toJms:
connector: helidon-aq
destination: TESTQUEUE
type: queue
incoming.fromJms:
connector: helidon-aq
destination: TESTQUEUE
type: queuecontent_copy
Injected JMS factory
If you need more advanced configurations, connector can work with injected AQjmsConnectionFactory:
Inject:
@Produces
@ApplicationScoped
@Named("aq-orderdb-factory")
public AQjmsConnectionFactory connectionFactory() throws JMSException {
AQjmsQueueConnectionFactory fact = new AQjmsQueueConnectionFactory();
fact.setJdbcURL(config.get("jdbc.url").asString().get());
fact.setUsername(config.get("jdbc.user").asString().get());
fact.setPassword(config.get("jdbc.pass").asString().get());
return fact;
}content_copy
Config:
jdbc:
url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
user: gandalf
pass: mellon
mp:
messaging:
connector:
helidon-aq:
named-factory: aq-orderdb-factory
outgoing.to-aq:
connector: helidon-aq
session-group-id: order-connection-1
destination: TESTQUEUE
type: queue
incoming.from-aq:
connector: helidon-aq
session-group-id: order-connection-1
destination: TESTQUEUE
type: queuecontent_copy
Usage
Consuming
Consuming one by one unwrapped value:
@Incoming("from-aq")
public void consumeAq(String msg) {
System.out.println("Oracle AQ says: " + msg);
}content_copy
Consuming one by one, manual ack:
@Incoming("from-aq")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeAq(AqMessage<String> msg) {
// direct commit
//msg.getDbConnection().commit();
System.out.println("Oracle AQ says: " + msg.getPayload());
// ack commits only in non-transacted mode
return msg.ack();
}content_copy
Producing
Producing to AQ:
@Outgoing("to-aq")
public PublisherBuilder<String> produceToAq() {
return ReactiveStreams.of("test1", "test2");
}content_copy