Send messages to Apache Kafka via targets.

Content-type

When Kafka messages are sent in Ditto Protocol, the payload should be UTF-8 encoded strings.

If messages, which are not in Ditto Protocol, should be processed, a payload mapping must be configured for the connection in order to transform the messages.

Specific connection configuration

The common configuration for connections in Connections > Targets applies here as well. Following are some specifics for Apache Kafka 2.x connections:

Source format

Target format

A Kafka 2.x connection requires the protocol configuration target object to have an address property. This property may have different formats:

  • topic: Contains a Kafka topic - a partition will be assigned in a round-robin fashion.
  • topic/key: Contains a Kafka topic and a key - Kafka ensures that messages with the same key end up in the same partition.
  • topic#partitionNumber: Contains a Kafka topic and a specific partition number - that partition will be used when sending records.

The target address may contain placeholders; see placeholders section for more information.

Further, "topics" is a list of strings, each list entry representing a subscription of Ditto protocol topics, see target topics and filtering for more information on that.

Outbound messages are published to the configured target address if one of the subjects in "authorizationContext" has READ permission on the thing, which is associated with a message.

{
  "address": "<kafka_topic>/<kafka_key>",
  "topics": [
    "_/_/things/twin/events",
    "_/_/things/live/messages"
  ],
  "authorizationContext": ["ditto:outbound-auth-subject"]
}

Target acknowledgement handling

For Kafka targets, when configuring automatically issued acknowledgement labels, requested acknowledgements are produced in the following way:

Once the Kafka client signals that the message was acknowledged by the Kafka broker, the following information is mapped to the automatically created acknowledement:

  • Acknowledgement.status:
    • will be 204, if Kafka debug mode was disabled and the message was successfully consumed by Kafka
    • will be 200, if Kafka debug mode was enabled (see specific config "debugEnabled") and the message was successfully consumed by Kafka
    • will be 4xx, if Kafka failed to consume the message but retrying sending the message does not make sense
    • will be 5xx, if Kafka failed to consume the message but retrying sending the message is feasible
  • Acknowledgement.value:
    • will be missing, if Kafka debug mode (see specific config "debugEnabled") was disabled
    • will include the Kafka RecordMetadata as JsonObject:
      • timestamp (if present)
      • serializedKeySize
      • serializedValueSize
      • topic
      • partition
      • offset (if present)

Specific configuration properties

The specific configuration properties contain the following keys:

  • bootstrapServers (required): contains a comma separated list of Kafka bootstrap servers to use for connecting to (in addition to the still required connection uri)
  • saslMechanism (required if connection uri contains username\/password): contains one of the following SASL mechanisms to use for authentication at Kafka:
    • plain
    • scram-sha-256
    • scram-sha-512
  • debugEnabled: determines whether for acknowledgements automatically issued by Kafka targets additional debug information should be included as payload or not - default: false

Establishing connecting to an Apache Kafka endpoint

Ditto’s Connectivity service is responsible for creating new and managing existing connections.

This can be done dynamically at runtime without the need to restart any microservice using a Ditto DevOps command.

Example connection configuration to create a new Kafka 2.x connection in order to connect to a running Apache Kafka server:

{
  "connection": {
    "id": "kafka-example-connection-123",
    "connectionType": "kafka",
    "connectionStatus": "open",
    "failoverEnabled": true,
    "uri": "tcp://user:password@localhost:9092",
    "specificConfig": {
      "bootstrapServers": "localhost:9092,other.host:9092",
      "saslMechanism": "plain"
    },
    "sources": [],
    "targets": [
      {
        "address": "topic/key",
        "topics": [
          "_/_/things/twin/events",
          "_/_/things/live/messages"
        ],
        "authorizationContext": ["ditto:outbound-auth-subject", "..."]
      }
    ]
  }
}