Connection model
You can integrate your Ditto instance with external messaging services such as Eclipse Hono, a RabbitMQ broker or an Apache Kafka broker via custom “connections”.
Additionally, you may invoke foreign HTTP endpoints by using the HTTP connection type.
A connection represents a communication channel for the exchange of messages between any service and Ditto. It requires a transport protocol, which is used to transmit Ditto Protocol messages. Ditto supports one-way and two-way communication over connections. This enables consumer/producer scenarios as well as fully-fledged command and response use cases. Nevertheless, those options might be limited by the transport protocol or the other endpoint’s capabilities.
All connections are configured and supervised via Ditto’s Connectivity service. The following model defines the connection itself:
Connection types
The top design priority of this model is to be as generic as possible, while still allowing protocol specific customizations and tweaks. This enables the implementations of different customizable connection types, and support for custom payload formats. Currently, the following connection types are supported:
The address formats of sources and targets depend on the connectionType and are therefore subject to connectionType-specific limitations. Those are documented with the corresponding protocol bindings.
Sources
Sources are used to connect to message brokers / external systems in order to consume messages from them.
Source messages can be of the following type:
Sources contain:
- several addresses (depending on the connection type those are interpreted differently, e.g. as queues, topics, etc.),
- a consumer count defining how many consumers should be attached to each source address,
- an authorization context (see authorization) specifying which authorization subject is used to authorize messages from the source,
- enforcement information that allows filtering the messages that are consumed in this source,
- acknowledgement requests this source requires in order to ensure QoS 1 (“at least once”) processing of consumed messages before technically acknowledging them to the channel,
- declared labels of acknowledgements the source is allowed to send,
- header mapping for mapping headers of source messages to internal headers, and
- a reply-target to configure publication of any responses of incoming commands.
Source enforcement
Messages received from external systems are mapped to Ditto internal format, either by applying some custom mapping or the default mapping for Ditto Protocol messages.
During this mapping the digital twin of the device is determined i.e. which thing is accessed or modified as a result of the message. By default, no sanity check is done if this target thing corresponds to the device that originally sent the message. In some use cases this might be valid, but in other scenarios you might want to enforce that a device only sends data to its digital twin. Note that this could also be achieved by assigning a specific policy to each device and use placeholders in the authorization subject, but this can get cumbersome to maintain for a large number of devices.
With an enforcement, you can use a single policy for all devices and still make sure that a device only modifies its associated digital twin. Enforcement is only feasible if the message contains the verified identity of the sending device (e.g. in a message header). This verification has to be done by the external system e.g. by properly authenticating the devices and providing the identity in the messages sent to Ditto.
The enforcement configuration consists of two fields:
- input: Defines where device identity is extracted.
- filters: Defines the filters that are matched against the input. At least one filter must match the input value, otherwise the message is rejected.
The following placeholders are available for the input field:
Placeholder | Description | Example |
---|---|---|
{{ header:<name> }} | Any header from the message received via the source (case-insensitive). | {{ header:device_id }} |
{{ source:address }} | The address on which the message was received. | devices/sensors/temperature1 |
The following placeholders are available for the filters field:
Placeholder | Description | Example |
---|---|---|
{{ thing:id }} | Full ID composed of namespace + : as a separator + name | eclipse.ditto:thing-42 |
{{ thing:namespace }} | Namespace (i.e. first part of an ID) | eclipse.ditto |
{{ thing:name }} | Name (i.e. second part of an ID) | thing-42 |
Assume a device sensor:temperature1 pushes its telemetry data to Ditto, where it is stored in a thing named sensor:temperature1. The device identity is provided in a header field device_id. To enforce that the device can only send data to the thing sensor:temperature1, the following enforcement configuration can be used:
{
"addresses": [ "telemetry/hono_tenant" ],
"authorizationContext": ["ditto:inbound-auth-subject"],
"enforcement": {
"input": "{{ header:device_id }}",
"filters": [ "{{ thing:id }}" ]
}
}
Note: This example assumes that there is a valid user named ditto:inbound-auth-subject in Ditto. If you want to use a user for the basic auth (from the HTTP API), use the prefix nginx:, e.g. nginx:ditto.
See Basic Authentication for more information.
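The enforcement check described above can be sketched in a few lines of Python. This is a minimal illustration, not Ditto's implementation: the function names resolve and passes_enforcement are hypothetical, and comparing the resolved input to each resolved filter by exact string equality is an assumption.

```python
import re

# Matches "{{ prefix:name }}" with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+):([^\s}]+)\s*\}\}")


def resolve(template, lookup):
    """Substitute every placeholder via lookup(prefix, name); None if one is unresolved."""
    unresolved = False

    def substitute(match):
        nonlocal unresolved
        value = lookup(match.group(1), match.group(2))
        if value is None:
            unresolved = True
            return ""
        return value

    resolved = PLACEHOLDER.sub(substitute, template)
    return None if unresolved else resolved


def passes_enforcement(enforcement, headers, source_address, thing_id):
    """Return True if at least one resolved filter equals the resolved input."""
    lowered = {key.lower(): value for key, value in headers.items()}
    namespace, _, name = thing_id.partition(":")
    thing_values = {"id": thing_id, "namespace": namespace, "name": name}

    def input_lookup(prefix, key):
        if prefix == "header":
            return lowered.get(key.lower())  # header names are case-insensitive
        if prefix == "source" and key == "address":
            return source_address
        return None

    def filter_lookup(prefix, key):
        return thing_values.get(key) if prefix == "thing" else None

    identity = resolve(enforcement["input"], input_lookup)
    if identity is None:
        return False  # assumption: a message without a resolvable identity is rejected
    return any(resolve(f, filter_lookup) == identity for f in enforcement["filters"])
```

With the configuration from the example, a message carrying device_id sensor:temperature1 passes only if it addresses the thing sensor:temperature1.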
Source acknowledgement requests
A source can be configured so that additional acknowledgement requests are added to each incoming message.
That is desirable whenever incoming messages should be processed with a higher “quality of service” than the default, which is “at most once” (or QoS 0).
In order to process messages from sources with “at least once” (or QoS 1) semantics, configure the source’s "acknowledgementRequests/includes" to add the “twin-persisted” acknowledgement request. A message consumed over this source is then technically acknowledged only if the twin was successfully updated/persisted by Ditto.
How the technical acknowledgement is done is specific to the connection type used and is documented in the scope of that connection type.
In addition to the "includes" array defining which acknowledgements to request for each incoming message, the optional "filter" holds an fn:filter() function defining when to request acknowledgements at all for an incoming message. This filter is applied to both kinds of acknowledgement requests: those requested in the message and those requested via the configured "includes" array.
The JSON for a source with acknowledgement requests could look like this. The "filter" in the example causes acknowledgements to be requested only if the “qos” header is either not present or does not equal 0:
{
"addresses": [
"<source>"
],
"authorizationContext": ["ditto:inbound-auth-subject"],
"headerMapping": {
"qos": "{{ header:qos }}"
},
"acknowledgementRequests": {
"includes": [
"twin-persisted",
"receiver-connection-id:my-custom-ack"
],
"filter": "fn:filter(header:qos,'ne','0')"
}
}
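The effect of the filter in this example can be modelled as follows. This is a simplified sketch, assuming the filter is exactly fn:filter(header:qos,'ne','0'); the function name effective_ack_requests and the message_requested parameter are hypothetical and only illustrate the described behaviour.

```python
def effective_ack_requests(headers, message_requested, includes):
    """Model fn:filter(header:qos,'ne','0') applied to all acknowledgement requests.

    headers: mapped headers of the incoming message
    message_requested: labels already requested by the message itself
    includes: labels from the configured "includes" array
    """
    # The filter drops every request when the qos header is present and equals "0".
    if headers.get("qos") == "0":
        return []
    # Otherwise merge both kinds of requests, keeping order and avoiding duplicates.
    merged = list(message_requested)
    for label in includes:
        if label not in merged:
            merged.append(label)
    return merged
```

A message without a qos header, or with any value other than 0, keeps both the labels it requested itself and the configured ones.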
Source declared acknowledgement labels
The acknowledgements sent via a source must have their labels declared in the field declaredAcks as a JSON array. If the label of an acknowledgement is not in the declaredAcks array, then the acknowledgement is rejected with an error. The declared labels must be prefixed by the connection ID followed by a colon or by the {{connection:id}} placeholder followed by a colon. For example:
{
"addresses": [
"<source>"
],
"authorizationContext": ["ditto:inbound-auth-subject"],
"declaredAcks": [
"{{connection:id}}:my-custom-ack"
]
}
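The prefix rule for declared labels can be checked with a small helper. This is a sketch under assumptions: the names is_valid_declared_ack and accepts_ack are hypothetical, and only the placeholder spelling {{connection:id}} without inner whitespace is recognised here.

```python
CONNECTION_ID_PLACEHOLDER = "{{connection:id}}"


def is_valid_declared_ack(label, connection_id):
    """A declared label needs '<connection-id>:' or '{{connection:id}}:' as prefix."""
    for prefix in (connection_id + ":", CONNECTION_ID_PLACEHOLDER + ":"):
        # Require a non-empty custom part after the prefix.
        if label.startswith(prefix) and len(label) > len(prefix):
            return True
    return False


def accepts_ack(label, declared_acks, connection_id):
    """An acknowledgement is accepted only if its label is among the declared ones."""
    resolved = [d.replace(CONNECTION_ID_PLACEHOLDER, connection_id) for d in declared_acks]
    return label in resolved
```

For a connection with the ID my-connection, the declared label from the example resolves to my-connection:my-custom-ack, and acknowledgements with any other label are rejected.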
Source header mapping
For incoming messages, an optional header mapping may be applied. Mapped headers are added to the headers of the Ditto protocol message obtained by payload mapping. The default Ditto payload mapper does not retain any external header; in this case all Ditto protocol headers come from the header mapping.
The JSON for a source with header mapping could look like this:
{
"addresses": [
"<source>"
],
"authorizationContext": ["ditto:inbound-auth-subject"],
"headerMapping": {
"correlation-id": "{{ header:message-id }}",
"content-type": "{{ header:content-type }}"
}
}
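How such a mapping turns external headers into Ditto protocol headers can be sketched as below. The function map_headers is hypothetical, and skipping entries whose placeholder cannot be resolved is an assumption of this sketch rather than documented behaviour.

```python
import re

# Matches "{{ header:<name> }}" with optional whitespace inside the braces.
HEADER_PLACEHOLDER = re.compile(r"\{\{\s*header:([^\s}]+)\s*\}\}")


def map_headers(header_mapping, external_headers):
    """Resolve each mapping template against the external headers of a message."""
    lowered = {key.lower(): value for key, value in external_headers.items()}
    mapped = {}
    for target, template in header_mapping.items():
        names = HEADER_PLACEHOLDER.findall(template)
        if any(name.lower() not in lowered for name in names):
            continue  # assumption: unresolved mappings are left out
        mapped[target] = HEADER_PLACEHOLDER.sub(
            lambda match: lowered[match.group(1).lower()], template
        )
    return mapped
```

Applied to the example above, an external message-id header ends up as the correlation-id of the Ditto protocol message.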
Source reply target
A source may define a reply target to publish the responses of incoming commands. For a reply target, the address and header mapping are defined in itself, whereas its payload mapping is inherited from the parent source, because a payload mapping definition specifies the transformation for both: incoming and outgoing messages.
For example, to publish responses at the target address equal to the reply-to header of incoming commands, define source header mapping and reply target as follows. If an incoming command does not have the reply-to header, then its response is dropped.
{
"headerMapping": {
"reply-to": "{{ header:reply-to }}"
},
"replyTarget": {
"enabled": true,
"address": "{{ header:reply-to }}"
}
}
The reply target may contain its own header mapping ("headerMapping") in order to map response headers.
In addition, the reply target contains the expected response types ("expectedResponseTypes") which should be published to the reply target.
The following response types are available to choose from:
- response: Send back successful responses (e.g. responses after a Thing was successfully modified, but also responses for query commands). Includes positive acknowledgements.
- error: Send back error responses (e.g. thing not modifiable due to lacking permissions)
- nack: If negative acknowledgement responses should be delivered.
This is an example "replyTarget" containing both header mapping and expected response types:
{
"replyTarget": {
"enabled": true,
"address": "{{ header:reply-to }}",
"headerMapping": {
"correlation-id": "{{ header:correlation-id }}"
},
"expectedResponseTypes": [
"response",
"error",
"nack"
]
}
}
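The decision whether a response is published, and to which address, can be sketched as follows. The function resolve_reply_address is hypothetical; the sketch assumes that "expectedResponseTypes" is always given explicitly and that header names are matched exactly.

```python
import re

# Matches "{{ header:<name> }}" with optional whitespace inside the braces.
HEADER_PLACEHOLDER = re.compile(r"\{\{\s*header:([^\s}]+)\s*\}\}")


def resolve_reply_address(reply_target, headers, response_type):
    """Return the address to publish a response to, or None if it is dropped."""
    if not reply_target.get("enabled"):
        return None
    # Only the configured response types ("response", "error", "nack") are published.
    if response_type not in reply_target["expectedResponseTypes"]:
        return None
    address = reply_target["address"]
    names = HEADER_PLACEHOLDER.findall(address)
    if any(name not in headers for name in names):
        return None  # e.g. no reply-to header: the response is dropped
    return HEADER_PLACEHOLDER.sub(lambda match: headers[match.group(1)], address)
```

With only "response" and "error" configured, a negative acknowledgement would not be published even if the reply-to header is present.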
Targets
Targets are used to connect to message brokers / external systems in order to publish messages to them.
Target messages can be of the following type:
- Thing messages
- Thing events
- Thing live commands/responses/events
- Policy announcements
- Connection announcements
Targets contain:
- one address (that is interpreted differently depending on the connection type, e.g. as queue, topic, etc.),
- topics that will be sent to the target,
- an authorization context (see authorization) specifying which authorization subject is used to authorize messages to the target, and
- header mapping to compute external headers from Ditto protocol headers.
Target topics and filtering
Which types of messages are published to the target address can be defined via configuration.
In order to only consume specific events as described in change notifications, the following parameters can additionally be provided when specifying the topics of a target:
Description | Topic | Filter by namespaces | Filter by RQL expression |
---|---|---|---|
Subscribe for Thing events/change notifications | _/_/things/twin/events | ✔ | ✔ |
Subscribe for Thing messages | _/_/things/live/messages | ✔ | ✔ |
Subscribe for Thing live commands | _/_/things/live/commands | ✔ | ❌ |
Subscribe for Thing live events | _/_/things/live/events | ✔ | ✔ |
Subscribe for Policy announcements | _/_/policies/announcements | ✔ | ❌ |
Subscribe for Connection announcements | _/_/connections/announcements | ❌ | ❌ |
The parameters are specified similarly to HTTP query parameters: the first one is separated with a ? and all following ones with &. You need to URL-encode the filter values before using them in a configuration.
For example, this way the connection session would register for all events in the namespace org.eclipse.ditto that match an attribute “counter” greater than 42. Additionally, it would subscribe to messages in the namespace org.eclipse.ditto:
{
"address": "<target-address>",
"topics": [
"_/_/things/twin/events?namespaces=org.eclipse.ditto&filter=gt(attributes/counter,42)",
"_/_/things/twin/events?extraFields=attributes/placement&filter=gt(attributes/placement,'Kitchen')",
"_/_/things/live/messages?namespaces=org.eclipse.ditto"
],
"authorizationContext": ["ditto:outbound-auth-subject", "..."]
}
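Topic strings with URL-encoded parameter values can be assembled programmatically. The following sketch uses Python's standard urllib.parse.quote; the helper build_topic is hypothetical, and percent-encoding every reserved character (safe='') is a conservative assumption, since the exact set of characters that must be encoded is not stated here.

```python
from urllib.parse import quote


def build_topic(topic, **params):
    """Append parameters to a topic: the first after '?', the rest after '&'."""
    if not params:
        return topic
    # Percent-encode each value so characters such as '&' cannot break the parameter list.
    query = "&".join(f"{name}={quote(value, safe='')}" for name, value in params.items())
    return f"{topic}?{query}"


topic = build_topic(
    "_/_/things/twin/events",
    namespaces="org.eclipse.ditto",
    filter="gt(attributes/counter,42)",
)
```

The resulting string can be used as an entry of the "topics" array of a target.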
Target topics and enrichment
When extra fields should be added to outgoing messages on a connection, an extraFields parameter can be added to the topic. The following table shows which topics support this:
Description | Topic | Enrich by extra fields |
---|---|---|
Subscribe for Thing events/change notifications | _/_/things/twin/events | ✔ |
Subscribe for Thing messages | _/_/things/live/messages | ✔ |
Subscribe for Thing live commands | _/_/things/live/commands | ✔ |
Subscribe for Thing live events | _/_/things/live/events | ✔ |
Subscribe for Policy announcements | _/_/policies/announcements | ❌ |
Subscribe for Connection announcements | _/_/connections/announcements | ❌ |
Example:
{
"address": "<target-address>",
"topics": [
"_/_/things/twin/events?extraFields=attributes/placement",
"_/_/things/live/messages?extraFields=features/ConnectionStatus"
],
"authorizationContext": ["ditto:outbound-auth-subject", "..."]
}
Target issued acknowledgement label
A target can be configured to automatically issue acknowledgements for each published/emitted message, once the underlying channel confirmed that the message was successfully received.
That is desirable whenever outgoing messages (e.g. events) are handled in scope of a command sent with an “at least once” (QoS 1) semantic in order to only acknowledge that command, if the event was successfully forwarded into another system.
For more details on that topic, please refer to the acknowledgements section.
Whether an outgoing message is treated as successfully sent or not is specific for the used connection type and documented in scope of that connection type.
The issued acknowledgement label must be prefixed by the connection ID followed by a colon or by the {{connection:id}} placeholder followed by a colon.
The JSON for a target with issued acknowledgement labels could look like this:
{
"address": "<target>",
"topics": [
"_/_/things/twin/events"
],
"authorizationContext": ["ditto:inbound-auth-subject"],
"issuedAcknowledgementLabel": "{{connection:id}}:my-custom-ack"
}
Target header mapping
For outgoing messages, an optional header mapping may be applied. Mapped headers are added to the external headers. The default Ditto payload mapper does not define any external header; in this case, all external headers come from the header mapping.
The JSON for a target with header mapping could look like this:
{
"address": "<target>",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages?namespaces=org.eclipse.ditto"
],
"authorizationContext": ["ditto:inbound-auth-subject"],
"headerMapping": {
"message-id": "{{ header:correlation-id }}",
"content-type": "{{ header:content-type }}",
"subject": "{{ topic:subject }}",
"reply-to": "all-replies"
}
}
Authorization
A connection is initiated by the connectivity service. This obviates the need for client authorization, because Ditto becomes the client in this case. Nevertheless, to access resources within Ditto, the connection must know on whose behalf it is acting. This is controlled via the configured authorizationContext, which holds a list of self-assigned authorization subjects. Before a connection can access a Ditto resource, one of its authorization subjects must be granted the access rights by a Policy.
A connection target can only send data for things to which it has READ rights, as data flows from a thing to a target. A connection source can only receive data for things to which it has WRITE rights, as data flows from a source to a thing.
Specific configuration
Some connection types require specific configuration, which is not supported for other connection types.
Those are put into the specificConfig field.
Payload Mapping
For more information on mapping message payloads see the corresponding Payload Mapping Documentation.
Placeholders
The configuration of a connection allows the use of placeholders in certain places. This allows more fine-grained control over how messages are consumed or where they are published to. The general syntax of a placeholder is {{ placeholder }}. Have a look at the placeholders concept for more details.
Placeholder for source authorization subjects
Processing the messages received via a source using the same fixed authorization subject may not be suitable for every scenario. For example, if you want to declare fine-grained write permissions per device, this would not be possible with a fixed global subject. For this use case, we have introduced placeholder substitution for authorization subjects of source addresses that are resolved when processing messages from a source. Of course, this requires the sender of the message to provide necessary information about the original issuer of the message.
You can access any header value of the incoming message by using a placeholder like {{ header:name }}.
Example:
Assuming the messages received from the source telemetry contain a device_id header (e.g. sensor-123), you may configure your source’s authorization subject as follows:
{
"id": "auth-subject-placeholder-example",
"sources": [
{
"addresses": [ "telemetry" ],
"authorizationContext": ["device:{{ header:device_id }}"]
}
]
}
The placeholder is then replaced by the value from the message headers and the message is forwarded and processed under the subject device:sensor-123. In case the header cannot be resolved or the header contains unexpected characters, an exception is thrown, which is sent back to the sender as an error message, if a valid reply-to header was provided, otherwise the message is dropped.
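The substitution of header placeholders in authorization subjects can be sketched like this. The function resolve_subjects is hypothetical; it raises a ValueError for a missing header to mirror the described exception, and it does not model the check for unexpected characters because the allowed character set is not given here.

```python
import re

# Matches "{{ header:<name> }}" with optional whitespace inside the braces.
HEADER_PLACEHOLDER = re.compile(r"\{\{\s*header:([^\s}]+)\s*\}\}")


def resolve_subjects(authorization_context, headers):
    """Replace header placeholders in every authorization subject of a source."""

    def substitute(match):
        name = match.group(1)
        if name not in headers:
            # The caller would answer with an error message or drop the message.
            raise ValueError(f"header '{name}' could not be resolved")
        return headers[name]

    return [HEADER_PLACEHOLDER.sub(substitute, subject) for subject in authorization_context]
```

For a message with the header device_id set to sensor-123, the subject from the example resolves to device:sensor-123; subjects without placeholders are returned unchanged.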
Placeholder for target addresses
Another use case for placeholders may be to publish twin events or live commands and events to a target address containing thing-specific information, e.g. you can distribute things from different namespaces to different target addresses.
You can use the placeholders {{ thing:id }}, {{ thing:namespace }} and {{ thing:name }} in the target address for this purpose.
For a thing with the ID org.eclipse.ditto:device-123 these placeholders would be resolved as follows:
Placeholder | Description | Resolved value |
---|---|---|
thing:id | Full ID composed of namespace, : (as a separator), and name | org.eclipse.ditto:device-123 |
thing:namespace | Namespace (i.e. first part of an ID) | org.eclipse.ditto |
thing:name | Name (i.e. second part of an ID) | device-123 |
In addition to the placeholders mentioned above, all documented connection placeholders may be used in target addresses. However, if any placeholder in the target address fails to resolve, then the message will be dropped.
Example:
Sending live commands and events to a target address that contains the thing’s namespace.
{
"id": "target-placeholder-example",
"targets": [
{
"address": "live/{{ thing:namespace }}",
"authorizationContext": ["ditto:auth-subject"],
"topics": [ "_/_/things/live/events", "_/_/things/live/commands" ]
}
]
}
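Resolving thing placeholders in a target address can be sketched as follows. The function resolve_target_address is hypothetical; it only handles the three thing placeholders and returns None where the message would be dropped.

```python
import re

# Matches "{{ thing:<key> }}" with optional whitespace inside the braces.
THING_PLACEHOLDER = re.compile(r"\{\{\s*thing:(\w+)\s*\}\}")


def resolve_target_address(address, thing_id):
    """Resolve thing placeholders in a target address; None if one cannot be resolved."""
    # The namespace is everything before the first colon, the name everything after it.
    namespace, _, name = thing_id.partition(":")
    values = {"id": thing_id, "namespace": namespace, "name": name}
    keys = THING_PLACEHOLDER.findall(address)
    if any(key not in values for key in keys):
        return None  # unresolved placeholder: the message is dropped
    return THING_PLACEHOLDER.sub(lambda match: values[match.group(1)], address)
```

For the thing org.eclipse.ditto:device-123, the address live/{{ thing:namespace }} resolves to live/org.eclipse.ditto.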
SSH tunneling
Ditto supports tunneling a connection by establishing an SSH tunnel and using it to connect to the actual endpoint.
See SSH tunneling on how to set up and configure SSH tunneling with Ditto.