
The streaming subscription protocol lets you stream historical persisted events for Things and Policies using a reactive-streams-based flow.

TL;DR: Send a subscribeForPersistedEvents command to start streaming historical events, use request to pull items, and cancel to stop early. Ditto responds with created, next, complete, or failed events.

Overview

The history capabilities of the Ditto Protocol consist of 3 commands and 4 events that implement the reactive-streams protocol over any duplex transport layer. Ditto acts as the publisher of historical events, and you act as the subscriber. You control the delivery pace and can cancel before all results arrive.

Connections support

While connections do not inherently expose a duplex transport layer, the streaming subscription protocol works with them too. Send commands via any connection source, and receive events at the source’s reply-target.

How it works

Signal mapping

Each command or event corresponds to a reactive-streams signal. A subscription ID links all messages of one streaming request.

| Reactive-streams signal | Streaming protocol topic | Type | Direction |
| --- | --- | --- | --- |
| Publisher#subscribe | <ns>/<entity>/<group>/<channel>/streaming/subscribeForPersistedEvents | Command | Client to Ditto |
| Subscription#request | <ns>/<entity>/<group>/<channel>/streaming/request | Command | Client to Ditto |
| Subscription#cancel | <ns>/<entity>/<group>/<channel>/streaming/cancel | Command | Client to Ditto |
| Subscriber#onSubscribe | <ns>/<entity>/<group>/<channel>/streaming/created | Event | Ditto to Client |
| Subscriber#onNext | <ns>/<entity>/<group>/<channel>/streaming/next | Event | Ditto to Client |
| Subscriber#onComplete | <ns>/<entity>/<group>/<channel>/streaming/complete | Event | Ditto to Client |
| Subscriber#onError | <ns>/<entity>/<group>/<channel>/streaming/failed | Event | Ditto to Client |
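
As a minimal sketch of the topic layout in the table above, the helper below assembles a streaming topic from its parts and looks up the matching reactive-streams signal. The function names and the example values (org.example, my-thing, things, twin) are illustrative assumptions, not part of the protocol definition.

```python
# Sketch only: helper names and example values are assumptions for illustration.
SIGNALS = {
    "subscribeForPersistedEvents": "Publisher#subscribe",
    "request": "Subscription#request",
    "cancel": "Subscription#cancel",
    "created": "Subscriber#onSubscribe",
    "next": "Subscriber#onNext",
    "complete": "Subscriber#onComplete",
    "failed": "Subscriber#onError",
}

# The three actions that are commands; the remaining four are events.
CLIENT_COMMANDS = {"subscribeForPersistedEvents", "request", "cancel"}


def streaming_topic(namespace, entity, group, channel, action):
    """Build '<ns>/<entity>/<group>/<channel>/streaming/<action>'."""
    if action not in SIGNALS:
        raise ValueError(f"unknown streaming action: {action}")
    return "/".join([namespace, entity, group, channel, "streaming", action])


def direction(action):
    """Return who sends a message with the given action."""
    return "Client to Ditto" if action in CLIENT_COMMANDS else "Ditto to Client"


topic = streaming_topic("org.example", "my-thing", "things", "twin", "request")
print(topic, "->", SIGNALS["request"], "(" + direction("request") + ")")
```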

Interaction pattern

Client sends (in order):

subscribe request* cancel?

Ditto responds with:

created next* (complete | failed)?

In this notation, * means "zero or more" and ? means "at most once". Step by step:

  1. You send a subscribeForPersistedEvents command. Ditto responds with a created event containing the subscription ID.
  2. You send request commands specifying how many items you want. Ditto sends next events with results.
  3. When all results are delivered, Ditto sends complete. On error, Ditto sends failed.
  4. You can send cancel at any time to stop receiving results.

Ditto guarantees that no complete or failed event arrives before your first request command. This simplifies concurrency handling on multi-threaded clients.

After sending cancel, you may still receive buffered next, complete, or failed events.
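
The ordering rules above can be sketched as a small client-side state tracker. This is an illustrative model rather than Ditto client code: the names SubscriptionState and ProtocolError are hypothetical, and the tracker looks only at action names, not at payloads. How Ditto behaves when you cancel without ever sending request is not covered on this page, so the sketch tolerates a terminal event in that case.

```python
class ProtocolError(Exception):
    """Raised when a message breaks the expected ordering (hypothetical type)."""


class SubscriptionState:
    """Tracks the message order of one streaming subscription."""

    def __init__(self):
        self.subscribed = False
        self.created = False
        self.requested = False  # True once the first request was sent
        self.cancelled = False
        self.finished = False   # True after complete or failed

    def on_command(self, action):
        """Record a command the client sends."""
        if action == "subscribeForPersistedEvents":
            if self.subscribed:
                raise ProtocolError("already subscribed")
            self.subscribed = True
        elif action in ("request", "cancel"):
            # Both commands need the subscription ID from the created event.
            if not self.created:
                raise ProtocolError(f"{action} sent before created")
            if action == "request":
                self.requested = True
            else:
                self.cancelled = True
        else:
            raise ValueError(f"unknown command: {action}")

    def on_event(self, action):
        """Validate an event received from Ditto."""
        if self.finished:
            raise ProtocolError(f"{action} received after a terminal event")
        if action == "created":
            if not self.subscribed or self.created:
                raise ProtocolError("unexpected created event")
            self.created = True
        elif action == "next":
            if not self.requested:
                raise ProtocolError("next received before any request")
        elif action in ("complete", "failed"):
            # No terminal event is expected before the first request.
            if not self.requested and not self.cancelled:
                raise ProtocolError(f"{action} received before first request")
            self.finished = True
        else:
            raise ValueError(f"unknown event: {action}")


state = SubscriptionState()
state.on_command("subscribeForPersistedEvents")
state.on_event("created")
state.on_command("request")
for event_action in ["next", "next", "complete"]:
    state.on_event(event_action)
print("finished:", state.finished)
```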

Commands (client to Ditto)

Subscribe for persisted events

Start streaming historical events. Ditto always responds with a created event.

| Field | Value |
| --- | --- |
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/subscribeForPersistedEvents |
| path | / |
| value | JSON object specifying start/stop options. |
   

Specify where to start and stop via the value field:

  • fromHistoricalRevision / toHistoricalRevision – revision-based range
  • fromHistoricalTimestamp / toHistoricalTimestamp – timestamp-based range

If you provide no options, Ditto streams the complete available history.
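
A minimal sketch of building such a command follows. The envelope keys topic, path, and value and the four option names come from this page; the helper name, the example entity org.example/my-thing on the things group and twin channel, and the specific revision numbers are assumptions for illustration.

```python
import json


def subscribe_command(namespace, entity, group, channel,
                      from_revision=None, to_revision=None,
                      from_timestamp=None, to_timestamp=None):
    """Build a subscribeForPersistedEvents command (hypothetical helper)."""
    options = {
        "fromHistoricalRevision": from_revision,
        "toHistoricalRevision": to_revision,
        "fromHistoricalTimestamp": from_timestamp,
        "toHistoricalTimestamp": to_timestamp,
    }
    return {
        "topic": f"{namespace}/{entity}/{group}/{channel}"
                 "/streaming/subscribeForPersistedEvents",
        "path": "/",
        # Leave out unset options; an empty object means "complete history".
        "value": {key: val for key, val in options.items() if val is not None},
    }


# Revision-based range (example values are assumptions).
ranged = subscribe_command("org.example", "my-thing", "things", "twin",
                           from_revision=1, to_revision=10)
# No options: stream everything that is available.
full_history = subscribe_command("org.example", "my-thing", "things", "twin")

print(json.dumps(ranged, indent=2))
```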

Request

After receiving a subscription ID from the created event, send request commands to tell Ditto how many items you want.

| Field | Value |
| --- | --- |
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/request |
| path | / |
| value | JSON object specifying the demand. |
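
A request command could be assembled as in the sketch below. The value keys subscriptionId and demand are assumed field names, since this page only says that the value specifies the demand; check them against the protocol specification. The positive-demand check mirrors the reactive-streams rule that a request for zero or fewer items is a violation.

```python
def request_command(topic_prefix, subscription_id, demand):
    """Build a request command (hypothetical helper).

    topic_prefix is '<namespace>/<entityName>/<group>/<channel>'.
    The keys 'subscriptionId' and 'demand' are assumptions.
    """
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(demand, bool) or not isinstance(demand, int) or demand <= 0:
        raise ValueError("demand must be a positive integer")
    return {
        "topic": f"{topic_prefix}/streaming/request",
        "path": "/",
        "value": {"subscriptionId": subscription_id, "demand": demand},
    }


# 'sub-1' stands in for the ID taken from the created event.
request = request_command("org.example/my-thing/things/twin", "sub-1", 5)
print(request)
```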
   

Cancel

Send a cancel command to stop receiving items. Items already in flight may still arrive.

| Field | Value |
| --- | --- |
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/cancel |
| path | / |
| value | JSON object identifying the subscription. |
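
Because items already in flight may still arrive, a client usually remembers which subscriptions it cancelled and drops their late events. The sketch below shows one way to do that. The class name is hypothetical, and it assumes that both the cancel command and every event carry the subscription ID under a subscriptionId key in value.

```python
class LateEventFilter:
    """Drops events of cancelled subscriptions (hypothetical helper)."""

    def __init__(self):
        self._cancelled = set()

    def cancel(self, topic_prefix, subscription_id):
        """Mark the subscription as cancelled and return the cancel command."""
        self._cancelled.add(subscription_id)
        return {
            "topic": f"{topic_prefix}/streaming/cancel",
            "path": "/",
            "value": {"subscriptionId": subscription_id},  # assumed key
        }

    def accept(self, event):
        """Return False for events that belong to a cancelled subscription."""
        return event["value"].get("subscriptionId") not in self._cancelled


prefix = "org.example/my-thing/things/twin"
late_filter = LateEventFilter()
cancel_cmd = late_filter.cancel(prefix, "sub-1")

late_next = {"topic": f"{prefix}/streaming/next", "path": "/",
             "value": {"subscriptionId": "sub-1"}}
other_next = {"topic": f"{prefix}/streaming/next", "path": "/",
              "value": {"subscriptionId": "sub-2"}}
print(late_filter.accept(late_next), late_filter.accept(other_next))
```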
   

Events (Ditto to client)

Created

Ditto sends a created event in response to every subscribeForPersistedEvents command. It contains the subscription ID.

| Field | Value |
| --- | --- |
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/created |
| path | / |
| value | JSON object with the subscription ID. |
   

Next

Each next event contains one historical event item. Ditto sends at most as many items as you requested.

| Field | Value |
| --- | --- |
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/next |
| path | / |
| value | JSON object with one result item. |
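
Since Ditto never sends more items than you requested, the client decides when to ask for more. One simple policy, sketched here with a hypothetical DemandTracker class, is to request a fixed batch and ask again only after that batch has fully arrived. The batch size of 2 is an arbitrary example value.

```python
class DemandTracker:
    """Counts items requested but not yet received (hypothetical helper)."""

    def __init__(self, batch_size):
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        self.batch_size = batch_size
        self.outstanding = 0

    def next_demand(self):
        """Return the demand for a new request command, or 0 if items are pending."""
        if self.outstanding > 0:
            return 0
        self.outstanding = self.batch_size
        return self.batch_size

    def on_next(self):
        """Account for one received next event."""
        if self.outstanding == 0:
            raise RuntimeError("next event arrived without outstanding demand")
        self.outstanding -= 1


tracker = DemandTracker(batch_size=2)
demands = [tracker.next_demand()]      # first batch is requested
tracker.on_next()
demands.append(tracker.next_demand())  # one item still pending
tracker.on_next()
demands.append(tracker.next_demand())  # batch consumed, request again
print(demands)
```

Requesting only when the previous batch is consumed keeps memory use bounded on the client, at the cost of one round trip per batch.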
   

Complete

Ditto sends complete when all items have been delivered.

| Field | Value |
| --- | --- |
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/complete |
| path | / |
| value | JSON object identifying the subscription. |
   

Failed

Ditto sends failed on internal errors or reactive-streams specification violations. You cannot request more items after receiving failed.

| Field | Value |
| --- | --- |
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/failed |
| path | / |
| value | JSON object with the failure reason. |
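
Both complete and failed end the stream, so a client can dispatch on the last topic segment and treat either one as terminal. The sketch below assumes events arrive as already-parsed dictionaries with topic, path, and value keys. The handler table, the function name, and the payload keys in the sample events (subscriptionId, item, error) are assumptions for illustration.

```python
EVENT_ACTIONS = ("created", "next", "complete", "failed")
TERMINAL_ACTIONS = ("complete", "failed")


def handle_event(event, handlers):
    """Dispatch one event by its action; return True if the stream has ended."""
    # The action is the last segment of the topic.
    action = event["topic"].rsplit("/", 1)[-1]
    if action not in EVENT_ACTIONS:
        raise ValueError(f"not a streaming event: {event['topic']}")
    handlers[action](event["value"])
    return action in TERMINAL_ACTIONS


seen = []
handlers = {action: (lambda value, a=action: seen.append((a, value)))
            for action in EVENT_ACTIONS}

prefix = "org.example/my-thing/things/twin/streaming"
# Sample payloads; their keys are assumed, not taken from this page.
events = [
    {"topic": f"{prefix}/created", "path": "/", "value": {"subscriptionId": "sub-1"}},
    {"topic": f"{prefix}/next", "path": "/", "value": {"subscriptionId": "sub-1", "item": {}}},
    {"topic": f"{prefix}/failed", "path": "/", "value": {"subscriptionId": "sub-1", "error": {}}},
]

ended = False
for event in events:
    ended = handle_event(event, handlers)
    if ended:
        break  # no further request commands are possible
print([action for action, _ in seen], ended)
```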
   

Further reading