Messaging Connector

Connector for Reactive Messaging is a factory producing Publishers and Subscribers for Channels in Reactive Messaging. Messaging connector is just an implementation of IncomingConnectorFactory, OutgoingConnectorFactory or both.

Example connector example-connector:
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

   @Override
   public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
       return ReactiveStreams.of("foo", "bar")
               .map(Message::of);
   }

   @Override
   public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
       return ReactiveStreams.<Message<?>>builder()
               .map(Message::getPayload)
               .forEach(o -> System.out.println("Connector says: " + o));
   }
}
Copied
Example of channel to connector mapping config:
mp.messaging.outgoing.to-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.connector: example-connector
Copied
Example producing to connector:
Config config = Config.create();

Messaging.builder()
         .config(config)
         .connector(new ExampleConnector())
         .publisher(Channel.create("to-connector-channel"),
                ReactiveStreams.of("fee", "fie")
                    .map(Message::of)
         )
         .build()
         .start();

> Connector says: fee
> Connector says: fie
Copied
Example consuming from connector:
Messaging.builder()
        .connector(new ExampleConnector())
        .subscriber(Channel.create("from-connector-channel"),
                ReactiveStreams.<Message<String>>builder()
                    .peek(Message::ack)
                    .map(Message::getPayload)
                    .forEach(s -> System.out.println("Consuming: " + s))
        )
        .build()
        .start();

> Consuming: foo
> Consuming: bar
Copied

Configuration

Messaging connector in Helidon SE can be configured explicitly by API or implicitly by config following notation of MicroProfile Reactive Messaging.

Configuration is being supplied to connector by Messaging implementation, two mandatory attributes are always present:

  • channel-name name of the channel which has this connector configured as Publisher or Subscriber, Channel.create('name-of-channel') in case of explicit configuration or mp.messaging.incoming.name-of-channel.connector: connector-name in case of implicit config

  • connector name of the connector @Connector("connector-name")

Example connector accessing configuration:
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory {

    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(final Config config) {

        String firstPropValue = config.getValue("first-test-prop", String.class);
        String secondPropValue = config.getValue("second-test-prop", String.class);

        return ReactiveStreams.of(firstPropValue, secondPropValue)
                .map(Message::of);
    }
}
Copied
  • Config context is merged from channel and connector contexts

Explicit Config

An explicit config for channel’s publisher is possible with Channel.Builder#publisherConfig(Config config) and for subscriber with Channel.Builder#subscriberConfig(Config config). Supplied Helidon Config is merged with mandatory attributes and any implicit config found. Resulting config is served to Connector.

Example consuming from Kafka connector with explicit config:
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();

Channel<String> fromKafka = Channel.<String>builder()
        .name("from-kafka")
        .publisherConfig(KafkaConnector.configBuilder()
                .bootstrapServers(kafkaServer)
                .groupId("example-group-" + session.getId())
                .topic(topic)
                .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
                .enableAutoCommit(true)
                .keyDeserializer(StringDeserializer.class)
                .valueDeserializer(StringDeserializer.class)
                .build()
        )
        .build();

KafkaConnector kafkaConnector = KafkaConnector.create();

Messaging messaging = Messaging.builder()
        .connector(kafkaConnector)
        .listener(fromKafka, payload -> {
            System.out.println("Kafka says: " + payload);
        })
        .build()
        .start();
Copied
  • Prepare channel for connecting kafka connector with specific publisher configuration → listener,
  • Channel → connector mapping is automatic when using KafkaConnector.configBuilder()
  • Prepare Kafka connector, can be used by any channel

Implicit Config

Implicit config without any hard-coding is possible with Helidon Config following notation of MicroProfile Reactive Messaging.

Example of channel to connector mapping config with custom properties:
mp.messaging.incoming.from-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.first-test-prop: foo
mp.messaging.connector.example-connector.second-test-prop: bar
Copied
  • Channel → Connector mapping
  • Channel configuration properties
  • Connector configuration properties
Example consuming from connector:
Config config = Config.create();

Messaging.builder()
        .config(config)
        .connector(new ExampleConnector())
        .listener(Channel.create("from-connector-channel"),
                    s -> System.out.println("Consuming: " + s))
        .build()
        .start();

> Consuming: foo
> Consuming: bar
Copied

Reusability in MP Messaging

As the API is the same for MicroProfile Reactive Messaging connectors, all that is needed to make connector work in both ways is annotating it with @ApplicationScoped. Such connector is treated as a bean in Helidon MP.

For specific informations about creating messaging connectors for Helidon MP visit Messaging Connector Bean.