You use the Kafka 2.x binding to consume messages from and publish messages to Apache Kafka brokers.
connectionType: "kafka". You must set bootstrapServers in specificConfig. Source addresses are Kafka topics, and target addresses support topic, topic/key, and topic#partition formats.Overview
The Kafka 2.x protocol binding lets you consume messages from Kafka via sources and publish messages via targets.
When you send messages in Ditto Protocol format (UTF-8 encoded strings),
set the content-type to:
application/vnd.eclipse.ditto+json
For other formats, configure a payload mapping.
Global Kafka client configuration
You can configure the Kafka client behavior in
connectivity.conf
under ditto.connectivity.connection.kafka:
consumer– consumer settings for sourcescommitter– commit batch size and intervalproducer– producer settings for targets
Connection URI format
tcp://user:password@hostname:9092
Source configuration
The common source configuration applies. Source addresses are
Kafka topics. Legal characters: [a-z], [A-Z], [0-9], ., _, -.
Quality of Service
The qos field controls message delivery semantics:
qos: 0(at-most-once) – offsets are committed after consumption, regardless of processing successqos: 1(at-least-once) – offsets are committed only after requested acknowledgements succeed
{
"addresses": ["theTopic"],
"consumerCount": 1,
"qos": 1,
"authorizationContext": ["ditto:inbound-auth-subject"],
"enforcement": {
"input": "{{ header:device_id }}",
"filters": ["{{ entity:id }}"]
},
"headerMapping": {},
"payloadMapping": ["Ditto"],
"replyTarget": {
"enabled": true,
"address": "theReplyTopic",
"headerMapping": {},
"expectedResponseTypes": ["response", "error", "nack"]
},
"acknowledgementRequests": {
"includes": []
},
"declaredAcks": []
}
When qos: 1 is set, twin modify commands automatically request the twin-persisted
acknowledgement. If processing fails, consumption restarts from the last committed offset
(at-least-once semantics).
Source header mapping
Ditto extracts these headers from each consumed Kafka record:
| Header | Description |
|---|---|
kafka.topic |
Kafka topic the record was received from |
kafka.key |
Record key (if available) |
kafka.timestamp |
Record timestamp |
These headers can be used in a source header mapping:
{
"headerMapping": {
"the-topic": "{{ header:kafka.topic }}",
"the-key": "{{ header:kafka.key }}"
}
}
All other Kafka record headers are also available for header mapping.
Message expiry
Devices can set message expiry using two headers:
creation-time– epoch milliseconds when the message was createdttl– milliseconds the message remains valid
Ditto drops expired messages (where elapsed time since creation-time exceeds ttl).
Backpressure via acknowledgements
For Kafka sources, you can use acknowledgements for backpressure.
Built-in acknowledgements like live-response and twin-persisted are requested by default
for their respective message types.
To disable backpressure:
{
"acknowledgementRequests": {
"includes": [],
"filter": "fn:delete()"
}
}
Target configuration
The common target configuration applies. Target address supports
these formats:
| Format | Description |
|---|---|
topic |
Round-robin partition assignment |
topic/key |
Key-based partitioning (same key = same partition) |
topic#partition |
Specific partition number |
{
"address": "myTopic/myKey",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject"]
}
The target address supports placeholders.
Target acknowledgement handling
When you configure issued acknowledgement labels:
| Status | Condition |
|---|---|
204 |
Message consumed successfully (debug mode disabled) |
200 |
Message consumed successfully (debug mode enabled, includes RecordMetadata) |
4xx |
Kafka failed to consume (no retry) |
5xx |
Kafka failed to consume (retry feasible) |
When debug mode is enabled ("debugEnabled": "true"), the 200 response includes the Kafka
RecordMetadata as a JSON object with these fields:
timestamp(if present)serializedKeySizeserializedValueSizetopicpartitionoffset(if present)
Specific configuration options
| Property | Description | Default |
|---|---|---|
bootstrapServers (required) |
Comma-separated list of Kafka bootstrap servers | – |
saslMechanism (required with auth) |
SASL mechanism: plain, scram-sha-256, or scram-sha-512 |
– |
debugEnabled |
Include Kafka RecordMetadata in acknowledgement payloads |
false |
groupId |
Consumer group ID | connection ID |
Example connection JSON
{
"id": "kafka-example-connection-123",
"connectionType": "kafka",
"connectionStatus": "open",
"failoverEnabled": true,
"uri": "tcp://user:password@localhost:9092",
"specificConfig": {
"bootstrapServers": "localhost:9092,other.host:9092",
"saslMechanism": "plain"
},
"sources": [{
"addresses": ["theTopic"],
"consumerCount": 1,
"qos": 1,
"authorizationContext": ["ditto:inbound-auth-subject"],
"enforcement": {
"input": "{{ header:device_id }}",
"filters": ["{{ entity:id }}"]
},
"headerMapping": {},
"payloadMapping": ["Ditto"],
"replyTarget": {
"enabled": true,
"address": "theReplyTopic",
"headerMapping": {},
"expectedResponseTypes": ["response", "error", "nack"]
},
"acknowledgementRequests": {
"includes": []
},
"declaredAcks": []
}],
"targets": [{
"address": "topic/key",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject"]
}]
}
Further reading
- Connections overview – connection model and configuration
- Payload mapping – transform message payloads
- Header mapping – map external headers
- TLS certificates – secure connections with TLS