The streaming subscription protocol lets you stream historical persisted events for Things and Policies using a reactive-streams-based flow.
Send a subscribeForPersistedEvents command to start streaming historical events, use request to pull items, and cancel to stop early. Ditto responds with created, next, complete, or failed events.
Overview
The history capabilities of the Ditto Protocol consist of 3 commands and 4 events that implement the reactive-streams protocol over any duplex transport layer. Ditto acts as the publisher of historical events, and you act as the subscriber. You control the delivery pace and can cancel before all results arrive.
Connections support
While connections do not inherently expose a duplex transport layer, the streaming subscription protocol works with them too. Send commands via any connection source, and receive events at the source’s reply-target.
How it works
Signal mapping
Each command or event corresponds to a reactive-streams signal. A subscription ID links all messages of one streaming request.
| Reactive-streams signal | Streaming protocol topic | Type | Direction |
|---|---|---|---|
| Publisher#subscribe | <ns>/<entity>/<group>/<channel>/streaming/subscribeForPersistedEvents | Command | Client to Ditto |
| Subscription#request | <ns>/<entity>/<group>/<channel>/streaming/request | Command | Client to Ditto |
| Subscription#cancel | <ns>/<entity>/<group>/<channel>/streaming/cancel | Command | Client to Ditto |
| Subscriber#onSubscribe | <ns>/<entity>/<group>/<channel>/streaming/created | Event | Ditto to Client |
| Subscriber#onNext | <ns>/<entity>/<group>/<channel>/streaming/next | Event | Ditto to Client |
| Subscriber#onComplete | <ns>/<entity>/<group>/<channel>/streaming/complete | Event | Ditto to Client |
| Subscriber#onError | <ns>/<entity>/<group>/<channel>/streaming/failed | Event | Ditto to Client |
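As an illustration of the mapping in the table above, the following minimal Python sketch builds the topic for a given reactive-streams signal. The function names and the example values (`org.example`, `my-thing`, `things`, `twin`) are assumptions chosen for illustration, not part of the protocol definition.

```python
# Mapping from reactive-streams signal to the last topic segment,
# copied from the signal mapping table.
SIGNAL_TO_ACTION = {
    "Publisher#subscribe": "subscribeForPersistedEvents",
    "Subscription#request": "request",
    "Subscription#cancel": "cancel",
    "Subscriber#onSubscribe": "created",
    "Subscriber#onNext": "next",
    "Subscriber#onComplete": "complete",
    "Subscriber#onError": "failed",
}

# The three actions that are commands; everything else is an event.
COMMAND_ACTIONS = {"subscribeForPersistedEvents", "request", "cancel"}


def streaming_topic(namespace, entity, group, channel, signal):
    """Return the streaming topic for a reactive-streams signal (hypothetical helper)."""
    action = SIGNAL_TO_ACTION[signal]
    return f"{namespace}/{entity}/{group}/{channel}/streaming/{action}"


def direction(signal):
    """Return who sends the message for a given signal (hypothetical helper)."""
    if SIGNAL_TO_ACTION[signal] in COMMAND_ACTIONS:
        return "Client to Ditto"
    return "Ditto to Client"


# Example values are placeholders, not taken from the documentation.
topic = streaming_topic("org.example", "my-thing", "things", "twin", "Subscription#request")
```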
Interaction pattern
Client sends (in order):
subscribe request* cancel?
Ditto responds with:
created next* (complete | failed)?
- You send a subscribeForPersistedEvents command. Ditto responds with a created event containing the subscription ID.
- You send request commands specifying how many items you want. Ditto sends next events with results.
- When all results are delivered, Ditto sends complete. On error, Ditto sends failed.
- You can send cancel at any time to stop receiving results.
Ditto guarantees that no complete or failed event arrives before your first request command. This simplifies concurrency handling on multi-threaded clients.
After sending cancel, you may still receive buffered next, complete, or failed events.
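The two message orderings above read like regular expressions, so one way to check a recorded trace in a test is to match it against them. This is a sketch only: the helper names are hypothetical, and `subscribe` is used as shorthand for subscribeForPersistedEvents, as in the pattern itself. It validates each direction on its own and does not model the interleaving of both directions.

```python
import re

# Client side: subscribe request* cancel?
CLIENT_PATTERN = re.compile(r"subscribe( request)*( cancel)?")
# Ditto side: created next* (complete | failed)?
DITTO_PATTERN = re.compile(r"created( next)*( (complete|failed))?")


def is_valid_client_sequence(signals):
    """Check a list of client command names against the client pattern."""
    return CLIENT_PATTERN.fullmatch(" ".join(signals)) is not None


def is_valid_ditto_sequence(signals):
    """Check a list of Ditto event names against the Ditto pattern."""
    return DITTO_PATTERN.fullmatch(" ".join(signals)) is not None
```

For example, `["subscribe", "request", "request", "cancel"]` is accepted for the client side, while `["created", "complete", "next"]` is rejected for the Ditto side because nothing may follow complete.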
Commands (client to Ditto)
Subscribe for persisted events
Start streaming historical events. Ditto always responds with a created event.
| Field | Value |
|---|---|
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/subscribeForPersistedEvents |
| path | / |
| value | JSON object specifying start/stop options. |
Specify where to start and stop via the value field:
- fromHistoricalRevision / toHistoricalRevision – revision-based range
- fromHistoricalTimestamp / toHistoricalTimestamp – timestamp-based range
If you provide no options, Ditto streams the complete available history.
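A minimal sketch of assembling such a command in Python follows. It shows only the three fields from the table above; any further envelope fields are left out. The helper name, the example identifiers, and the use of an empty JSON object when no options are given are assumptions made for illustration.

```python
import json

# Option names as listed above.
ALLOWED_OPTIONS = {
    "fromHistoricalRevision",
    "toHistoricalRevision",
    "fromHistoricalTimestamp",
    "toHistoricalTimestamp",
}


def build_subscribe_command(namespace, entity_name, group, channel, **options):
    """Build a subscribeForPersistedEvents command (hypothetical helper)."""
    unknown = set(options) - ALLOWED_OPTIONS
    if unknown:
        raise ValueError(f"unsupported options: {sorted(unknown)}")
    return {
        "topic": f"{namespace}/{entity_name}/{group}/{channel}"
                 "/streaming/subscribeForPersistedEvents",
        "path": "/",
        # Assumption: an empty object stands for "no options".
        "value": dict(options),
    }


# Placeholder identifiers; revisions 1 to 10 are an arbitrary example range.
command = build_subscribe_command(
    "org.example", "my-thing", "things", "twin",
    fromHistoricalRevision=1, toHistoricalRevision=10,
)
payload = json.dumps(command)  # ready to send over the transport of your choice
```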
Request
After receiving a subscription ID from the created event, send request commands to tell Ditto how many items you want.
| Field | Value |
|---|---|
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/request |
| path | / |
| value | JSON object specifying the demand. |
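The sketch below builds a request command. The table above only says that the value is a JSON object specifying the demand; the field names `subscriptionId` and `demand` used here are assumptions, as are the helper name and the example identifiers, so check them against the payload your Ditto version expects. The positive-demand check reflects the reactive-streams rule that a subscriber must request more than zero items.

```python
def build_request_command(namespace, entity_name, group, channel,
                          subscription_id, demand):
    """Build a request command for an existing subscription (hypothetical helper)."""
    # Reactive-streams requires a strictly positive demand.
    if isinstance(demand, bool) or not isinstance(demand, int) or demand <= 0:
        raise ValueError("demand must be a positive integer")
    return {
        "topic": f"{namespace}/{entity_name}/{group}/{channel}/streaming/request",
        "path": "/",
        # Assumed field names; the source only describes the value informally.
        "value": {"subscriptionId": subscription_id, "demand": demand},
    }


# "sub-1" stands in for the ID received in the created event.
request = build_request_command("org.example", "my-thing", "things", "twin", "sub-1", 5)
```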
Cancel
Send a cancel command to stop receiving items. Items already in flight may still arrive.
| Field | Value |
|---|---|
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/cancel |
| path | / |
| value | JSON object identifying the subscription. |
Events (Ditto to client)
Created
Ditto sends a created event in response to every subscribeForPersistedEvents command. It contains the subscription ID.
| Field | Value |
|---|---|
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/created |
| path | / |
| value | JSON object with the subscription ID. |
Next
Each next event contains one historical event item. Ditto sends at most as many items as you requested.
| Field | Value |
|---|---|
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/next |
| path | / |
| value | JSON object with one result item. |
Complete
Ditto sends complete when all items have been delivered.
| Field | Value |
|---|---|
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/complete |
| path | / |
| value | JSON object identifying the subscription. |
Failed
Ditto sends failed on internal errors or reactive-streams specification violations. You cannot request more items after receiving failed.
| Field | Value |
|---|---|
| topic | <namespace>/<entityName>/<group>/<channel>/streaming/failed |
| path | / |
| value | JSON object with the failure reason. |
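To tie the four events together, here is a hedged sketch of a client-side handler that dispatches on the last topic segment and keeps track of outstanding demand. The class, its attributes, the batching strategy, and the `subscriptionId` field name are assumptions for illustration; the values of next and failed events are stored as received, without assuming their inner structure.

```python
class HistorySubscriber:
    """Tracks one streaming subscription on the client side (hypothetical class)."""

    def __init__(self, batch_size=2):
        self.batch_size = batch_size
        self.subscription_id = None
        self.outstanding = 0      # items requested but not yet received
        self.items = []           # raw values of received next events
        self.error = None         # raw value of a failed event, if any
        self.state = "pending"    # pending -> active -> completed | failed

    def _request(self):
        self.outstanding += self.batch_size
        return self.batch_size

    def on_event(self, envelope):
        """Handle one event; return the demand to request next (0 means none)."""
        action = envelope["topic"].rsplit("/", 1)[-1]
        value = envelope.get("value", {})
        if action == "created":
            self.subscription_id = value["subscriptionId"]  # assumed field name
            self.state = "active"
            return self._request()
        if action == "next":
            self.items.append(value)
            self.outstanding -= 1
            # Ask for another batch once the previous one is fully delivered.
            if self.outstanding == 0 and self.state == "active":
                return self._request()
            return 0
        if action == "complete":
            self.state = "completed"
            return 0
        if action == "failed":
            self.state = "failed"
            self.error = value
            return 0
        raise ValueError(f"unexpected streaming action: {action}")
```

The caller is expected to turn each non-zero return value into a request command for `subscription_id`. Requesting in small batches keeps memory use bounded on the client; a larger `batch_size` trades that for fewer round trips.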
Further reading
- History – history concepts, SSE streaming, and configuration
- Things specification – Thing commands reference
- Search protocol – another reactive-streams protocol for searching