Messaging Connector Bean

A messaging connector is simply an application-scoped bean that implements IncomingConnectorFactory, OutgoingConnectorFactory, or both.

Example connector named example-connector:
@ApplicationScoped
@Connector("example-connector") // name referenced by the channel configuration
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

   // Supplies messages to channels mapped via mp.messaging.incoming.<channel>.connector
   @Override
   public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
       return ReactiveStreams.of("foo", "bar")
               .map(Message::of);
   }

   // Receives messages from channels mapped via mp.messaging.outgoing.<channel>.connector
   @Override
   public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
       return ReactiveStreams.<Message<?>>builder()
               .map(Message::getPayload)
               .forEach(o -> System.out.println("Connector says: " + o));
   }
}
Example of channel to connector mapping config:
mp.messaging.outgoing.to-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.connector: example-connector
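The mapping keys above follow the pattern mp.messaging.<direction>.<channel>.connector. As a rough illustration of how such keys could be read, the following JDK-only sketch extracts the direction, channel name and connector name from a plain map of properties. ChannelMappingSketch and channelToConnector are hypothetical names invented for this example; they are not part of MicroProfile Reactive Messaging, and a real runtime resolves these mappings internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: ChannelMappingSketch is a hypothetical helper,
// not part of MicroProfile Reactive Messaging or any runtime.
final class ChannelMappingSketch {

    private static final String PREFIX = "mp.messaging.";
    private static final String SUFFIX = ".connector";

    // Returns "direction:channel" -> connector name for every mapping key found.
    static Map<String, String> channelToConnector(Map<String, String> config) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            for (String direction : new String[] {"incoming", "outgoing"}) {
                String start = PREFIX + direction + ".";
                if (!key.startsWith(start) || !key.endsWith(SUFFIX)) {
                    continue;
                }
                int end = key.length() - SUFFIX.length();
                if (end <= start.length()) {
                    continue; // no channel name between prefix and suffix
                }
                String channel = key.substring(start.length(), end);
                // Simplification for this sketch: channel names containing dots are skipped.
                if (channel.indexOf('.') < 0) {
                    result.put(direction + ":" + channel, entry.getValue());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("mp.messaging.outgoing.to-connector-channel.connector", "example-connector");
        config.put("mp.messaging.incoming.from-connector-channel.connector", "example-connector");
        channelToConnector(config).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Both channels resolve to the same connector name, which is why a single bean can serve the outgoing and the incoming channel when it implements both factory interfaces.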
Example producing to connector:
@Outgoing("to-connector-channel")
public Publisher<String> produce() {
   return Flowable.just("fee", "fie");
}

> Connector says: fee
> Connector says: fie
Example consuming from connector:
@Incoming("from-connector-channel")
public void consume(String value) {
   System.out.println("Consuming: " + value);
}

> Consuming: foo
> Consuming: bar

Configuration

Example connector accessing configuration:
@ApplicationScoped
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory {

    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(final Config config) {

        String firstPropValue = config.getValue("first-test-prop", String.class);
        String secondPropValue = config.getValue("second-test-prop", String.class);

        return ReactiveStreams.of(firstPropValue, secondPropValue)
                .map(Message::of);
    }
}
  • The Config passed to getPublisherBuilder is merged from the channel and connector configuration contexts
Example of channel to connector mapping config with custom properties:
mp.messaging.incoming.from-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.first-test-prop: foo
mp.messaging.connector.example-connector.second-test-prop: bar
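  • Channel → Connector mapping (mp.messaging.incoming.<channel>.connector)
  • Channel configuration properties (mp.messaging.incoming.<channel>.*)
  • Connector configuration properties (mp.messaging.connector.<connector>.*)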
Example consuming from connector:
@Incoming("from-connector-channel")
public void consume(String value) {
   System.out.println("Consuming: " + value);
}

> Consuming: foo
> Consuming: bar
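To make the merged configuration context more concrete, here is a minimal JDK-only sketch of how the channel and connector properties from the example could be combined into one flat view with the prefixes stripped. MergedConnectorConfigSketch, merge and withPrefix are hypothetical names used only for illustration, not the actual implementation. The sketch also assumes that a channel-level value takes precedence when the same key is defined at both levels; check your runtime's documentation before relying on that ordering.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: a hypothetical helper, not the runtime's implementation.
final class MergedConnectorConfigSketch {

    // Builds the flat view a connector might see for one channel.
    static Map<String, String> merge(Map<String, String> all, String direction, String channel) {
        Map<String, String> channelProps =
                withPrefix(all, "mp.messaging." + direction + "." + channel + ".");
        String connector = channelProps.get("connector");

        Map<String, String> merged = new TreeMap<>();
        if (connector != null) {
            // Connector-wide properties go in first so they act as defaults.
            merged.putAll(withPrefix(all, "mp.messaging.connector." + connector + "."));
        }
        // Assumption: channel-level values win over connector-level values on a key clash.
        merged.putAll(channelProps);
        return merged;
    }

    // Selects entries whose key starts with the prefix and strips that prefix.
    private static Map<String, String> withPrefix(Map<String, String> all, String prefix) {
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, String> entry : all.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                out.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> all = new LinkedHashMap<>();
        all.put("mp.messaging.incoming.from-connector-channel.connector", "example-connector");
        all.put("mp.messaging.incoming.from-connector-channel.first-test-prop", "foo");
        all.put("mp.messaging.connector.example-connector.second-test-prop", "bar");
        System.out.println(merge(all, "incoming", "from-connector-channel"));
    }
}
```

In this view both first-test-prop and second-test-prop are reachable under their short names, which matches how the example connector reads them with config.getValue.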