Overview

Connecting streams to Kafka with Reactive Messaging is straightforward. The connector uses the standard Kafka client behind the scenes, so all producer and consumer properties can be passed through the messaging configuration.

Maven Coordinates

To enable the Reactive Kafka Connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
    <groupId>io.helidon.messaging.kafka</groupId>
    <artifactId>helidon-messaging-kafka</artifactId>
</dependency>

Config

Example of connector config:
mp.messaging:

  incoming.from-kafka:
    connector: helidon-kafka
    topic: messaging-test-topic-1
    auto.offset.reset: latest 
    enable.auto.commit: true
    group.id: example-group-id

  outgoing.to-kafka:
    connector: helidon-kafka
    topic: messaging-test-topic-1

  connector:
    helidon-kafka:
      bootstrap.servers: localhost:9092 
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • auto.offset.reset is a Kafka consumer property, configured here for the from-kafka channel only
  • bootstrap.servers is a Kafka client property, configured here for all channels that use the connector
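
The two scopes in the config above (per-channel keys and connector-wide keys) can be pictured as a simple merge. The sketch below is only an illustration of that idea in plain Java: the class ChannelConfigSketch and the method effectiveConfig are hypothetical names, not Helidon API, and it assumes that a channel-level value takes precedence when the same key is set at both levels.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChannelConfigSketch {

    // Hypothetical helper, not part of Helidon: combines connector-wide
    // properties with channel-specific ones.
    // Assumption: when a key appears in both maps, the channel-level value wins.
    public static Map<String, String> effectiveConfig(Map<String, String> connectorLevel,
                                                      Map<String, String> channelLevel) {
        Map<String, String> merged = new LinkedHashMap<>(connectorLevel);
        merged.putAll(channelLevel); // channel entries overwrite connector entries
        return merged;
    }

    public static void main(String[] args) {
        // Values taken from the connector config example above.
        Map<String, String> connector = Map.of("bootstrap.servers", "localhost:9092");
        Map<String, String> fromKafka = Map.of(
                "topic", "messaging-test-topic-1",
                "auto.offset.reset", "latest",
                "group.id", "example-group-id");

        // Prints the properties the from-kafka channel would end up with.
        effectiveConfig(connector, fromKafka)
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Under this model, the to-kafka channel would see the same bootstrap.servers value but none of the consumer-only settings of from-kafka.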

In addition to the configuration options listed below, any property from the Kafka consumer or producer configuration can be passed to the underlying Kafka client.

Configuration options

Key | Kind | Type | Description
--- | --- | --- | ---
acks | VALUE | String | The number of acknowledgments the producer requires the leader to have received before considering a request complete
auto-offset-reset | VALUE | i.h.m.c.k.K.AutoOffsetReset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
batch-size | VALUE | Integer | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition
bootstrap-servers | VALUE | String | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster
buffer-memory | VALUE | Long | The total bytes of memory the producer can use to buffer records waiting to be sent to the server
compression-type | VALUE | String | The compression type for all data generated by the producer
dlq-topic | LIST | String | Names of the "dead letter queue" topics to be used in case a message is nacked
enable-auto-commit | VALUE | Boolean | If true, the consumer's offset will be periodically committed in the background
group-id | VALUE | String | A unique string that identifies the consumer group this consumer belongs to
key-deserializer | VALUE | Class | Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface
key-serializer | VALUE | Class | Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface
period-executions | VALUE | Long | Period between successive executions of the polling loop
poll-timeout | VALUE | Long | The maximum time to block the polling loop, in milliseconds
retries | VALUE | Integer | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error
topic | LIST | String | Names of the topics to consume from
topic-pattern | VALUE | Pattern | Pattern for topic names to consume from
value-deserializer | VALUE | Class | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface
value-serializer | VALUE | Class | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface

Consuming Messages

Example of consuming from Kafka:
@Incoming("from-kafka")
public void consumeKafka(String msg) {
    System.out.println("Kafka says: " + msg);
}

Producing Messages

Example of producing to Kafka:
@Outgoing("to-kafka")
public PublisherBuilder<String> produceToKafka() {
    return ReactiveStreams.of("test1", "test2");
}

NACK Strategy

Strategy | Description
--- | ---
Kill channel | A nacked message sends an error signal and causes channel failure, so the Messaging Health check can report it as DOWN
DLQ | Nacked messages are sent to the specified dead-letter queue
Log only | A nacked message is logged and the channel continues normally
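
To make the difference between the three strategies concrete, here is a small conceptual model in plain Java. It is a sketch for illustration only and does not show how the connector is implemented: NackStrategySketch, Strategy, and Channel are made-up names, the lists stand in for the DLQ topic and the logger, and the failed flag stands in for the state a health check would report.

```java
import java.util.ArrayList;
import java.util.List;

public class NackStrategySketch {

    // Illustrative names only; these are not Helidon types.
    enum Strategy { KILL_CHANNEL, DLQ, LOG_ONLY }

    static final class Channel {
        final Strategy strategy;
        final List<String> deadLetters = new ArrayList<>(); // stands in for the DLQ topic
        final List<String> log = new ArrayList<>();         // stands in for the logger
        boolean failed;                                     // set once the channel is "killed"

        Channel(Strategy strategy) {
            this.strategy = strategy;
        }

        // Models what happens to a single nacked message under each strategy.
        void nack(String message, Throwable cause) {
            switch (strategy) {
                case KILL_CHANNEL:
                    failed = true; // error signal: the channel stops, health would be DOWN
                    break;
                case DLQ:
                    deadLetters.add(message); // message is forwarded to the error topic
                    break;
                case LOG_ONLY:
                    log.add("Nacked: " + message + " (" + cause.getMessage() + ")");
                    break;
            }
        }

        boolean isUp() {
            return !failed;
        }
    }

    public static void main(String[] args) {
        Channel dlq = new Channel(Strategy.DLQ);
        dlq.nack("test1", new IllegalStateException("processing failed"));
        System.out.println("up=" + dlq.isUp() + ", dead letters=" + dlq.deadLetters);
    }
}
```

In this model only KILL_CHANNEL changes the channel state; the other two strategies keep the channel up and differ in where the nacked message ends up.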

Kill channel

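Default NACK strategy for the Kafka connector. When a message is nacked, an error signal is sent and the channel fails, so the Messaging health check can report it as DOWN.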

Dead Letter Queue

Sends nacked messages to an error topic. The dead letter queue (DLQ) is a well-known pattern for dealing with unprocessed messages.

Helidon can derive the connection settings for the DLQ topic automatically if the error topic is on the same Kafka cluster. Serializers are derived from the deserializers used for consumption, for example org.apache.kafka.common.serialization.StringDeserializer becomes org.apache.kafka.common.serialization.StringSerializer. In this case, only the name of the error topic needs to be configured.
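
The deserializer-to-serializer mapping described above can be sketched as a naming-convention lookup. This is an assumption made for illustration, not a description of Helidon's actual derivation logic: DlqSerializerSketch and serializerFor are hypothetical names, and the approach only works for class pairs that share a package and differ in the Deserializer/Serializer suffix, as Kafka's built-in String, Long, and ByteArray classes do.

```java
public class DlqSerializerSketch {

    // Hypothetical helper: derives a serializer class name from a deserializer
    // class name by swapping the "Deserializer" suffix for "Serializer".
    // Assumption: both classes live in the same package and follow this naming.
    public static String serializerFor(String deserializerClassName) {
        String suffix = "Deserializer";
        if (!deserializerClassName.endsWith(suffix)) {
            throw new IllegalArgumentException(
                    "Cannot derive a serializer for " + deserializerClassName);
        }
        String prefix = deserializerClassName.substring(
                0, deserializerClassName.length() - suffix.length());
        return prefix + "Serializer";
    }

    public static void main(String[] args) {
        // Prints org.apache.kafka.common.serialization.StringSerializer
        System.out.println(serializerFor(
                "org.apache.kafka.common.serialization.StringDeserializer"));
    }
}
```

A custom deserializer that does not follow this convention would have no obvious counterpart, which is one situation where spelling out the producer configuration explicitly is the safer choice.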

Example of derived DLQ config:
mp.messaging:
  incoming:
    my-channel:
      nack-dlq: dlq_topic_name

If a custom connection is needed, put the complete producer configuration under the nack-dlq key.

Example of custom DLQ config:
mp.messaging:
  incoming:
    my-channel:
      nack-dlq:
        topic: dlq_topic_name
        bootstrap.servers: localhost:9092
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer

Log only

Nacked messages are only logged and then discarded. The offset is committed and the channel continues consuming subsequent messages normally.

Example of enabling the log-only NACK strategy:
mp.messaging:
  incoming:
    my-channel:
      nack-log-only: true

Examples

Don’t forget to check out the examples with a pre-configured Kafka Docker image for easy testing: