Maven Coordinates

To enable the Kafka connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
    <groupId>io.helidon.messaging.kafka</groupId>
    <artifactId>helidon-messaging-kafka</artifactId>
</dependency>

Reactive Kafka Connector

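Reactive Messaging connects streams to Kafka through the Kafka connector, which can be configured either explicitly with a config builder or implicitly through Helidon Config.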

Explicit config with config builder

Example of consuming from Kafka:
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();

Channel<String> fromKafka = Channel.<String>builder()
        .name("from-kafka")
        .publisherConfig(KafkaConnector.configBuilder()
                .bootstrapServers(kafkaServer)
                .groupId("example-group-" + session.getId())
                .topic(topic)
                .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
                .enableAutoCommit(true)
                .keyDeserializer(StringDeserializer.class)
                .valueDeserializer(StringDeserializer.class)
                .build()
        )
        .build();

KafkaConnector kafkaConnector = KafkaConnector.create();

Messaging messaging = Messaging.builder()
        .connector(kafkaConnector)
        .listener(fromKafka, payload -> {
            System.out.println("Kafka says: " + payload);
        })
        .build()
        .start();
  • Prepare a channel that connects the Kafka connector, using a specific publisher configuration, to the listener
  • Channel-to-connector mapping is automatic when using KafkaConnector.configBuilder()
  • Prepare the Kafka connector; it can be used by any channel
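The builder calls in the consuming example appear to correspond to the plain Kafka property keys used in the implicit configuration example later on this page (bootstrap.servers, group.id, and so on). The following sketch uses only the JDK and spells out that assumed mapping as a map. It is not Helidon API: the class name `ConsumerPropsSketch`, the method `consumerProps`, and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Helidon or the Kafka client.
public class ConsumerPropsSketch {

    // Builds the property map that the builder calls are assumed to correspond to.
    public static Map<String, String> consumerProps(String bootstrapServers,
                                                    String topic,
                                                    String groupId) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);  // .bootstrapServers(...)
        props.put("group.id", groupId);                    // .groupId(...)
        props.put("topic", topic);                         // .topic(...)
        props.put("auto.offset.reset", "latest");          // .autoOffsetReset(LATEST)
        props.put("enable.auto.commit", "true");           // .enableAutoCommit(true)
        props.put("key.deserializer",                      // .keyDeserializer(...)
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",                    // .valueDeserializer(...)
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each assumed key=value pair in insertion order.
        consumerProps("localhost:9092", "messaging-test-topic-1", "example-group-1")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Reading the builder this way may help when moving a channel between the explicit and the implicit configuration style, since the same settings reappear under these keys in the configuration file.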
Example of producing to Kafka:
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();

Channel<String> toKafka = Channel.<String>builder()
        .subscriberConfig(KafkaConnector.configBuilder()
                .bootstrapServers(kafkaServer)
                .topic(topic)
                .keySerializer(StringSerializer.class)
                .valueSerializer(StringSerializer.class)
                .build()
        ).build();

KafkaConnector kafkaConnector = KafkaConnector.create();

Messaging messaging = Messaging.builder()
        .publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
        .connector(kafkaConnector)
        .build()
        .start();
  • Prepare a channel that connects the publisher to the Kafka connector, using a specific subscriber configuration
  • Channel-to-connector mapping is automatic when using KafkaConnector.configBuilder()
  • Prepare the Kafka connector; it can be used by any channel

Implicit Helidon Config

Example of connector config:
mp.messaging:

  incoming.from-kafka:
    connector: helidon-kafka
    topic: messaging-test-topic-1
    auto.offset.reset: latest
    enable.auto.commit: true
    group.id: example-group-id

  outgoing.to-kafka:
    connector: helidon-kafka
    topic: messaging-test-topic-1

  connector:
    helidon-kafka:
      bootstrap.servers: localhost:9092
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
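In MicroProfile Reactive Messaging, the configuration a connector receives for a channel combines the connector-level properties (here under connector.helidon-kafka) with the channel-level ones (here under incoming.from-kafka), and a channel-level value takes precedence when both define the same key. The sketch below models that merge with plain maps. It is a simplified illustration rather than Helidon's implementation, and the names `ChannelConfigMerge` and `effectiveConfig` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Helidon.
public class ChannelConfigMerge {

    // Connector-level properties act as defaults; channel-level properties override them.
    public static Map<String, String> effectiveConfig(Map<String, String> connectorProps,
                                                      Map<String, String> channelProps) {
        Map<String, String> merged = new LinkedHashMap<>(connectorProps);
        merged.putAll(channelProps);
        return merged;
    }

    public static void main(String[] args) {
        // Values shared by every channel that uses the connector.
        Map<String, String> connector = new LinkedHashMap<>();
        connector.put("bootstrap.servers", "localhost:9092");
        connector.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        connector.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Values specific to the from-kafka channel.
        Map<String, String> fromKafka = new LinkedHashMap<>();
        fromKafka.put("topic", "messaging-test-topic-1");
        fromKafka.put("auto.offset.reset", "latest");
        fromKafka.put("enable.auto.commit", "true");
        fromKafka.put("group.id", "example-group-id");

        // Print the merged view that the channel would see under this model.
        effectiveConfig(connector, fromKafka)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

This is why shared settings such as bootstrap.servers and the serializers can live once under the connector, while each channel only declares what differs, such as its topic or group.id.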
Example of consuming from Kafka:
Config config = Config.create();

Channel<String> fromKafka = Channel.create("from-kafka");

KafkaConnector kafkaConnector = KafkaConnector.create();

Messaging messaging = Messaging.builder()
        .config(config)
        .connector(kafkaConnector)
        .listener(fromKafka, payload -> {
            System.out.println("Kafka says: " + payload);
        })
        .build()
        .start();
  • Prepare the Kafka connector; it can be used by any channel
Example of producing to Kafka:
Config config = Config.create();

Channel<String> toKafka = Channel.create("to-kafka");

KafkaConnector kafkaConnector = KafkaConnector.create();

Messaging messaging = Messaging.builder()
        .config(config)
        .publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
        .connector(kafkaConnector)
        .build()
        .start();
  • Prepare the Kafka connector; it can be used by any channel

Don’t forget to check out the examples, which come with a pre-configured Kafka Docker image for easy testing: