Messaging Connector
Connector for Reactive Messaging is a factory producing Publishers and Subscribers for Channels in Reactive Messaging. Messaging connector is just an implementation of IncomingConnectorFactory, OutgoingConnectorFactory or both.
example-connector:@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
return ReactiveStreams.of("foo", "bar")
.map(Message::of);
}
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.map(Message::getPayload)
.forEach(o -> System.out.println("Connector says: " + o));
}
}mp.messaging.outgoing.to-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.connector: example-connectorConfig config = Config.create();
Messaging.builder()
.config(config)
.connector(new ExampleConnector())
.publisher(Channel.create("to-connector-channel"),
ReactiveStreams.of("fee", "fie")
.map(Message::of)
)
.build()
.start();
> Connector says: fee
> Connector says: fieMessaging.builder()
.connector(new ExampleConnector())
.subscriber(Channel.create("from-connector-channel"),
ReactiveStreams.<Message<String>>builder()
.peek(Message::ack)
.map(Message::getPayload)
.forEach(s -> System.out.println("Consuming: " + s))
)
.build()
.start();
> Consuming: foo
> Consuming: barConfiguration
Messaging connector in Helidon SE can be configured explicitly by API or implicitly by config following notation of MicroProfile Reactive Messaging.
Configuration is being supplied to connector by Messaging implementation, two mandatory attributes are always present:
channel-namename of the channel which has this connector configured as Publisher or Subscriber,Channel.create('name-of-channel')in case of explicit configuration ormp.messaging.incoming.name-of-channel.connector: connector-namein case of implicit configconnectorname of the connector@Connector("connector-name")
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(final Config config) {
String firstPropValue = config.getValue("first-test-prop", String.class);
String secondPropValue = config.getValue("second-test-prop", String.class);
return ReactiveStreams.of(firstPropValue, secondPropValue)
.map(Message::of);
}
}- Config context is merged from channel and connector contexts
Explicit Config
An explicit config for channel’s publisher is possible with Channel.Builder#publisherConfig(Config config) and for subscriber with Channel.Builder#subscriberConfig(Config config). Supplied Helidon Config is merged with mandatory attributes and any implicit config found. Resulting config is served to Connector.
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> fromKafka = Channel.<String>builder()
.name("from-kafka")
.publisherConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.groupId("example-group-" + session.getId())
.topic(topic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build()
)
.build();
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();- Prepare channel for connecting kafka connector with specific publisher configuration → listener,
- Channel → connector mapping is automatic when using
KafkaConnector.configBuilder() - Prepare Kafka connector, can be used by any channel
Implicit Config
Implicit config without any hard-coding is possible with Helidon Config following notation of MicroProfile Reactive Messaging.
mp.messaging.incoming.from-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.first-test-prop: foo
mp.messaging.connector.example-connector.second-test-prop: bar- Channel → Connector mapping
- Channel configuration properties
- Connector configuration properties
Config config = Config.create();
Messaging.builder()
.config(config)
.connector(new ExampleConnector())
.listener(Channel.create("from-connector-channel"),
s -> System.out.println("Consuming: " + s))
.build()
.start();
> Consuming: foo
> Consuming: barReusability in MP Messaging
As the API is the same for MicroProfile Reactive Messaging connectors, all that is needed to make connector work in both ways is annotating it with @ApplicationScoped. Such connector is treated as a bean in Helidon MP.
For specific informations about creating messaging connectors for Helidon MP visit Messaging Connector Bean.