A Java client SDK for interacting with digital twins provided by an Eclipse Ditto backend.

Features

  • Digital twin management: CRUD (create, read, update, delete) of Ditto things
  • Change notifications: consume notifications whenever a “watched” digital twin is modified
  • Messages: send messages to and receive messages from devices connected via a digital twin
  • Live channel: react to commands that are directed at devices and target their “live” state

Communication channel

The Ditto Java client communicates with an Eclipse Ditto backend over Ditto’s WebSocket endpoint, sending and receiving messages in Ditto Protocol.
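
To give a feel for what travels over the WebSocket, the following sketch derives the topic of a Ditto Protocol message from a thing ID. It assumes the topic layout namespace/name/group/channel/criterion/action described in the Ditto Protocol specification. The class DittoTopicSketch and its method are hypothetical helpers for illustration only; they are not part of the client, which builds these messages for you.

```java
public final class DittoTopicSketch {

    private DittoTopicSketch() {}

    /**
     * Builds the topic of a Ditto Protocol message for a thing.
     * Hypothetical helper, not part of the Ditto client API.
     * Simplification: expects a non-empty namespace and name.
     */
    public static String thingTopic(String thingId, String channel, String criterion, String action) {
        int separator = thingId.indexOf(':');
        if (separator <= 0 || separator == thingId.length() - 1) {
            throw new IllegalArgumentException("Expected <namespace>:<name> but got: " + thingId);
        }
        String namespace = thingId.substring(0, separator);
        String name = thingId.substring(separator + 1);
        // "things" is the group segment for messages that concern things
        return String.join("/", namespace, name, "things", channel, criterion, action);
    }

    public static void main(String[] args) {
        // prints "org.eclipse.ditto/new-thing/things/twin/commands/modify"
        System.out.println(thingTopic("org.eclipse.ditto:new-thing", "twin", "commands", "modify"));
    }
}
```

In a full Ditto Protocol message, this topic is accompanied by headers, a path such as /attributes/counter, and an optional value.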

Usage

Maven coordinates:

<dependency>
   <groupId>org.eclipse.ditto</groupId>
   <artifactId>ditto-client</artifactId>
   <version>${ditto-client.version}</version>
</dependency>

Instantiate & configure a new Ditto client

To configure your Ditto client instance, use the org.eclipse.ditto.client.configuration package to

  • create instances of AuthenticationProvider and MessagingProvider
  • create a DittoClient instance

For example:

ProxyConfiguration proxyConfiguration =
    ProxyConfiguration.newBuilder()
        .proxyHost("localhost")
        .proxyPort(3128)
        .build();

AuthenticationProvider authenticationProvider =
    AuthenticationProviders.clientCredentials(ClientCredentialsAuthenticationConfiguration.newBuilder()
        .clientId("my-oauth-client-id")
        .clientSecret("my-oauth-client-secret")
        .scopes("offline_access email")
        .tokenEndpoint("https://my-oauth-provider/oauth/token")
        // optionally configure a proxy server
        .proxyConfiguration(proxyConfiguration)
        .build());

MessagingProvider messagingProvider =
    MessagingProviders.webSocket(WebSocketMessagingConfiguration.newBuilder()
        .endpoint("wss://ditto.eclipse.org")
        .jsonSchemaVersion(JsonSchemaVersion.V_2)
        // optionally configure a proxy server or a truststore containing the trusted CAs for SSL connection establishment
        .proxyConfiguration(proxyConfiguration)
        .trustStoreConfiguration(TrustStoreConfiguration.newBuilder()
            .location(TRUSTSTORE_LOCATION)
            .password(TRUSTSTORE_PASSWORD)
            .build())
        .build(), authenticationProvider);

DittoClient client = DittoClients.newInstance(messagingProvider);

Use the Ditto client

Manage twins

client.twin().create("org.eclipse.ditto:new-thing").handle((createdThing, throwable) -> {
    if (createdThing != null) {
        System.out.println("Created new thing: " + createdThing);
    } else {
        System.out.println("Thing could not be created due to: " + throwable.getMessage());
    }
    return client.twin().forId("org.eclipse.ditto:new-thing")
        .putAttribute("first-updated-at", OffsetDateTime.now().toString());
}).get(); // this will block the thread! work asynchronously whenever possible!
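
Note that the future returned from inside handle above is not awaited by get(). One way to chain the follow-up write without blocking is thenCompose from java.util.concurrent.CompletableFuture. The sketch below uses two stand-in methods, createThing and putAttribute, which only simulate the asynchronous client calls against an in-memory map. They are assumptions for illustration and not Ditto API.

```java
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public final class AsyncChainSketch {

    // In-memory stand-in for the backend; purely hypothetical.
    private static final Map<String, Map<String, String>> STORE = new ConcurrentHashMap<>();

    private AsyncChainSketch() {}

    // Stand-in for client.twin().create(id): completes with the created ID.
    public static CompletableFuture<String> createThing(String thingId) {
        return CompletableFuture.supplyAsync(() -> {
            if (STORE.putIfAbsent(thingId, new ConcurrentHashMap<>()) != null) {
                throw new IllegalStateException("Thing already exists: " + thingId);
            }
            return thingId;
        });
    }

    // Stand-in for client.twin().forId(id).putAttribute(key, value).
    public static CompletableFuture<Void> putAttribute(String thingId, String key, String value) {
        return CompletableFuture.runAsync(() -> STORE.get(thingId).put(key, value));
    }

    // Creates the thing, then sets an attribute; the result is one flat future.
    public static CompletableFuture<String> createAndTag(String thingId) {
        return createThing(thingId)
                .thenCompose(id -> putAttribute(id, "first-updated-at", OffsetDateTime.now().toString())
                        .thenApply(ignored -> "Created and tagged " + id))
                .exceptionally(error -> "Failed: " + rootMessage(error));
    }

    // Dependent stages wrap failures in a CompletionException; unwrap one level.
    private static String rootMessage(Throwable error) {
        Throwable cause = error.getCause() != null ? error.getCause() : error;
        return cause.getMessage();
    }

    public static void main(String[] args) {
        // join() is used here only so that the demo JVM does not exit early.
        System.out.println(createAndTag("org.eclipse.ditto:new-thing").join());
        System.out.println(createAndTag("org.eclipse.ditto:new-thing").join());
    }
}
```

The second call fails because the thing already exists, and the failure surfaces through exceptionally rather than through a blocking get().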

Subscribe for change notifications

To subscribe to the events that Ditto emits after a twin was modified, start consumption on the twin channel:

client.twin().startConsumption().get();
System.out.println("Subscribed for Twin events");
client.twin().registerForThingChanges("my-changes", change -> {
   if (change.getAction() == ChangeAction.UPDATED) {
       System.out.println("An existing Thing was modified: " + change.getThing());
       // perform custom actions ..
   }
});

Events can also be filtered on the server side, so that only matching events are delivered to the client:

client.twin().startConsumption(
   Options.Consumption.filter("gt(features/temperature/properties/value,23.0)")
).get();
System.out.println("Subscribed for Twin events");
client.twin().registerForFeaturePropertyChanges("my-feature-changes", "temperature", "value", change -> {
   // perform custom actions ..
});

Subscribe to enriched change notifications

Available since Ditto 1.1.0

To use enrichment in the Ditto Java client, pass the extra fields to be included to the startConsumption() call:

client.twin().startConsumption(
   Options.Consumption.extraFields(JsonFieldSelector.newInstance("attributes/location"))
).get();
client.twin().registerForThingChanges("my-enriched-changes", change -> {
   Optional<JsonObject> extra = change.getExtra();
   // perform custom actions, making use of the 'extra' data ..
});

When combined with a filter, the extra fields can also be referenced in the filter expression:

client.twin().startConsumption(
   Options.Consumption.extraFields(JsonFieldSelector.newInstance("attributes/location")),
   Options.Consumption.filter("eq(attributes/location,\"kitchen\")")
).get();
// register the callbacks...

Send/receive messages

Register for receiving messages with the subject hello.world on any thing:

client.live().registerForMessage("globalMessageHandler", "hello.world", message -> {
   System.out.println("Received Message with subject " +  message.getSubject());
   message.reply()
      .statusCode(HttpStatusCode.IM_A_TEAPOT)
      .payload("Hello, I'm just a Teapot!")
      .send();
});

Send a message with the subject hello.world to the thing with ID org.eclipse.ditto:new-thing:

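client.live().forId("org.eclipse.ditto:new-thing")
   .message()
   .to() // the message is addressed to the thing
   .subject("hello.world")
   .payload("I am a Teapot")
   .send(String.class, (response, throwable) -> {
      if (throwable != null) {
         System.out.println("Sending failed: " + throwable.getMessage());
      } else {
         System.out.println("Got response: " + response.getPayload().orElse(null));
      }
   });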

Manage policies

Available since Ditto 1.1.0

Read a policy:

Policy retrievedPolicy = client.policies().retrieve(PolicyId.of("org.eclipse.ditto:new-policy"))
   .get(); // this will block the thread! work asynchronously whenever possible!

Create a policy:

Policy newPolicy = Policy.newBuilder(PolicyId.of("org.eclipse.ditto:new-policy"))
   .forLabel("DEFAULT")
   .setSubject(Subject.newInstance(SubjectIssuer.newInstance("nginx"), "ditto"))
   .setGrantedPermissions(PoliciesResourceType.policyResource("/"), "READ", "WRITE")
   .setGrantedPermissions(PoliciesResourceType.thingResource("/"), "READ", "WRITE")
   .build();

client.policies().create(newPolicy)
   .get(); // this will block the thread! work asynchronously whenever possible!

Policies can also be updated and deleted via the Java client API; refer to the API and its JavaDoc for details.

Search for things

Available since Ditto 1.1.0

Search for things using the Java 8 java.util.stream.Stream API:

client.twin().search()
   .stream(queryBuilder -> queryBuilder.namespace("org.eclipse.ditto")
      .filter("eq(attributes/location,'kitchen')") // apply RQL expression here
      .options(builder -> builder.sort(s -> s.desc("thingId")).size(1))
   )
   .forEach(foundThing -> System.out.println("Found thing: " + foundThing));

The filter is an RQL expression that restricts which things the search returns.
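
Because RQL filters are plain strings, a small builder can help avoid quoting mistakes. The following is a minimal sketch: the class RqlSketch and its methods are hypothetical and not part of the Ditto client. The and operator does not appear in the examples above and is assumed from Ditto's RQL documentation. Escaping of quotes inside values is not covered, so such values are rejected.

```java
public final class RqlSketch {

    private RqlSketch() {}

    // eq(<path>,"<text>") with the string value wrapped in double quotes.
    public static String eq(String path, String text) {
        return "eq(" + path + "," + quote(text) + ")";
    }

    // gt(<path>,<number>) for numeric comparisons.
    public static String gt(String path, Number value) {
        return "gt(" + path + "," + value + ")";
    }

    // and(<expr1>,<expr2>,...) combines two or more expressions.
    public static String and(String... expressions) {
        if (expressions.length < 2) {
            throw new IllegalArgumentException("and() needs at least two expressions");
        }
        return "and(" + String.join(",", expressions) + ")";
    }

    // Rejects values containing quotes or backslashes instead of guessing escape rules.
    private static String quote(String text) {
        if (text.contains("\"") || text.contains("\\")) {
            throw new IllegalArgumentException("Unsupported character in value: " + text);
        }
        return "\"" + text + "\"";
    }

    public static void main(String[] args) {
        // prints: and(eq(attributes/location,"kitchen"),gt(features/temperature/properties/value,23.0))
        System.out.println(and(
                eq("attributes/location", "kitchen"),
                gt("features/temperature/properties/value", 23.0)));
    }
}
```

The resulting string can be passed wherever the examples in this document use a filter, for instance to filter(...) of the search query builder.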

Search for things using the reactive streams org.reactivestreams.Publisher API:

Publisher<List<Thing>> publisher = client.twin().search()
   .publisher(queryBuilder -> queryBuilder.namespace("org.eclipse.ditto")
      .filter("eq(attributes/location,'kitchen')") // apply RQL expression here
      .options(builder -> builder.sort(s -> s.desc("thingId")).size(1))
   );
// integrate the publisher in the reactive streams library of your choice, e.g. Akka streams:
akka.stream.javadsl.Source<Thing, NotUsed> things = akka.stream.javadsl.Source.fromPublisher(publisher)
   .flatMapConcat(Source::from);
// .. proceed working with the Akka Source ..

Request and issue acknowledgements

Available since Ditto 1.1.0

To request acknowledgements with the Ditto Java client, set them in the DittoHeaders of a command:

DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
   .acknowledgementRequest(
      AcknowledgementRequest.of(DittoAcknowledgementLabel.PERSISTED),
      AcknowledgementRequest.of(AcknowledgementLabel.of("my-custom-ack"))
   )
   .timeout("5s")
   .build();

client.twin().forId(ThingId.of("org.eclipse.ditto:my-thing"))
   .putAttribute("counter", 42, Options.dittoHeaders(dittoHeaders))
   .whenComplete((aVoid, throwable) -> {
      if (throwable instanceof AcknowledgementsFailedException) {
         Acknowledgements acknowledgements = ((AcknowledgementsFailedException) throwable).getAcknowledgements();
         System.out.println("Acknowledgements could not be fulfilled: " + acknowledgements);
      }
   });

To issue a requested acknowledgement, handle the acknowledgement request inside the Change callback that is invoked with the change notification:

client.twin().registerForThingChanges("REG1", change -> {
   change.handleAcknowledgementRequest(AcknowledgementLabel.of("my-custom-ack"), ackHandle ->
      ackHandle.acknowledge(HttpStatusCode.NOT_FOUND, JsonObject.newBuilder()
         .set("error-detail", "Could not be found")
         .build()
      )
   );
});

Further examples

For further examples of how to use the Ditto client, see the class DittoClientUsageExamples, which is configured to connect to the Ditto sandbox.
