Maven Coordinates
To enable Kafka Connector add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
<groupId>io.helidon.messaging.kafka</groupId>
<artifactId>helidon-messaging-kafka</artifactId>
</dependency>content_copy
Reactive Kafka Connector
Connecting streams to Kafka with Reactive Messaging couldn’t be easier.
Explicit config with config builder
Example of consuming from Kafka:
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> fromKafka = Channel.<String>builder()
.name("from-kafka")
.publisherConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.groupId("example-group-" + session.getId())
.topic(topic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build()
)
.build();
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();content_copy
- Prepare a channel for connecting kafka connector with specific publisher configuration → listener
- Channel → connector mapping is automatic when using KafkaConnector.configBuilder()
- Prepare Kafka connector, can be used by any channel
Example of producing to Kafka:
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> toKafka = Channel.<String>builder()
.subscriberConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.topic(topic)
.keySerializer(StringSerializer.class)
.valueSerializer(StringSerializer.class)
.build()
).build();
KafkaConnector kafkaConnector = KafkaConnector.create();
messaging = Messaging.builder()
.publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
.connector(kafkaConnector)
.build()
.start();content_copy
- Prepare a channel for connecting kafka connector with specific publisher configuration → listener
- Channel → connector mapping is automatic when using KafkaConnector.configBuilder()
- Prepare Kafka connector, can be used by any channel
Implicit Helidon Config
Example of connector config:
mp.messaging:
incoming.from-kafka:
connector: helidon-kafka
topic: messaging-test-topic-1
auto.offset.reset: latest
enable.auto.commit: true
group.id: example-group-id
outgoing.to-kafka:
connector: helidon-kafka
topic: messaging-test-topic-1
connector:
helidon-kafka:
bootstrap.servers: localhost:9092
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializercontent_copy
Example of consuming from Kafka:
Config config = Config.create();
Channel<String> fromKafka = Channel.create("from-kafka");
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.config(config)
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();content_copy
- Prepare Kafka connector, can be used by any channel
Example of producing to Kafka:
Config config = Config.create();
Channel<String> toKafka = Channel.create("to-kafka");
KafkaConnector kafkaConnector = KafkaConnector.create();
messaging = Messaging.builder()
.config(config)
.publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
.connector(kafkaConnector)
.build()
.start();content_copy
- Prepare Kafka connector, can be used by any channel
Don’t forget to check out the examples with pre-configured Kafka docker image, for easy testing: