You use the Kafka 2.x binding to consume messages from and publish messages to Apache Kafka brokers.

TL;DR: Configure a Kafka connection with connectionType: "kafka". You must set bootstrapServers in specificConfig. Source addresses are Kafka topics, and target addresses support topic, topic/key, and topic#partition formats.

Overview

The Kafka 2.x protocol binding lets you consume messages from Kafka via sources and publish messages via targets.

When you send messages in Ditto Protocol format (UTF-8 encoded strings), set the content-type to:

application/vnd.eclipse.ditto+json

For other formats, configure a payload mapping.

Global Kafka client configuration

You can configure the Kafka client behavior in connectivity.conf under ditto.connectivity.connection.kafka:

  • consumer – consumer settings for sources
  • committer – commit batch size and interval
  • producer – producer settings for targets

Connection URI format

tcp://user:password@hostname:9092

Source configuration

The common source configuration applies. Source addresses are Kafka topics. Legal characters: [a-z], [A-Z], [0-9], ., _, -.
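
The character rule above can be checked before a connection is created. The following is a minimal sketch, not Ditto's own validation; the function names `is_legal_topic` and `illegal_addresses` are hypothetical, and only the character set stated above is checked.

```python
import re

# Character set taken from the text above: letters, digits, '.', '_' and '-'.
_LEGAL_TOPIC = re.compile(r"[A-Za-z0-9._-]+")


def is_legal_topic(address: str) -> bool:
    """Return True if the address is non-empty and uses only legal characters."""
    return _LEGAL_TOPIC.fullmatch(address) is not None


def illegal_addresses(source: dict) -> list[str]:
    """List the addresses of a source configuration that fail the check."""
    return [a for a in source.get("addresses", []) if not is_legal_topic(a)]
```

For example, `illegal_addresses({"addresses": ["theTopic", "the/topic"]})` flags only `the/topic`, because `/` is not in the legal character set.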

Quality of Service

The qos field controls message delivery semantics:

  • qos: 0 (at-most-once) – offsets are committed after consumption, regardless of processing success
  • qos: 1 (at-least-once) – offsets are committed only after requested acknowledgements succeed

{
  "addresses": ["theTopic"],
  "consumerCount": 1,
  "qos": 1,
  "authorizationContext": ["ditto:inbound-auth-subject"],
  "enforcement": {
    "input": "{{ header:device_id }}",
    "filters": ["{{ entity:id }}"]
  },
  "headerMapping": {},
  "payloadMapping": ["Ditto"],
  "replyTarget": {
    "enabled": true,
    "address": "theReplyTopic",
    "headerMapping": {},
    "expectedResponseTypes": ["response", "error", "nack"]
  },
  "acknowledgementRequests": {
    "includes": []
  },
  "declaredAcks": []
}

When qos: 1 is set, twin modify commands automatically request the twin-persisted acknowledgement. If processing fails, the offset is not committed and consumption restarts from the last committed offset, so a message may be delivered more than once (at-least-once semantics).

Source header mapping

Ditto extracts these headers from each consumed Kafka record:

Header           Description
kafka.topic      Kafka topic the record was received from
kafka.key        Record key (if available)
kafka.timestamp  Record timestamp

These headers can be used in a source header mapping:

{
  "headerMapping": {
    "the-topic": "{{ header:kafka.topic }}",
    "the-key": "{{ header:kafka.key }}"
  }
}

All other Kafka record headers are also available for header mapping.

Message expiry

Devices can set message expiry using two headers:

  • creation-time – epoch milliseconds when the message was created
  • ttl – milliseconds the message remains valid

Ditto drops expired messages (where elapsed time since creation-time exceeds ttl).
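
The expiry rule above comes down to one comparison. The sketch below illustrates it under stated assumptions: the function name `is_expired` is hypothetical, header values are assumed to arrive as strings or integers in milliseconds, and a message missing either header is assumed never to expire. Ditto's internal handling may differ in these details.

```python
import time
from typing import Optional


def is_expired(headers: dict, now_ms: Optional[int] = None) -> bool:
    """Return True if the elapsed time since creation-time exceeds ttl."""
    if "creation-time" not in headers or "ttl" not in headers:
        # Assumption: without both headers the message never expires.
        return False
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # current time in epoch milliseconds
    elapsed_ms = now_ms - int(headers["creation-time"])
    return elapsed_ms > int(headers["ttl"])
```

With creation-time 1700000000000 and ttl 30000, a message is still valid at 1700000030000 and counts as expired one millisecond later.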

Backpressure via acknowledgements

For Kafka sources, you can use acknowledgements for backpressure. Built-in acknowledgements like live-response and twin-persisted are requested by default for their respective message types.

To disable backpressure:

{
  "acknowledgementRequests": {
    "includes": [],
    "filter": "fn:delete()"
  }
}

Target configuration

The common target configuration applies. Target address supports these formats:

Format           Description
topic            Round-robin partition assignment
topic/key        Key-based partitioning (same key = same partition)
topic#partition  Specific partition number

{
  "address": "myTopic/myKey",
  "topics": [
    "_/_/things/twin/events",
    "_/_/things/live/messages"
  ],
  "authorizationContext": ["ditto:outbound-auth-subject"]
}

The target address supports placeholders.
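
To make the three address formats concrete, here is a small sketch that splits an address into its parts. It is an illustration only, not Ditto's parser: the names `KafkaTargetAddress` and `parse_target_address` are hypothetical, and the handling of addresses that contain both `/` and `#` is an assumption.

```python
from typing import NamedTuple, Optional


class KafkaTargetAddress(NamedTuple):
    topic: str
    key: Optional[str] = None
    partition: Optional[int] = None


def parse_target_address(address: str) -> KafkaTargetAddress:
    """Split a target address into topic, key and partition."""
    # Assumption: '/' is checked first, so anything after the first '/'
    # is the key, even if it contains '#'.
    if "/" in address:
        topic, key = address.split("/", 1)
        return KafkaTargetAddress(topic, key=key)
    if "#" in address:
        topic, partition = address.split("#", 1)
        return KafkaTargetAddress(topic, partition=int(partition))
    # Plain topic: neither key nor partition is set.
    return KafkaTargetAddress(address)
```

Here `parse_target_address("myTopic/myKey")` yields the topic `myTopic` with key `myKey`, while `parse_target_address("myTopic#3")` yields the same topic with partition 3 and no key.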

Target acknowledgement handling

When you configure issued acknowledgement labels for a target, Ditto sets the acknowledgement status as follows:

Status  Condition
204     Kafka consumed the message successfully (debug mode disabled)
200     Kafka consumed the message successfully (debug mode enabled; the payload includes RecordMetadata)
4xx     Kafka failed to consume the message; retrying is not feasible
5xx     Kafka failed to consume the message; retrying is feasible

When debug mode is enabled ("debugEnabled": "true"), the 200 response includes the Kafka RecordMetadata as a JSON object with these fields:

  • timestamp (if present)
  • serializedKeySize
  • serializedValueSize
  • topic
  • partition
  • offset (if present)
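
An application that receives these acknowledgements can branch on the status code. The sketch below shows one way to do that; the function name `classify_ack_status` and the three result labels are hypothetical and not part of any Ditto API.

```python
def classify_ack_status(status: int) -> str:
    """Map an acknowledgement status code to 'success', 'retry' or 'fail'."""
    if status in (200, 204):
        # 200 additionally carries the RecordMetadata payload (debug mode).
        return "success"
    if 500 <= status <= 599:
        return "retry"  # retrying the publish is feasible
    if 400 <= status <= 499:
        return "fail"  # retrying is not feasible
    raise ValueError(f"unexpected acknowledgement status: {status}")
```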

Specific configuration options

Property          Description                                                                  Default
bootstrapServers  (required) Comma-separated list of Kafka bootstrap servers                   -
saslMechanism     (required with auth) SASL mechanism: plain, scram-sha-256, or scram-sha-512  -
debugEnabled      Include Kafka RecordMetadata in acknowledgement payloads                     false
groupId           Consumer group ID                                                            connection ID

Example connection JSON

{
  "id": "kafka-example-connection-123",
  "connectionType": "kafka",
  "connectionStatus": "open",
  "failoverEnabled": true,
  "uri": "tcp://user:password@localhost:9092",
  "specificConfig": {
    "bootstrapServers": "localhost:9092,other.host:9092",
    "saslMechanism": "plain"
  },
  "sources": [{
    "addresses": ["theTopic"],
    "consumerCount": 1,
    "qos": 1,
    "authorizationContext": ["ditto:inbound-auth-subject"],
    "enforcement": {
      "input": "{{ header:device_id }}",
      "filters": ["{{ entity:id }}"]
    },
    "headerMapping": {},
    "payloadMapping": ["Ditto"],
    "replyTarget": {
      "enabled": true,
      "address": "theReplyTopic",
      "headerMapping": {},
      "expectedResponseTypes": ["response", "error", "nack"]
    },
    "acknowledgementRequests": {
      "includes": []
    },
    "declaredAcks": []
  }],
  "targets": [{
    "address": "topic/key",
    "topics": [
      "_/_/things/twin/events",
      "_/_/things/live/messages"
    ],
    "authorizationContext": ["ditto:outbound-auth-subject"]
  }]
}
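
Before submitting a connection like the one above, a short pre-flight check can catch a missing required option. This is a sketch under stated assumptions: `check_kafka_connection` is a hypothetical helper, and "required with auth" is interpreted here as "the URI contains credentials", which may not match how Ditto decides it.

```python
from urllib.parse import urlsplit

# SASL mechanisms listed in the specific configuration options.
SASL_MECHANISMS = {"plain", "scram-sha-256", "scram-sha-512"}


def check_kafka_connection(connection: dict) -> list[str]:
    """Return a list of problems found in a Kafka connection definition."""
    problems = []
    if connection.get("connectionType") != "kafka":
        problems.append('connectionType must be "kafka"')
    specific = connection.get("specificConfig", {})
    if not specific.get("bootstrapServers"):
        problems.append("specificConfig.bootstrapServers is required")
    # Assumption: credentials in the URI mean authentication is in use.
    uses_auth = urlsplit(connection.get("uri", "")).username is not None
    mechanism = specific.get("saslMechanism")
    if uses_auth and mechanism is None:
        problems.append("specificConfig.saslMechanism is required with auth")
    if mechanism is not None and mechanism not in SASL_MECHANISMS:
        problems.append(f"unknown saslMechanism: {mechanism}")
    return problems
```

Applied to the example connection above, the function returns an empty list. Removing specificConfig while keeping the credentials in the URI would report both the missing bootstrapServers and the missing saslMechanism.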

Further reading