Messaging Connector Bean
A messaging connector is an application-scoped bean that implements IncomingConnectorFactory, OutgoingConnectorFactory, or both.
Example connector example-connector:
@ApplicationScoped
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

    // Incoming side: emits two fixed messages to any channel that reads from this connector.
    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        return ReactiveStreams.of("foo", "bar")
                .map(Message::of);
    }

    // Outgoing side: prints the payload of every message sent to this connector.
    @Override
    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        return ReactiveStreams.<Message<?>>builder()
                .map(Message::getPayload)
                .forEach(o -> System.out.println("Connector says: " + o));
    }
}
Example of channel-to-connector mapping configuration:
mp.messaging.outgoing.to-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.connector: example-connector
Example producing to connector:
@Outgoing("to-connector-channel")
public Publisher<String> produce() {
    return Flowable.just("fee", "fie");
}
> Connector says: fee
> Connector says: fie
Example consuming from connector:
@Incoming("from-connector-channel")
public void consume(String value) {
    System.out.println("Consuming: " + value);
}
> Consuming: foo
> Consuming: bar
Configuration
Example connector accessing configuration:
@ApplicationScoped
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory {

    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(final Config config) {
        // getValue throws NoSuchElementException if the property is not configured.
        String firstPropValue = config.getValue("first-test-prop", String.class);
        String secondPropValue = config.getValue("second-test-prop", String.class);
        return ReactiveStreams.of(firstPropValue, secondPropValue)
                .map(Message::of);
    }
}
- The Config passed to the connector is merged from the channel and connector configuration contexts
Example of channel-to-connector mapping configuration with custom properties:
mp.messaging.incoming.from-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.first-test-prop: foo
mp.messaging.connector.example-connector.second-test-prop: bar
- First line: channel → connector mapping
- Second line: channel configuration property
- Third line: connector configuration property
Example consuming from connector:
@Incoming("from-connector-channel")
public void consume(String value) {
    System.out.println("Consuming: " + value);
}
> Consuming: foo
> Consuming: bar
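The merging of channel and connector properties described in this Configuration section can be illustrated with plain maps. The following is only a sketch, not the framework's implementation: the class MergedConfigSketch and its method mergedIncomingConfig are hypothetical names, and it assumes that channel-level properties take precedence over connector-level ones when both define the same key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of how channel and connector properties
// could be merged into the single view a connector receives.
public class MergedConfigSketch {

    /** Collects the properties visible to the connector serving one incoming channel. */
    public static Map<String, String> mergedIncomingConfig(Map<String, String> all, String channel) {
        String channelPrefix = "mp.messaging.incoming." + channel + ".";
        // The channel's "connector" property names the connector it is mapped to.
        String connectorName = all.get(channelPrefix + "connector");
        String connectorPrefix = "mp.messaging.connector." + connectorName + ".";

        Map<String, String> merged = new HashMap<>();
        // Connector-level properties are copied first ...
        copyWithPrefix(all, connectorPrefix, merged);
        // ... and channel-level properties second, so they win on conflicts
        // (an assumption of this sketch).
        copyWithPrefix(all, channelPrefix, merged);
        return merged;
    }

    // Copies every entry whose key starts with the prefix, stripping the prefix from the key.
    private static void copyWithPrefix(Map<String, String> source, String prefix,
                                       Map<String, String> target) {
        for (Map.Entry<String, String> entry : source.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                target.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("mp.messaging.incoming.from-connector-channel.connector", "example-connector");
        props.put("mp.messaging.incoming.from-connector-channel.first-test-prop", "foo");
        props.put("mp.messaging.connector.example-connector.second-test-prop", "bar");

        Map<String, String> merged = mergedIncomingConfig(props, "from-connector-channel");
        System.out.println(merged.get("first-test-prop"));  // prints "foo"
        System.out.println(merged.get("second-test-prop")); // prints "bar"
    }
}
```

With the three properties from the example above, the connector sees both first-test-prop and second-test-prop under their short names, which is why the consumer prints foo and bar.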