The payload mapping feature in Ditto’s connectivity APIs can be used to transform arbitrary payload consumed via the different supported protocols to Ditto Protocol messages and vice versa.
Motivation
Eclipse Ditto is about providing access to IoT devices via the digital twin pattern. In order to provide structured APIs for different heterogeneous devices Ditto defines a lightweight JSON based model.
A Thing might look like in the following example:
{
"thingId": "the.namespace:the-thing-id",
"policyId": "the.namespace:the-policy-id",
"attributes": {
"location": "kitchen"
},
"features": {
"transmission": {
"properties": {
"cur_speed": 90
}
}
}
}
Devices in the IoT, may they be brownfield devices or newly produced devices, will probably not send their data to the cloud in the structure and protocol Ditto requires.
They should not need to be aware of something like Ditto running in the cloud mirroring them as digital twins.
So for example device payload could look like this:
{
"val": "23.42 °C",
"ts": 1523946112727
}
In case of constrained devices or IoT protocols, even binary payload might be common.
0x08BD (hex representation)
Builtin mappers
The following message mappers are included in the Ditto codebase:
Mapper Alias | Description | Inbound | Outbound |
---|---|---|---|
Ditto | Assumes that inbound/outbound messages are already in Ditto Protocol (JSON) format. | ✓ | ✓ |
JavaScript | Converts arbitrary messages from and to the Ditto Protocol format using custom JavaScript code executed by Ditto. | ✓ | ✓ |
Normalized | Transforms the payload of events to a normalized view. | ✓ | |
ConnectionStatus | This mapper handles messages containing creation-time and ttd headers by updating a feature of the targeted thing with definition ConnectionStatus. |
✓ | |
RawMessage | For outgoing message commands and responses, this mapper extracts the payload for publishing directly into the channel. For incoming messages, this mapper wraps them in a configured message command or response envelope. | ✓ | ✓ |
ImplicitThingCreation | This mapper handles messages for which a Thing should be created automatically based on a defined template. | ✓ | |
UpdateTwinWithLiveResponse | This mapper creates a merge Thing command when an indiviudal retrieve command for an single Thing was received via the live channel patching exactly the retrieved “live” data into the twin. | ✓ | |
CloudEvents Mapper | The mapper maps incoming CloudEvent to Ditto Protocol. Supports both Binary and Structured CloudEvent. | ✓ | ✓ |
Ditto mapper
This is the default Ditto Protocol mapper. If you do not specify any payload mapping this mapper is used to map inbound and outbound messages. The mapper requires no mandatory options, so its alias can be directly used as a mapper reference.
It assumes that received messages are in Ditto Protocol JSON and emits outgoing messages also in that format.
JavaScript mapper
This mapper may be used whenever any inbound messages are not yet in Ditto Protocol. By using the built in JavaScript mapping engine (based on Rhino) custom defined JavaScript scripts can be executed which are responsible for creating Ditto Protocol JSON message from arbitrary consumed payload.
The same is possible for outbound messages in order to transform Ditto Protocol JSON messages (e.g. events or responses) to arbitrary other formats.
Configuration options
incomingScript
(required): the mapping script for incoming messagesoutgoingScript
(required): the mapping script for outgoing messagesloadBytebufferJS
(optional, default:"false"
): whether to load ByteBufferJS libraryloadLongJS
(optional, default:"false"
): whether to load LongJS library
Normalized mapper
This mapper transforms created
and modified
events (other type of messages are dropped) to a normalized view.
Events are mapped to a nested sparse JSON.
{
"topic": "thing/id/things/twin/events/modified",
"headers": { "content-type": "application/json" },
"path": "/features/sensors/properties/temperature/indoor/value",
"value": 42
}
would result in the following normalized JSON representation:
{
"thingId": "thing:id",
"features": {
"sensors": {
"properties": {
"temperature": {
"indoor": {
"value": 42
}
}
}
}
},
"_context": {
"topic": "thing/id/things/twin/events/modified",
"path": "/features/sensors/properties/temperature/indoor/value",
"value": 42,
"headers": {
"content-type": "application/json"
}
}
}
The _context
field contains the original message content excluding the value
.
Configuration options
fields
(optional, default: all fields): comma separated list of fields that are contained in the result (see also chapter about field selectors)
ConnectionStatus mapper
This mapper transforms the information from the ttd
and creation-time
message headers
(see Eclipse Hono device notifications) into a
ModifyFeature command that complies with the Vorto functionblock org.eclipse.ditto:ConnectionStatus
.
The connectivity state of the device is then represented in a Feature.
It is mostly used in conjunction with another mapper that transforms the payload e.g.:
"payloadMapping": [ "Ditto" , "connectionStatus" ]
Example of a resulting ConnectionStatus
feature:
{
"thingId": "eclipse:ditto",
"features": {
"ConnectionStatus": {
"definition": [ "org.eclipse.ditto:ConnectionStatus:1.0.0" ],
"properties": {
"status": {
"readySince": "2019-10-29T14:16:18Z",
"readyUntil": "2019-10-29T14:21:18Z"
}
}
}
}
}
Configuration options
thingId
(required): The ID of the Thing that is updated with the connectivity state. It can either be a fixed value or a header placeholder (e.g.{{ header:device_id }}
).featureId
(optional, default:ConnectionStatus
): The ID of the Feature that is updated. It can either be a fixed value or resolved from a message header (e.g.{{ header:feature_id }}
).
RawMessage mapper
This mapper relates the payload in the "value"
field of message commands and message responses to the payload
of AMQP, MQTT and Kafka messages and the body of HTTP requests. The encoding of the payload is chosen according to
the configured content type. The subject, direction, thing ID and feature ID of the envelope for incoming message
commands and responses need to be configured.
Messages with the Ditto protocol content type application/vnd.eclipse.ditto+json
or signals that are not message
commands or responses are mapped by the Ditto mapper instead.
For example, the mapper maps between the feature message command response
{
"topic": "org.eclipse.ditto/smartcoffee/things/live/messages/heatUp",
"headers": { "content-type": "application/octet-stream" },
"path": "/features/water-tank/inbox/messages/heatUp",
"value": "AQIDBAUG",
"status": 200
}
and an AMQP, MQTT 5, Kafka message with payload or an HTTP request with body of 6 bytes
0x01 02 03 04 05 06
and headers
content-type: application/octet-stream
status: 200
ditto-message-subject: heatUp
ditto-message-direction: TO
ditto-message-thing-id: org.eclipse.ditto:smartcoffee
ditto-message-feature-id: water-tank
The headers are lost for connection protocols without application headers such as MQTT 3.
Configuration options
Example configuration:
{
"outgoingContentType": "application/octet-stream",
"incomingMessageHeaders": {
"content-type": "{{ header:content-type | fn:default('application/octet-stream') }}",
"status": "{{ header:status }}",
"ditto-message-subject": "{{ header:ditto-message-subject | fn:default('fallback-subject') }}",
"ditto-message-direction": "TO",
"ditto-message-thing-id": "{{ header:ditto-message-thing-id | fn:default('ns:fallback-thing') }}",
"ditto-message-feature-id": "{{ header:ditto-message-feature-id }}"
}
}
outgoingContentType
(optional): The fallback content-type for outgoing message commands and responses without the content-type header. Default totext/plain; charset=UTF-8
.-
incomingMessageHeaders
(optional): A JSON object containing the following headers needed to construct a message command or response envelope containing the incoming message as payload in the field"value"
. The following placeholders may be used in the headers:Placeholder Description {{ header:<header-name> }}
header value from the external message, e.g. from protocol headers {{ request:subjectId }}
the first authenticated subjectId which did the request - the one of the connection source in this case {{ time:now }}
the current timestamp in ISO-8601 format as string in UTC timezone {{ time:now_epoch_millis }}
the current timestamp in “milliseconds since epoch” formatted as string content-type
(optional): The content type with which to encode the incoming message as payload. Default to{{ header:content-type | fn:default('application/octet-stream') }}
. If resolved to the Ditto protocol content typeapplication/vnd.eclipse.ditto+json
, then the entire payload is interpreted as a Ditto protocol message instead.status
(optional): Include for message responses. Exclude for message commands. Default to{{ header:status }}
.ditto-message-subject
(mandatory for MQTT 3): Subject of the message. Default to{{ header:ditto-message-subject }}
. Mapping will fail if not resolvable.ditto-message-direction
(optional): The message direction. Default toTO
, which corresponds toinbox
in message commands and responses.ditto-message-thing-id
(mandatory for MQTT 3): ID of the thing to send the message command or response to. Default to{{ header:ditto-message-thing-id }}
. Mapping will fail if not resolvable.ditto-message-feature-id
(optional): Include to send the message or message response to a feature of the thing. Exclude to send it to the thing itself. Default to{{ header:ditto-message-feature-id }}
.
ImplicitThingCreation mapper
This mapper implicitly creates a new thing for an incoming message.
The created thing contains the values defined in the template, configured in the mappingDefinitions
options
.
Configuration options
thing
(required): The values of the thing that is created implicitly. It can either contain fixed values or header placeholders (e.g.{{ header:device_id }}
).-
the following placeholders may be used inside the
"thing"
JSON:Placeholder Description {{ header:<header-name> }}
header value from the external message, e.g. from protocol headers {{ request:subjectId }}
the first authenticated subjectId which did the request - the one of the connection source in this case {{ time:now }}
the current timestamp in ISO-8601 format as string in UTC timezone {{ time:now_epoch_millis }}
the current timestamp in “milliseconds since epoch” formatted as string -
The
"thing"
JSON may also include:- an inline policy:
"_policy"
containing the Policy JSON to create a new policy from and link with the thing - a “copy policy from” statement:
"_copyPolicyFrom"
- see also create Thing alternatives- either including a policyId to copy from
- or containing the link to a thing to copy the policy from in the form:
{{ ref:things/<theThingId>/policyId }}
- an inline policy:
-
commandHeaders
(optional, default:{"If-None-Match": "*"}
): The Ditto headers to use for constructing the “create thing” command for creating the twin and to use for creating errors.-
in this configured headers, the following placeholders may be used:
Placeholder Description {{ header:<header-name> }}
header value from the external message, e.g. from protocol headers {{ request:subjectId }}
the first authenticated subjectId which did the request - the one of the connection source in this case {{ time:now }}
the current timestamp in ISO-8601 format as string in UTC timezone {{ time:now_epoch_millis }}
the current timestamp in “milliseconds since epoch” formatted as string
-
allowPolicyLockout
(optional, default:true
): whether it should be allowed to create policies without havingWRITE
permissions in the created policy for the subject which creates the policy (the authorizationContext of the connection source which received the message for which a thing should be created implicitly)
Example of a template defined in options
:
{
"thing": {
"thingId": "{{ header:device_id }}",
"attributes": {
"CreatedBy": "ImplicitThingCreation"
}
},
"commandHeaders": {
"If-None-Match": "*",
"correlation-id": "{{ header:correlation-id }}"
},
"allowPolicyLockout": true
}
UpdateTwinWithLiveResponse mapper
This mapper creates a merge Thing command when a retrieve command was received via the live channel patching exactly the retrieved “live” data into the twin.
Configuration options
dittoHeadersForMerge
(optional): The Ditto headers to use for constructing the “merge thing” command for updating the twin, may for example add a condition to apply in order to update the twin (default applied Ditto headers if not configured:"response-required": false
,"if-match": "*"
).-
in this configured headers, the following placeholders may be used:
Placeholder Description {{ header:<header-name> }}
header value from the external message, e.g. from protocol headers {{ request:subjectId }}
the first authenticated subjectId which did the request - the one of the connection source in this case {{ time:now }}
the current timestamp in ISO-8601 format as string in UTC timezone {{ time:now_epoch_millis }}
the current timestamp in “milliseconds since epoch” formatted as string
-
Example configuration:
{
"dittoHeadersForMerge": {
"if-match": "*",
"response-required": false,
"put-metadata": [
{"key":"*/updated-by","value":"{{ request:subjectId }}"},
{"key":"*/updated-via","value":"device-live-response"},
{"key":"*/update-hint","value":"{{ header:some-custom-hint }}"},
{"key":"*/updated-at","value":"{{ time:now }}"}
]
}
}
CloudEvents Mapper
This mapper maps incoming CloudEvent to Ditto Protocol. It provides support for both Binary CloudEvents as well as Structured CloudEvents.
Note: The mapper supports incoming Structured CloudEvents messages with content-type:application/cloudevents+json
and Binary CloudEvents message with content-type:application/vnd.eclipse.ditto+json
CloudEvents examples
Incoming messages need to have the mandatory CloudEvents fields. For example, a Binary CloudEvent for Ditto would look like this:
headers:
ce-specversion:1.0
ce-id:some-id
ce-type:some-type
ce-source:generic-producer
content-type:application/vnd.eclipse.ditto+json
{
"topic": "my.sensors/sensor01/things/twin/commands/modify",
"path": "/",
"value": {
"thingId": "my.sensors:sensor01",
"policyId": "my.test:policy",
"attributes": {
"manufacturer": "Well known sensors producer",
"serial number": "100",
"location": "Ground floor"
},
"features": {
"measurements": {
"properties": {
"temperature": 100,
"humidity": 0
}
}
}
}
}
A Structured CloudEvent for Ditto would look like this:
headers:
content-type:application/cloudevents+json
{
"specversion": "1.0",
"id": "3212e",
"source": "http:somesite.com",
"type": "com.site.com",
"data": {
"topic": "my.sensors/sensor01/things/twin/commands/modify",
"path": "/",
"value": {
"thingId": "my.sensors:sensor01",
"policyId": "my.test:policy",
"attributes": {
"manufacturer": "Well known sensors producer",
"serial number": "100",
"location": "Ground floor"
},
"features": {
"measurements": {
"properties": {
"temperature": 100,
"humidity": 0
}
}
}
}
}
}
Example connection with multiple mappers
The following example connection defines a ConnectionStatus
mapping with the ID status
and references it in a source.
Messages received via this source will be mapped by the Ditto
mapping and the ConnectionStatus
mapping.
The Ditto
mapping requires no options to be configured, so you can directly use its alias Ditto
.
{
"name": "exampleConnection",
"sources": [{
"addresses": ["<source>"],
"authorizationContext": ["ditto:inbound"],
"payloadMapping": ["Ditto", "status"]
}
],
"mappingDefinitions": {
"status": {
"mappingEngine": "ConnectionStatus",
"options": {
"thingId": "{{ header:device_id }}"
}
}
}
}
Example connection with mapping conditions
The following example connection defines incomingConditions
and outgoingConditions
for the ConnectionStatus
mapping engine.
Optional incomingConditions are validated before the mapping of inbound messages.
Optional outgoingConditions are validated before the mapping of outbound messages.
Conditional Mapping can be achieved by using function expressions.
When multiple incoming or outgoing conditions are set for one mappingEngine
,
all have to equal true for the mapping to be executed.
{
"name": "exampleConnection",
"sources": [{
"addresses": ["<source>"],
"authorizationContext": ["ditto:inbound"],
"payloadMapping": ["status"]
}
],
"mappingDefinitions": {
"status": {
"mappingEngine": "ConnectionStatus",
"incomingConditions": {
"sampleCondition": "fn:filter(header:incoming-mapping-required,'eq','true')"
},
"outgoingConditions": {
"sampleCondition": "fn:filter(header:outgoing-mapping-required,'eq','true')"
},
"options": {
"thingId": "{{ header:device_id }}"
}
}
}
}
JavaScript mapping engine
Ditto utilizes the Rhino JavaScript engine for Java for evaluating the JavaScript to apply for mapping payloads.
Using Rhino instead of Nashorn, the newer JavaScript engine shipped with Java, has the benefit that sandboxing can be applied in a better way.
Sandboxing of different payload scripts is required as Ditto is intended to be run as cloud service where multiple connections to different endpoints are managed for different tenants at the same time. This requires the isolation of each single script to avoid interference with other scripts and to protect the JVM executing the script against harmful code execution.
Constraints
Rhino does not fully support EcmaScript 6. Check which language constructs are supported before using them in a mapping function. See https://mozilla.github.io/rhino/compat/engines.html.
Ditto currently includes Rhino version 1.7.14
and has the VERSION_ES6
flag enabled.
Sandboxing
For sandboxing/security reasons following restrictions apply:
- access to Java packages and classes is not possible
- using
exit
,quit
,print
, etc. is not possible - file access is not possible
- doing remote calls (e.g. to foreign web-servers) is not possible
- programming an endless-loop will terminate the script
- programming a recursion will terminate the script
- the file size of the script is limited
- no foreign JS libraries can be loaded (unless they fit in the file size limit and are included into the mapping script)
Helper libraries
In order to work more conveniently with binary payloads, the following libraries may be loaded for payload transformations:
- bytebuffer.js a ByteBuffer implementation using ArrayBuffers
- long.js for representing a 64-bit two’s-complement integer value
Adding additional JS libraries
The used Rhino JS engine allows making use of “CommonJS” in order to load JS
modules via require('')
into the engine.
This feature is exposed to Ditto, configuring the configuration key commonJsModulePath
or environment variable
CONNECTIVITY_MESSAGE_MAPPING_JS_COMMON_JS_MODULE_PATH
of the connectivity service to a path in the
connectivity Docker container where to load additional CommonJS modules from - e.g. use a volume mount in order to get
additional JS modules into the container.
For example, configure this variable to a folder to which you add (our mount) JavaScript libraries:
CONNECTIVITY_MESSAGE_MAPPING_JS_COMMON_JS_MODULE_PATH=/opt/commonjs-modules/
Then, for example, put pbf.js
(or any other JS library you want to use)
into that folder.
Afterwards, the library can be used in your JS snippet using:
var Pbf = require('pbf');
Helper functions
Ditto comes with a few helper functions, which makes writing the mapping scripts easier. They are available under the
Ditto
scope:
/**
* Builds a Ditto Protocol message from the passed parameters.
* @param {string} namespace - The namespace of the entity in java package notation, e.g.: "org.eclipse.ditto". Or "_"
* (underscore) for connection announcements.
* @param {string} name - The name of the entity, e.g.: "device".
* @param {string} channel - The channel for the signal: "twin"|"live"|"none"
* @param {string} group - The affected group/entity: "things"|"policies"|"connections".
* @param {string} criterion - The criterion to apply: "commands"|"events"|"search"|"messages"|"announcements"|"errors".
* @param {string} action - The action to perform: "create"|"retrieve"|"modify"|"delete". Or the announcement name:
* "opened"|"closed"|"subjectDeletion". Or the subject of the message.
* @param {string} path - The path which is affected by the message (e.g.: "/attributes"), or the destination
* of a message (e.g.: "inbox"|"outbox").
* @param {Object.<string, string>} dittoHeaders - The headers Object containing all Ditto Protocol header values.
* @param {*} [value] - The value to apply / which was applied (e.g. in a "modify" action).
* @param {number} [status] - The status code that indicates the result of the command. If setting a status code,
* the Ditto Protocol Message will be interpreted as a response (e.g. content will be ignored when using 204).
* @param {Object} [extra] - The enriched extra fields when selected via "extraFields" option.
* @returns {DittoProtocolMessage} dittoProtocolMessage(s) -
* The mapped Ditto Protocol message or
* <code>null</code> if the message could/should not be mapped
*/
function buildDittoProtocolMsg(namespace, name, group, channel, criterion, action, path, dittoHeaders, value, status, extra) {
const topic = buildTopic(namespace, name, group, channel, criterion, action);
return {
topic: topic,
path: path,
headers: dittoHeaders,
value: value,
status: status,
extra: extra,
};
}
/**
* Builds a Ditto Protocol topic from the passed parameters.
* @param {string} namespace - The namespace of the entity in java package notation, e.g.: "org.eclipse.ditto". Or "_"
* (underscore) for connection announcements.
* @param {string} name - The name of the entity, e.g.: "device".
* @param {string} channel - The channel for the signal: "twin"|"live"|"none"
* @param {string} group - The affected group/entity: "things"|"policies"|"connections".
* @param {string} criterion - The criterion to apply: "commands"|"events"|"search"|"messages"|"announcements"|"errors".
* @param {string} action - The action to perform: "create"|"retrieve"|"modify"|"delete". Or the announcement name:
* "opened"|"closed"|"subjectDeletion". Or the subject of the message.
* @returns {string} topic - the topic.
*/
function buildTopic(namespace, name, group, channel, criterion, action) {
const topicChannel = 'none' === channel ? '' : '/' + channel;
return namespace + "/" + name + "/" + group + topicChannel + "/" + criterion + "/" + action;
}
/**
* Builds an external message from the passed parameters.
* @param {Object.<string, string>} headers - The external headers Object containing header values
* @param {string} [textPayload] - The external mapped String
* @param {ArrayBuffer} [bytePayload] - The external mapped bytes as ArrayBuffer
* @param {string} [contentType] - The returned Content-Type
* @returns {ExternalMessage} externalMessage -
* the mapped external message
* or <code>null</code> if the message could/should not be mapped
*/
function buildExternalMsg(headers, textPayload, bytePayload, contentType) {
return {
headers: headers,
textPayload: textPayload,
bytePayload: bytePayload,
contentType: contentType,
};
}
/**
* Transforms the passed ArrayBuffer to a String interpreting the content of the passed arrayBuffer as unsigned 8
* bit integers.
*
* @param {ArrayBuffer} arrayBuffer the ArrayBuffer to transform to a String
* @returns {String} the transformed String
*/
function arrayBufferToString(arrayBuffer) {
return String.fromCharCode.apply(null, new Uint8Array(arrayBuffer));
}
/**
* Transforms the passed String to an ArrayBuffer using unsigned 8 bit integers.
*
* @param {String} string the String to transform to an ArrayBuffer
* @returns {ArrayBuffer} the transformed ArrayBuffer
*/
function stringToArrayBuffer(string) {
let buf = new ArrayBuffer(string.length);
let bufView = new Uint8Array(buf);
for (let i=0, strLen=string.length; i<strLen; i++) {
bufView[i] = string.charCodeAt(i);
}
return buf;
}
/**
* Transforms the passed ArrayBuffer to a {ByteBuffer} (from bytebuffer.js library which needs to be loaded).
*
* @param {ArrayBuffer} arrayBuffer the ArrayBuffer to transform
* @returns {ByteBuffer} the transformed ByteBuffer
*/
function asByteBuffer(arrayBuffer) {
let byteBuffer = new ArrayBuffer(arrayBuffer.byteLength);
new Uint8Array(byteBuffer).set(new Uint8Array(arrayBuffer));
return dcodeIO.ByteBuffer.wrap(byteBuffer);
}
Mapping incoming messages
Incoming external messages can be mapped to Ditto Protocol conform messages by implementing the following JavaScript function:
/**
* Maps the passed parameters to a Ditto Protocol message.
* @param {Object.<string, string>} headers - The headers Object containing all received header values
* @param {string} [textPayload] - The String to be mapped
* @param {ArrayBuffer} [bytePayload] - The bytes to be mapped as ArrayBuffer
* @param {string} [contentType] - The received Content-Type, e.g. "application/json"
* @returns {(DittoProtocolMessage|Array<DittoProtocolMessage>)} dittoProtocolMessage(s) -
* the mapped Ditto Protocol message,
* an array of Ditto Protocol messages or
* <code>null</code> if the message could/should not be mapped
*/
function mapToDittoProtocolMsg(
headers,
textPayload,
bytePayload,
contentType
) {
// ### Insert/adapt your mapping logic here.
// Use helper function Ditto.buildDittoProtocolMsg to build Ditto protocol message
// based on incoming payload.
// See https://websites.eclipseprojects.io/ditto/connectivity-mapping.html#helper-functions for details.
// ### example code assuming the Ditto protocol content type for incoming messages.
if (contentType === 'application/vnd.eclipse.ditto+json') {
// Message is sent as Ditto protocol text payload and can be used directly
return JSON.parse(textPayload);
} else if (contentType === 'application/octet-stream') {
// Message is sent as binary payload; assume Ditto protocol message (JSON).
try {
return JSON.parse(Ditto.arrayBufferToString(bytePayload));
} catch (e) {
// parsing failed (no JSON document); return null to drop the message
return null;
}
} else if (contentType === 'application/json') {
let parsedJson = JSON.parse(textPayload);
// the following variables would be determined from the "parsedJson" and from the "headers":
let namespace = "";
let name = "";
let group = "things";
let channel = "twin";
let criterion = "commands";
let action = "modify";
let path = "/attributes";
let dittoHeaders = {};
let value = {
"a": 1
};
return Ditto.buildDittoProtocolMsg(
namespace,
name,
group,
channel,
criterion,
action,
path,
dittoHeaders,
value)
}
// no mapping logic matched; return null to drop the message
return null;
}
The result of the function has to be a JavaScript object in Ditto Protocol or an array of
such JavaScript objects. That’s where the helper method Ditto.buildDittoProtocolMsg
is useful:
it explicitly defines which parameters are required for the Ditto Protocol message.
There is another JavaScript function which is helpful when access to the complete external message is needed.
It is possible to define the mapToDittoProtocolMsgWrapper
in the incoming payload mapping and access the original
externalMsg
.
This is the default implementation of mapToDittoProtocolMsgWrapper
, delegating to mapToDittoProtocolMsg
:
/**
* Maps the passed external message to a Ditto Protocol message.
* @param {ExternalMessage} externalMsg - The external message to map to a Ditto Protocol message
* @returns {(DittoProtocolMessage|Array<DittoProtocolMessage>)} dittoProtocolMessage(s) -
* The mapped Ditto Protocol message,
* an array of Ditto Protocol messages or
* <code>null</code> if the message could/should not be mapped
*/
function mapToDittoProtocolMsgWrapper(externalMsg) {
let headers = externalMsg.headers;
let textPayload = externalMsg.textPayload;
let bytePayload = externalMsg.bytePayload;
let contentType = externalMsg.contentType;
return mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType);
}
Mapping outgoing messages
Outgoing Ditto Protocol messages (e.g. responses or events) can be mapped to external messages by implementing the following JavaScript function:
/**
* Maps the passed parameters which originated from a Ditto Protocol message to an external message.
* @param {string} namespace - The namespace of the entity in java package notation, e.g.: "org.eclipse.ditto". Or "_"
* (underscore) for connection announcements.
* @param {string} name - The name of the entity, e.g.: "device".
* @param {string} group - The affected group/entity: "things"|"policies"|"connections".
* @param {string} channel - The channel for the signal: "twin"|"live"|"none"
* @param {string} criterion - The criterion to apply: "commands"|"events"|"search"|"messages"|"announcements"|
* "errors".
* @param {string} action - The action to perform: "create"|"retrieve"|"modify"|"delete". Or the announcement name:
* "opened"|"closed"|"subjectDeletion". Or the subject of the message.
* @param {string} path - The path which is affected by the message (e.g.: "/attributes"), or the destination
* of a message (e.g.: "inbox"|"outbox").
* @param {Object.<string, string>} dittoHeaders - The headers Object containing all Ditto Protocol header values.
* @param {*} [value] - The value to apply / which was applied (e.g. in a "modify" action).
* @param {number} [status] - The status code that indicates the result of the command. When this field is set,
* it indicates that the Ditto Protocol Message contains a response.
* @param {Object} [extra] - The enriched extra fields when selected via "extraFields" option.
* @returns {(ExternalMessage|Array<ExternalMessage>)} externalMessage - The mapped external message, an array of
* external messages or <code>null</code> if the message could/should not be mapped.
*/
function mapFromDittoProtocolMsg(
namespace,
name,
group,
channel,
criterion,
action,
path,
dittoHeaders,
value,
status,
extra
) {
// ###
// Insert your mapping logic here
// ### example code using the Ditto protocol content type.
let headers = dittoHeaders;
let textPayload = JSON.stringify(Ditto.buildDittoProtocolMsg(namespace, name, group, channel, criterion, action,
path, dittoHeaders, value, status, extra));
let bytePayload = null;
let contentType = 'application/vnd.eclipse.ditto+json';
return Ditto.buildExternalMsg(
headers, // The external headers Object containing header values
textPayload, // The external mapped String
bytePayload, // The external mapped byte[]
contentType // The returned Content-Type
);
}
The result of the function has to be a JavaScript object or, an array of JavaScript objects with the fields headers
,
textPayload
, bytePayload
and contentType
. That’s where the helper method Ditto.buildExternalMsg
is useful:
it explicitly defines which parameters are required for the external message.
There is another JavaScript function which is helpful when access to the complete Ditto protocol message is needed.
It is possible to define the mapFromDittoProtocolMsgWrapper
in the outgoing payload mapping and access the
original dittoProtocolMsg
.
Please refer to the Ditto Protocol specification
to inspect which JSON fields are available when.
This is the default implementation of mapFromDittoProtocolMsgWrapper
, delegating to mapFromDittoProtocolMsg
:
/**
* Maps the passed Ditto Protocol message to an external message.
* @param {DittoProtocolMessage} dittoProtocolMsg - The Ditto Protocol message to map
* @returns {(ExternalMessage|Array<ExternalMessage>)} externalMessage -
* The mapped external message,
* an array of external messages or
* <code>null</code> if the message could/should not be mapped
*/
function mapFromDittoProtocolMsgWrapper(dittoProtocolMsg) {
let topic = dittoProtocolMsg.topic;
let splitTopic = topic.split("/");
let namespace = splitTopic[0];
let name = splitTopic[1];
let group = splitTopic[2];
let channel;
let criterion;
let action;
if (hasChannel(group)) {
channel = splitTopic[3];
criterion = splitTopic[4];
action = splitTopic[5];
} else {
channel = 'none';
criterion = splitTopic[3];
action = splitTopic[4];
}
let path = dittoProtocolMsg.path;
let dittoHeaders = dittoProtocolMsg.headers;
let value = dittoProtocolMsg.value;
let status = dittoProtocolMsg.status;
let extra = dittoProtocolMsg.extra;
return mapFromDittoProtocolMsg(namespace, name, group, channel, criterion, action, path, dittoHeaders, value, status, extra);
}
JavaScript payload types
Both, text payloads and byte payloads may be mapped.
Text payloads
Working with text payloads is as easy as it gets in JavaScript. For example, for the content-type application/json
structured data may be processed like this:
let value;
if (contentType === 'application/json') {
let parsedJson = JSON.parse(textPayload);
value = parsedJson.number1 + parsedJson['sub-field']; // remember to access JSON keys with dashes in a JS special way
} else {
// a script may decide to not map other content-types than application/json
return null;
}
// proceed ...
Byte payloads
Working with byte payloads is also possible but does require a little bit of knowledge about JavaScript’s ArrayBuffer TypedArrays and DataView.
What you get in the mapping scripts is a bytePayload
of type ArrayBuffer
which lets you work on the bytes
in different ways:
Typed Arrays
A TypedArray [is] a view into an ArrayBuffer where every item has the same size and type.
source
With TypedArrays you can simply wrap the bytePayload
ArrayBuffer
and work on all the items e.g.
as unsigned 8-bit integers:
let bytes = new Uint8Array(bytePayload);
bytes[0]; // access the first byte
bytes[1]; // access the second byte
DataViews
The DataView [is] another view into an ArrayBuffer, but one which allows items of different size and type in the ArrayBuffer.
source
let view = new DataView(bytePayload);
view.getInt8(0); // access a 8-bit signed integer (byte) on offset=0
view.getUint16(1); // access a 16-bit unsigned integer (usigned short) on offset=1
DataViews also allow to set
bytes to an underlying ArrayBuffer conveniently.
ByteBuffer.js
Alternatively, Ditto’s JavaScript transformation may be loaded with the above mentioned libraries,
e.g. “bytebuffer.js”.
With ByteBuffer
, the content of an ArrayBuffer
can be accessed in a buffered way:
let byteBuf = Ditto.asByteBuffer(bytePayload);
let numberFromBytes = parseInt(byteBuf.toHex(), 16);
let base64encoded = byteBuf.toBase64();
let buf = dcodeIO.ByteBuffer.fromBase64(base64encoded);
buf.readInt(); // read a 32bit signed integer + advances the offset in the buffer
buf.readUTF8String(4); // read 4 characters of UTF-8 encoded string + advances the offset in the buffer
buf.remaining(); // gets the number of remaining readable bytes in the buffer
Check the ByteBuffer API documentation to find out what is possible with that helper.
JavaScript Examples
Text payload example
Let’s assume your device sends telemetry data via Eclipse Hono’s MQTT adapter into the cloud. And, that an example payload of your device is:
{
"temp": "23.42 °C",
"hum": 78,
"pres": {
"value": 760,
"unit": "mmHg"
}
}
We want to map a single message of this device containing updates for all 3 values to a Thing in the following structure:
{
"thingId": "the.namespace:the-thing-id",
"policyId": "the.namespace:the-policy-id",
"features": {
"temperature": {
"properties": {
"value": 23.42
}
},
"pressure": {
"properties": {
"value": 760
}
},
"humidity": {
"properties": {
"value": 78
}
}
}
}
Therefore, we define following incoming
mapping function:
function mapToDittoProtocolMsg(
headers,
textPayload,
bytePayload,
contentType
) {
if (contentType !== 'application/json') {
return null; // only handle messages with content-type application/json
}
let jsonData = JSON.parse(textPayload);
let value = {
temperature: {
properties: {
value: jsonData.temp.split(" ")[0] // omit the unit
}
},
pressure: {
properties: {
value: jsonData.pres.value
}
},
humidity: {
properties: {
value: jsonData.hum
}
}
};
return Ditto.buildDittoProtocolMsg(
'org.eclipse.ditto', // in this example always the same
headers['device_id'], // Eclipse Hono sets the authenticated device_id as AMQP 1.0 header
'things', // we deal with a Thing
'twin', // we want to update the twin
'commands', // we want to create a command to update a twin
'modify', // modify the twin
'/features', // modify all features at once
headers, // pass through the headers from AMQP 1.0
value
);
}
When your device now sends its payload via the MQTT adapter of Eclipse Hono:
mosquitto_pub -u 'sensor1@DEFAULT_TENANT' -P hono-secret -t telemetry -m '{"temp": "23.42 °C","hum": 78,"pres": {"value": 760,"unit": "mmHg"}}'
Your digital twin is updated by applying the specified script and extracting the relevant values from the passed textPayload
.
Bytes payload example
For this example, let’s assume your device sends telemetry data via Eclipse Hono’s HTTP adapter into the cloud. An example payload of your device - displayed as hexadecimal - is:
0x09EF03F72A
Let us now also assume that
- the first 2 bytes
09 EF
represent- the temperature as 16bit signed integer (thus, may also be negative)
- this is not a float in oder to save space (as float needs at least 32 bit)
- the second 2 bytes
03 F7
represent the pressure as 16bit signed integer - the last byte
2A
represents the humidity as 8bit unsigned integer of our device.
We want to map a single message of this device containing updates for all 3 values to a Thing in the following structure:
{
"thingId": "the.namespace:the-thing-id",
"policyId": "the.namespace:the-policy-id",
"features": {
"temperature": {
"properties": {
"value": 25.43
}
},
"pressure": {
"properties": {
"value": 1015
}
},
"humidity": {
"properties": {
"value": 42
}
}
}
}
Therefore, we define following incoming
mapping function:
function mapToDittoProtocolMsg(
headers,
textPayload,
bytePayload,
contentType
) {
if (contentType !== 'application/octet-stream') {
return null; // only handle messages with content-type application/octet-stream
}
let view = new DataView(bytePayload);
let value = {
temperature: {
properties: {
// interpret the first 2 bytes (16 bit) as signed int and divide through 100.0:
value: view.getInt16(0) / 100.0
}
},
pressure: {
properties: {
// interpret the next 2 bytes (16 bit) as signed int:
value: view.getInt16(2)
}
},
humidity: {
properties: {
// interpret the next 1 bytes (8 bit) as unsigned int:
value: view.getUint8(4)
}
}
};
return Ditto.buildDittoProtocolMsg(
'org.eclipse.ditto', // in this example always the same
headers['device_id'], // Eclipse Hono sets the authenticated device_id as AMQP 1.0 header
'things', // we deal with a Thing
'twin', // we want to update the twin
'commands', // we want to create a command to update a twin
'modify', // modify the twin
'/features', // modify all features at once
headers, // pass through the headers from AMQP 1.0
value
);
}
When your device now sends its payload via the HTTP adapter of Eclipse Hono:
echo -e $((0x09EF03F72A)) | curl -i -X POST -u sensor1@DEFAULT_TENANT:hono-secret -H 'Content-Type: application/octet-stream' --data-binary @- http://127.0.0.1:8080/telemetry
Your digital twin is updated by applying the specified script and extracting the relevant values from the passed bytePayload
.
Custom Java based implementation
Beside the JavaScript based mapping - which can be configured/changed at runtime without the need of restarting the connectivity service - there is also the possibility to implement a custom Java based mapper.
The interface to be implemented is
MessageMapper
)
and there is an abstract class AbstractMessageMapper
which eases implementation of a custom mapper.
Simply extend from AbstractMessageMapper
to provide a custom mapper:
public final class FooMapper extends AbstractMessageMapper {
private static final String MAPPER_ALIAS = "Foo";
public FooMapper(ActorSystem actorSystem, Config config) {
super(actorSystem, config);
}
private FooMapper(AbstractMessageMapper copyFromMapper) {
super(copyFromMapper);
}
@Override
public String getAlias() {
return MAPPER_ALIAS;
}
@Override
public boolean isConfigurationMandatory() {
return false;
}
@Override
public MessageMapper createNewMapperInstance() {
return new FooMapper(this);
}
@Override
public List<Adaptable> map(ExternalMessage externalMessage) {
// TODO implement mapping inbound messages consumed via "sources" to DittoProtocol adaptables
return null;
}
@Override
public DittoHeaders getAdditionalInboundHeaders(ExternalMessage externalMessage) {
return DittoHeaders.empty();
}
@Override
public List<ExternalMessage> map(Adaptable adaptable) {
// TODO implement mapping DittoProtocol adaptables to outbound messages published via "targets"
return null;
}
@Override
private void doConfigure(Connection connection, MappingConfig mappingConfig,
MessageMapperConfiguration configuration) {
// extract configuration if needed
}
}
After instantiation of the custom MessageMapper
, the doConfigure
method is called with all the options which were
provided to the mapper in the configured connection.
Use them in order to pass in configurations, thresholds, etc.
Then, simply implement both of the map
methods:
List<Adaptable> map(ExternalMessage message)
maps from an incoming external message to- an empty list of
Adaptable
s if the incoming message should be dropped - a list of one or many Ditto Protocol
Adaptable
s
- an empty list of
List<ExternalMessage> map(Adaptable adaptable)
maps from an outgoing Ditto ProtocolAdaptable
to- an empty list of
ExternalMessage
s if the outgoing message should be dropped - a list of one or many external messages
- an empty list of
In order to use this custom Java based mapper implementation, the following steps are required:
- the alias has to be defined via the implemented
getAlias()
method - it must be unique and should start with an uppercase letter - if the custom mapper requires mandatory options then implement
isConfigurationMandatory()
to returntrue
- the mapper class needs to be on the classpath of the connectivity
microservice in order to be loaded.
Follow the instructions of how to extend Ditto to achieve that. - the mapper needs to be registered via configuration in the connectivity service, extend the configuration or add the mapper via system properties configuration
- when creating a new connection you have to specify the alias of your mapper as the
mappingEngine
in the connection’smappingDefinitions
and reference the ID of your mapper in a source or a target
isConfigurationMandatory() = true
), you can
directly reference the alias in a source or a target without first defining it inside mappingDefinitions
.Example for Custom Java based mapper
Please have a look at the following Ditto example project:
This shows how to implement, add and configure a custom, Protobuf based, Java payload mapper for Ditto to use in the connectivity service for mapping a custom domain specific Protbuf encoded payload.