Paho C++  1.0
The Paho MQTT C++ Client Library
 All Classes Files Functions Variables Typedefs Friends
async_client.h
Go to the documentation of this file.
1 
8 /*******************************************************************************
9  * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
10  *
11  * All rights reserved. This program and the accompanying materials
12  * are made available under the terms of the Eclipse Public License v1.0
13  * and Eclipse Distribution License v1.0 which accompany this distribution.
14  *
15  * The Eclipse Public License is available at
16  * http://www.eclipse.org/legal/epl-v10.html
17  * and the Eclipse Distribution License is available at
18  * http://www.eclipse.org/org/documents/edl-v10.php.
19  *
20  * Contributors:
21  * Frank Pagliughi - initial implementation and documentation
22  *******************************************************************************/
23 
24 #ifndef __mqtt_async_client_h
25 #define __mqtt_async_client_h
26 
27 #include "MQTTAsync.h"
28 #include "mqtt/types.h"
29 #include "mqtt/token.h"
30 #include "mqtt/string_collection.h"
31 #include "mqtt/delivery_token.h"
32 #include "mqtt/iclient_persistence.h"
33 #include "mqtt/iaction_listener.h"
34 #include "mqtt/exception.h"
35 #include "mqtt/message.h"
36 #include "mqtt/callback.h"
37 #include "mqtt/thread_queue.h"
38 #include "mqtt/iasync_client.h"
39 #include <vector>
40 #include <list>
41 #include <memory>
42 #include <tuple>
43 #include <stdexcept>
44 
45 namespace mqtt {
46 
48 const uint32_t VERSION = 0x00090000;
50 const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 0.9");
52 const string COPYRIGHT("Copyright (c) 2013-2017 Frank Pagliughi");
53 
55 
60 class async_client : public virtual iasync_client
61 {
62 public:
64  using ptr_t = std::shared_ptr<async_client>;
66  using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
67 
68 private:
70  using guard = std::unique_lock<std::mutex>;
72  using unique_lock = std::unique_lock<std::mutex>;
73 
75  mutable std::mutex lock_;
77  MQTTAsync cli_;
79  string serverURI_;
81  string clientId_;
83  std::unique_ptr<MQTTClient_persistence> persist_;
85  callback* userCallback_;
87  token_ptr connTok_;
89  std::list<token_ptr> pendingTokens_;
91  std::list<delivery_token_ptr> pendingDeliveryTokens_;
93  consumer_queue_type que_;
94 
96  static void on_connected(void* context, char* cause);
97  static void on_connection_lost(void *context, char *cause);
98  static int on_message_arrived(void* context, char* topicName, int topicLen,
99  MQTTAsync_message* msg);
100  static void on_delivery_complete(void* context, MQTTAsync_token tok);
101 
103  friend class token;
104  virtual void add_token(token_ptr tok);
105  virtual void add_token(delivery_token_ptr tok);
106  virtual void remove_token(token* tok) override;
107  virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
108  void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
109 
111  async_client() =delete;
112  async_client(const async_client&) =delete;
113  async_client& operator=(const async_client&) =delete;
114 
115 public:
126  async_client(const string& serverURI, const string& clientId,
127  const string& persistDir);
140  async_client(const string& serverURI, const string& clientId,
141  iclient_persistence* persistence=nullptr);
154  async_client(const string& serverURI, const string& clientId,
155  int maxBufferedMessages, const string& persistDir);
170  async_client(const string& serverURI, const string& clientId,
171  int maxBufferedMessages, iclient_persistence* persistence=nullptr);
175  ~async_client() override;
183  token_ptr connect() override;
193  token_ptr connect(connect_options options) override;
207  token_ptr connect(connect_options options, void* userContext,
208  iaction_listener& cb) override;
220  token_ptr connect(void* userContext, iaction_listener& cb) override {
221  return connect(connect_options{}, userContext, cb);
222  }
228  token_ptr reconnect() override;
235  token_ptr disconnect() override { return disconnect(disconnect_options()); }
243  token_ptr disconnect(disconnect_options opts) override;
254  token_ptr disconnect(int timeout) override;
265  template <class Rep, class Period>
266  token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
267  // TODO: check range
268  return disconnect((int) to_milliseconds_count(timeout));
269  }
284  token_ptr disconnect(int timeout, void* userContext,
285  iaction_listener& cb) override;
300  template <class Rep, class Period>
301  token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout,
302  void* userContext, iaction_listener& cb) {
303  // TODO: check range
304  return disconnect((int) to_milliseconds_count(timeout), userContext, cb);
305  }
317  token_ptr disconnect(void* userContext, iaction_listener& cb) override {
318  return disconnect(0L, userContext, cb);
319  }
324  delivery_token_ptr get_pending_delivery_token(int msgID) const override;
329  std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
334  string get_client_id() const override { return clientId_; }
339  string get_server_uri() const override { return serverURI_; }
344  bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
357  delivery_token_ptr publish(string_ref topic, const void* payload, size_t n,
358  int qos, bool retained) override;
367  delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
368  return publish(std::move(topic), payload, n,
369  message::DFLT_QOS, message::DFLT_RETAINED);
370  }
382  delivery_token_ptr publish(string_ref topic, binary_ref payload,
383  int qos, bool retained) override;
391  delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
392  return publish(std::move(topic), std::move(payload),
393  message::DFLT_QOS, message::DFLT_RETAINED);
394  }
410  delivery_token_ptr publish(string_ref topic,
411  const void* payload, size_t n,
412  int qos, bool retained,
413  void* userContext, iaction_listener& cb) override;
422  delivery_token_ptr publish(const_message_ptr msg) override;
434  delivery_token_ptr publish(const_message_ptr msg,
435  void* userContext, iaction_listener& cb) override;
442  void set_callback(callback& cb) override;
448  void disable_callbacks() override;
460  token_ptr subscribe(const_string_collection_ptr topicFilters,
461  const qos_collection& qos) override;
476  token_ptr subscribe(const_string_collection_ptr topicFilters,
477  const qos_collection& qos,
478  void* userContext, iaction_listener& cb) override;
488  token_ptr subscribe(const string& topicFilter, int qos) override;
504  token_ptr subscribe(const string& topicFilter, int qos,
505  void* userContext, iaction_listener& cb) override;
513  token_ptr unsubscribe(const string& topicFilter) override;
522  token_ptr unsubscribe(const_string_collection_ptr topicFilters) override;
533  token_ptr unsubscribe(const_string_collection_ptr topicFilters,
534  void* userContext, iaction_listener& cb) override;
546  token_ptr unsubscribe(const string& topicFilter,
547  void* userContext, iaction_listener& cb) override;
548 
554  void start_consuming();
560  void stop_consuming();
566  const_message_ptr consume_message() { return que_->get(); }
573  bool try_consume_message(const_message_ptr* msg) {
574  return que_->try_get(msg);
575  }
583  template <typename Rep, class Period>
584  bool try_consume_message_for(const_message_ptr* msg,
585  const std::chrono::duration<Rep, Period>& relTime) {
586  return que_->try_get_for(msg, relTime);
587  }
595  template <class Clock, class Duration>
596  bool try_consume_message_until(const_message_ptr* msg,
597  const std::chrono::time_point<Clock,Duration>& absTime) {
598  return que_->try_get_until(msg, absTime);
599  }
600 
601 };
602 
604 using async_client_ptr = async_client::ptr_t;
605 
607 // end namespace mqtt
608 }
609 
610 #endif // __mqtt_async_client_h
611 
token_ptr disconnect(void *userContext, iaction_listener &cb) override
Disconnects from the server.
Definition: async_client.h:317
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
Returns the delivery tokens for any outstanding publish operations.
Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation to run in the background.
Definition: async_client.h:60
string get_server_uri() const override
Returns the address of the server used by this client.
Definition: async_client.h:339
std::shared_ptr< async_client > ptr_t
Smart/shared pointer for an object of this class.
Definition: async_client.h:64
Basic types and type conversions for the Paho MQTT C++ library.
token_ptr reconnect() override
Reconnects the client using options from the previous connect.
Provides a mechanism for tracking the completion of an asynchronous action.
Definition: iaction_listener.h:48
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition: async_client.h:220
const_message_ptr consume_message()
Read the next message from the queue.
Definition: async_client.h:566
Implementation of the template class 'thread_queue', a thread-safe, blocking queue for passing data between threads.
delivery_token_ptr get_pending_delivery_token(int msgID) const override
Returns the delivery token for the specified message ID.
bool is_connected() const override
Determines if this client is currently connected to the server.
Definition: async_client.h:344
token_ptr connect() override
Connects to an MQTT server using the default options.
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Disconnects from the server.
Definition: async_client.h:266
Definition of the string_collection class for the Paho MQTT C++ library.
Holds the set of options that control how the client connects to a server.
Definition: connect_options.h:46
Represents a topic destination, used for publish/subscribe messaging.
Definition: topic.h:42
Declaration of MQTT message class.
Declaration of MQTT callback class.
Declaration of MQTT exception class.
void start_consuming()
Start consuming messages.
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained) override
Publishes a message to a topic on the server.
Enables an application to communicate with an MQTT server using non-blocking methods.
Definition: iasync_client.h:57
void disable_callbacks() override
Stops callbacks.
bool try_consume_message(const_message_ptr *msg)
Try to read the next message from the queue without blocking.
Definition: async_client.h:573
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Waits a limited time for a message to arrive.
Definition: async_client.h:584
~async_client() override
Destructor.
void stop_consuming()
Stop consuming messages.
Declaration of MQTT iaction_listener class.
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos) override
Subscribe to multiple topics, each of which may include wildcards.
static constexpr bool DFLT_RETAINED
The default retained flag.
Definition: message.h:58
Provides a mechanism for tracking the completion of an asynchronous action.
Definition: callback.h:41
token_ptr unsubscribe(const string &topicFilter) override
Requests the server unsubscribe the client from a topic.
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Waits until a specific time for a message to arrive.
Definition: async_client.h:596
Declaration of MQTT delivery_token class.
string get_client_id() const override
Returns the client ID used by this client.
Definition: async_client.h:334
Declaration of MQTT iclient_persistence interface.
Provides a mechanism for tracking the completion of an asynchronous action.
Definition: token.h:49
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Publishes a message to a topic on the server.
Definition: async_client.h:391
std::unique_ptr< thread_queue< const_message_ptr >> consumer_queue_type
Type for a thread-safe queue to consume messages synchronously.
Definition: async_client.h:66
std::vector< int > qos_collection
Type for a collection of QOS values.
Definition: iasync_client.h:64
token_ptr disconnect() override
Disconnects from the server.
Definition: async_client.h:235
void set_callback(callback &cb) override
Sets a callback listener to use for events that happen asynchronously.
static constexpr int DFLT_QOS
The default QoS for a message.
Definition: message.h:56
Options for disconnecting from an MQTT broker.
Definition: disconnect_options.h:37
Implementation of the interface for the asynchronous clients, 'iasync_client'.
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Publishes a message to a topic on the server.
Definition: async_client.h:367
Declaration of MQTT token class.
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Disconnects from the server.
Definition: async_client.h:301