Consume messages from AMQP 0.9.1 brokers via sources and send messages to AMQP 0.9.1 brokers via targets.
Content-type
When messages are sent in Ditto Protocol (as UTF-8
encoded String payload),
the content-type
of AMQP 0.9.1 messages must be set to:
application/vnd.eclipse.ditto+json
If messages, which are not in Ditto Protocol, should be processed, a payload mapping must be configured for the AMQP 0.9.1 connection in order to transform the messages.
AMQP 0.9.1 properties
Supported AMQP 0.9.1 properties which are interpreted in a specific way are:
content-type
: for defining the Ditto Protocol content-typecorrelation-id
: for correlating request messages to responses
Specific connection configuration
The common configuration for connections in Connections > Sources and Connections > Targets applies here as well. Following are some specifics for AMQP 0.9.1 connections:
Source format
An AMQP 0.9.1 connection requires the protocol configuration source object to have an addresses
property with a list
of queue names, and authorizationContext
array that contains the authorization subjects, in whose context
incoming messages are processed. These subjects may contain placeholders, see
placeholders section for more information.
{
"addresses": [
"<queue_name>",
"..."
],
"authorizationContext": ["ditto:inbound-auth-subject", "..."]
}
Source acknowledgement handling
For AMQP 0.9.1 sources, when configuring acknowledgement requests, consumed messages from the AMQP 0.9.1 broker are treated in the following way:
For Ditto acknowledgements with successful status:
- Acknowledges a single AMQP 0.9.1 message with an
Ack
message for the receiveddeliveryTag
For Ditto acknowledgements with mixed successful/failed status:
- If some of the aggregated acknowledgements require redelivery (e.g. based on a timeout):
- Negatively acknowledges the AMQP 0.9.1 message with a
Nack
message for the receiveddeliveryTag
and settingrequeue
totrue
- Negatively acknowledges the AMQP 0.9.1 message with a
- If none of the aggregated acknowledgements require redelivery:
- Negatively acknowledges the AMQP 0.9.1 message with a
Nack
message for the receiveddeliveryTag
and settingrequeue
tofalse
preventing redelivery by the AMQP 0.9.1 broker
- Negatively acknowledges the AMQP 0.9.1 message with a
Target format
An AMQP 0.9.1 connection requires the protocol configuration target object to have an address
property with a combined
value of the exchange_name
and routing_key
. The target address may contain placeholders; see
placeholders section for more information.
Further, "topics"
is a list of strings, each list entry representing a subscription of
Ditto protocol topics.
Outbound messages are published to the configured target address if one of the subjects in "authorizationContext"
has READ permission on the thing, which is associated with a message.
{
"address": "<exchange_name>/<routing_key>",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject"]
}
Target acknowledgement handling
For AMQP 0.9.1 targets, when configuring automatically issued acknowledgement labels, requested acknowledgements are produced in the following way:
Once the AMQP 0.9.1 client signals that the message was acknowledged by the AMQP 0.9.1 broker, the following information is mapped to the automatically created acknowledgement:
- Acknowledgement.status:
- will be
200
, if the message was successfully ACKed by the AMQP 0.9.1 broker - will be
400
, if the AMQP 0.9.1 broker does not support publisher confirms - will be
503
, if the AMQP 0.9.1 broker negatively confirmed receiving a message
- will be
- Acknowledgement.value:
- will be missing, for status
200
- will contain more information, in case that an error
status
was set
- will be missing, for status
Specific configuration properties
There are no specific configuration properties available for this type of connection.
Establishing connecting to an AMQP 0.9.1 endpoint
Ditto’s Connectivity service is responsible for creating new and managing existing connections.
This can be done dynamically at runtime without the need to restart any microservice using a Ditto DevOps command.
Example connection configuration to create a new AMQP 0.9.1 connection (e.g. in order to connect to a RabbitMQ):
{
"connection": {
"id": "rabbit-example-connection-123",
"connectionType": "amqp-091",
"connectionStatus": "open",
"failoverEnabled": true,
"uri": "amqp://user:password@localhost:5672/vhost",
"sources": [
{
"addresses": [
"queueName"
],
"authorizationContext": ["ditto:inbound-auth-subject", "..."]
}
],
"targets": [
{
"address": "exchangeName/routingKey",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject", "..."]
}
]
}
}