Overview
Connecting streams to Kafka with Reactive Messaging is straightforward. A standard Kafka client runs behind the scenes, so all producer and consumer properties can be passed through the messaging configuration.
Maven Coordinates
To enable the Reactive Kafka Connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
    <groupId>io.helidon.messaging.kafka</groupId>
    <artifactId>helidon-messaging-kafka</artifactId>
</dependency>
Config
mp.messaging:
  incoming.from-kafka:
    connector: helidon-kafka
    topic: messaging-test-topic-1
    auto.offset.reset: latest
    enable.auto.commit: true
    group.id: example-group-id
  outgoing.to-kafka:
    connector: helidon-kafka
    topic: messaging-test-topic-1
  connector:
    helidon-kafka:
      bootstrap.servers: localhost:9092
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

- auto.offset.reset: Kafka consumer client property, configured for the from-kafka channel only
- bootstrap.servers: Kafka client property, configured for all channels using the connector
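The split between connector-wide and channel-specific settings shown above can be pictured as a simple map merge. The following is a minimal sketch, not Helidon's implementation: the class `ChannelConfigSketch` and its method `effectiveConfig` are hypothetical, and it assumes that channel-level keys take precedence when both levels define the same key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Helidon: illustrates how connector-level and
// channel-level settings could combine into one Kafka client configuration.
class ChannelConfigSketch {

    // Start from the connector-wide settings, then let channel keys override them
    // (assumption: the channel level wins on conflicts).
    static Map<String, String> effectiveConfig(Map<String, String> connector,
                                               Map<String, String> channel) {
        Map<String, String> merged = new LinkedHashMap<>(connector);
        merged.putAll(channel);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> connector = Map.of(
                "bootstrap.servers", "localhost:9092",
                "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Map<String, String> fromKafka = Map.of(
                "topic", "messaging-test-topic-1",
                "auto.offset.reset", "latest",
                "group.id", "example-group-id");
        // Print the settings the from-kafka channel would effectively see.
        effectiveConfig(connector, fromKafka)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In this picture, the from-kafka channel sees bootstrap.servers from the connector section alongside its own topic, auto.offset.reset, and group.id.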
Besides the following configuration options, any property from consumer or producer configuration can be passed to the underlying Kafka client.
Configuration options
| Key | Kind | Type | Description |
|---|---|---|---|
| acks | VALUE | String | The number of acknowledgments the producer requires the leader to have received before considering a request complete |
| auto-offset-reset | VALUE | i.h.m.c.k.K.AutoOffsetReset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server |
| batch-size | VALUE | Integer | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition |
| bootstrap-servers | VALUE | String | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster |
| buffer-memory | VALUE | Long | The total bytes of memory the producer can use to buffer records waiting to be sent to the server |
| compression-type | VALUE | String | The compression type for all data generated by the producer |
| dlq-topic | LIST | String | Names of the "dead letter queue" topics to be used in case a message is nacked |
| enable-auto-commit | VALUE | Boolean | If true, the consumer's offset will be periodically committed in the background |
| group-id | VALUE | String | A unique string that identifies the consumer group this consumer belongs to |
| key-deserializer | VALUE | Class | Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface |
| key-serializer | VALUE | Class | Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface |
| period-executions | VALUE | Long | Period between successive executions of polling loop |
| poll-timeout | VALUE | Long | The maximum time to block polling loop in milliseconds |
| retries | VALUE | Integer | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error |
| topic | LIST | String | Names of the topics to consume from |
| topic-pattern | VALUE | Pattern | Pattern for topic names to consume from |
| value-deserializer | VALUE | Class | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface |
| value-serializer | VALUE | Class | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface |
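Consuming Messages
import org.eclipse.microprofile.reactive.messaging.Incoming;

// Invoked for every message payload received on the from-kafka channel
@Incoming("from-kafka")
public void consumeKafka(String msg) {
    System.out.println("Kafka says: " + msg);
}
Producing Messages
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

// Publishes two messages to the to-kafka channel
@Outgoing("to-kafka")
public PublisherBuilder<String> produceToKafka() {
    return ReactiveStreams.of("test1", "test2");
}
NACK Strategy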
| Strategy | Description |
|---|---|
| Kill channel | A nacked message sends an error signal and causes channel failure, so the Messaging health check can report it as DOWN |
| DLQ | Nacked messages are sent to the specified dead letter queue |
| Log only | The nacked message is logged and the channel continues normally |
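The observable difference between the three strategies in the table above can be sketched with a small in-memory model. This is an illustration only, not Helidon's implementation: the class `NackStrategySketch`, its `Strategy` enum, and the `onNack` method are hypothetical names, and plain lists stand in for the DLQ topic and the logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Helidon's code: mimics the observable effect of each
// NACK strategy on a channel.
class NackStrategySketch {

    enum Strategy { KILL_CHANNEL, DLQ, LOG_ONLY }

    final Strategy strategy;
    final List<String> deadLetters = new ArrayList<>(); // stands in for the DLQ topic
    final List<String> log = new ArrayList<>();         // stands in for the logger
    boolean channelAlive = true;

    NackStrategySketch(Strategy strategy) {
        this.strategy = strategy;
    }

    // Called when a consumer nacks a message.
    void onNack(String message, Throwable reason) {
        switch (strategy) {
            case KILL_CHANNEL:
                channelAlive = false; // channel fails; a health check would report DOWN
                break;
            case DLQ:
                deadLetters.add(message); // message is forwarded, channel keeps running
                break;
            case LOG_ONLY:
                log.add("Nacked: " + message + " (" + reason.getMessage() + ")");
                break;
        }
    }

    public static void main(String[] args) {
        NackStrategySketch channel = new NackStrategySketch(Strategy.DLQ);
        channel.onNack("test1", new IllegalStateException("processing failed"));
        System.out.println("alive=" + channel.channelAlive + ", dlq=" + channel.deadLetters);
    }
}
```

Only the kill-channel branch stops the channel in this model; the other two record the nacked message somewhere and keep consuming.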
Kill channel
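Default NACK strategy for the Kafka connector. When a message is nacked, an error signal is sent and the channel fails, so the Messaging health check can report it as DOWN.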
Dead Letter Queue
Sends nacked messages to an error topic. The dead letter queue (DLQ) is a well-known pattern for dealing with unprocessed messages.
Helidon can derive the connection settings for the DLQ topic automatically if the error topic is on the same Kafka cluster. Serializers are derived from the deserializers used for consumption; for example, org.apache.kafka.common.serialization.StringDeserializer maps to org.apache.kafka.common.serialization.StringSerializer. In this case, only the name of the error topic is needed.
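The deserializer-to-serializer mapping described above can be illustrated with a name-based rule. The source gives only the StringDeserializer example and does not say how Helidon performs the derivation, so the following is a sketch under that assumption: the class `DlqSerializerSketch` and the method `deriveSerializer` are hypothetical, and the rule simply follows the naming convention of Kafka's built-in serialization classes.

```java
// Hypothetical helper, not Helidon's code: derives a serializer class name from a
// deserializer class name by following Kafka's built-in naming convention.
class DlqSerializerSketch {

    static String deriveSerializer(String deserializerClassName) {
        String suffix = "Deserializer";
        if (!deserializerClassName.endsWith(suffix)) {
            // No obvious counterpart; a custom DLQ configuration would be needed.
            throw new IllegalArgumentException(
                    "Cannot derive a serializer for " + deserializerClassName);
        }
        String prefix = deserializerClassName.substring(
                0, deserializerClassName.length() - suffix.length());
        return prefix + "Serializer";
    }

    public static void main(String[] args) {
        System.out.println(deriveSerializer(
                "org.apache.kafka.common.serialization.StringDeserializer"));
    }
}
```

Running `main` prints `org.apache.kafka.common.serialization.StringSerializer`. A class name that does not end in `Deserializer` is rejected in this sketch, which corresponds to the case where the producer settings for the error topic have to be supplied explicitly.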
mp.messaging:
  incoming:
    my-channel:
      nack-dlq: dql_topic_name
If a custom connection is needed, then use the 'nack-dlq' key for all of the producer configuration.
mp.messaging:
  incoming:
    my-channel:
      nack-dlq:
        topic: dql_topic_name
        bootstrap.servers: localhost:9092
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
Log only
Logs nacked messages and discards them. The offset is committed and the channel continues consuming subsequent messages normally.
mp.messaging:
  incoming:
    my-channel:
      nack-log-only: true
Examples
Don’t forget to check out the examples, which come with a pre-configured Kafka Docker image for easy testing: