24 #ifndef __mqtt_async_client_h
25 #define __mqtt_async_client_h
27 #include "MQTTAsync.h"
/**
 * Numeric version of the library as a 32-bit constant.
 * NOTE(review): 0x00090000 pairs with the "v. 0.9" text in VERSION_STR, so
 * the value presumably packs the version fields into separate bytes --
 * confirm the exact layout before comparing against it.
 */
48 const uint32_t VERSION = 0x00090000;
/** Human-readable library name and version text. */
50 const string VERSION_STR(
"Paho MQTT C++ (mqttpp) v. 0.9");
/** Copyright notice text for the library. */
52 const string COPYRIGHT(
"Copyright (c) 2013-2017 Frank Pagliughi");
/** Smart/shared pointer for an object of this class. */
64 using ptr_t = std::shared_ptr<async_client>;
/**
 * Lock type aliases over std::mutex.
 * Both names refer to the same type, std::unique_lock<std::mutex>.
 */
70 using guard = std::unique_lock<std::mutex>;
72 using unique_lock = std::unique_lock<std::mutex>;
/**
 * Mutex for this object. Declared mutable, so const member functions are
 * able to lock it.
 * NOTE(review): which members it protects is not visible here -- confirm
 * in the implementation file.
 */
75 mutable std::mutex lock_;
/**
 * Uniquely-owned MQTTClient_persistence object.
 * NOTE(review): presumably the C-library persistence structure that wraps a
 * user-supplied iclient_persistence -- confirm in the implementation.
 */
83 std::unique_ptr<MQTTClient_persistence> persist_;
/**
 * List of token pointers held by the client.
 * NOTE(review): presumably tokens for operations that have not yet
 * completed -- confirm.
 */
89 std::list<token_ptr> pendingTokens_;
/**
 * List of delivery-token pointers held by the client.
 * NOTE(review): presumably the tokens for outstanding publish operations
 * -- confirm.
 */
91 std::list<delivery_token_ptr> pendingDeliveryTokens_;
/**
 * Static hook for a "connected" event.
 * @param context Opaque pointer supplied back to the hook.
 *                NOTE(review): presumably the async_client instance --
 *                confirm where the hook is registered.
 * @param cause C string describing the cause.
 */
96 static void on_connected(
void* context,
char* cause);
/**
 * Static hook for a "connection lost" event.
 * @param context Opaque pointer supplied back to the hook.
 *                NOTE(review): presumably the async_client instance --
 *                confirm where the hook is registered.
 * @param cause C string describing the cause.
 */
97 static void on_connection_lost(
void *context,
char *cause);
/**
 * Static hook for an incoming message.
 * @param context Opaque pointer supplied back to the hook.
 *                NOTE(review): presumably the async_client instance --
 *                confirm where the hook is registered.
 * @param topicName Topic name as a C character buffer.
 * @param topicLen Length associated with topicName.
 * @param msg The message, as the C-library MQTTAsync_message type.
 * @return An int status.
 *         NOTE(review): presumably follows the Paho C library's
 *         message-arrived callback convention -- confirm.
 */
98 static int on_message_arrived(
void* context,
char* topicName,
int topicLen,
99 MQTTAsync_message* msg);
/**
 * Static hook for a "delivery complete" event.
 * @param context Opaque pointer supplied back to the hook.
 *                NOTE(review): presumably the async_client instance --
 *                confirm where the hook is registered.
 * @param tok The C-library MQTTAsync_token the event refers to.
 */
100 static void on_delivery_complete(
void* context, MQTTAsync_token tok);
/**
 * Adds a token to the client.
 * NOTE(review): presumably appends to pendingTokens_ -- confirm.
 * @param tok The token to add.
 */
104 virtual void add_token(token_ptr tok);
/**
 * Adds a delivery token to the client.
 * NOTE(review): presumably appends to pendingDeliveryTokens_ -- confirm.
 * @param tok The delivery token to add.
 */
105 virtual void add_token(delivery_token_ptr tok);
/**
 * Removes a token from the client, identified by raw pointer.
 * Overrides a base-class virtual.
 * @param tok The token to remove.
 */
106 virtual void remove_token(
token* tok)
override;
/**
 * Convenience overload: forwards the raw pointer held by the smart
 * pointer to remove_token(token*).
 * @param tok The token to remove.
 */
107 virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
/**
 * Convenience overload for delivery tokens: forwards the raw pointer held
 * by the smart pointer to the raw-pointer overload.
 * @param tok The delivery token to remove.
 */
108 void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
/** Default construction is disabled. */
111 async_client() =
delete;
/** Copy construction is disabled. */
112 async_client(
const async_client&) =
delete;
/** Copy assignment is disabled. */
113 async_client& operator=(
const async_client&) =
delete;
/**
 * Creates a client from a server URI, a client ID and a persistence
 * directory.
 * @param serverURI The address of the server.
 * @param clientId The client ID used by this client.
 * @param persistDir NOTE(review): presumably the directory used for
 *                   file-based persistence -- confirm.
 */
126 async_client(
const string& serverURI,
const string& clientId,
127 const string& persistDir);
/**
 * Creates a client from a server URI, a client ID and an optional
 * persistence object.
 * @param serverURI The address of the server.
 * @param clientId The client ID used by this client.
 * @param persistence Pointer to a persistence implementation; defaults to
 *                    nullptr. NOTE(review): presumably nullptr means no
 *                    user-supplied persistence -- confirm.
 */
140 async_client(
const string& serverURI,
const string& clientId,
141 iclient_persistence* persistence=
nullptr);
/**
 * Creates a client from a server URI, a client ID, a buffered-message
 * limit and a persistence directory.
 * @param serverURI The address of the server.
 * @param clientId The client ID used by this client.
 * @param maxBufferedMessages NOTE(review): presumably the maximum number of
 *                            messages buffered while offline -- confirm.
 * @param persistDir NOTE(review): presumably the directory used for
 *                   file-based persistence -- confirm.
 */
154 async_client(
const string& serverURI,
const string& clientId,
155 int maxBufferedMessages,
const string& persistDir);
/**
 * Creates a client from a server URI, a client ID, a buffered-message
 * limit and an optional persistence object.
 * @param serverURI The address of the server.
 * @param clientId The client ID used by this client.
 * @param maxBufferedMessages NOTE(review): presumably the maximum number of
 *                            messages buffered while offline -- confirm.
 * @param persistence Pointer to a persistence implementation; defaults to
 *                    nullptr.
 */
170 async_client(
const string& serverURI,
const string& clientId,
171 int maxBufferedMessages, iclient_persistence* persistence=
nullptr);
/**
 * Connects to an MQTT server using the supplied options.
 * @param options The options that control how the client connects.
 * @return A token for tracking the completion of the asynchronous action.
 */
193 token_ptr
connect(connect_options options)
override;
/**
 * Connects to an MQTT server using the supplied options, with a listener.
 * @param options The options that control how the client connects.
 * @param userContext Opaque pointer supplied by the caller.
 *                    NOTE(review): presumably passed through to the
 *                    listener -- confirm.
 * @param cb Action listener associated with this operation.
 * @return A token for tracking the completion of the asynchronous action.
 */
207 token_ptr
connect(connect_options options,
void* userContext,
208 iaction_listener& cb)
override;
265 template <
class Rep,
class Period>
266 token_ptr
disconnect(
const std::chrono::duration<Rep, Period>& timeout) {
268 return disconnect((
int) to_milliseconds_count(timeout));
284 token_ptr
disconnect(
int timeout,
void* userContext,
300 template <
class Rep,
class Period>
301 token_ptr
disconnect(
const std::chrono::duration<Rep, Period>& timeout,
304 return disconnect((
int) to_milliseconds_count(timeout), userContext, cb);
/**
 * Determines if this client is currently connected to the server.
 * Queries the C library with MQTTAsync_isConnected() on cli_ and converts
 * the result with to_bool().
 * @return The converted result of the C-library query.
 */
344 bool is_connected()
const override {
return to_bool(MQTTAsync_isConnected(cli_)); }
358 int qos,
bool retained)
override;
368 return publish(std::move(topic), payload, n,
383 int qos,
bool retained)
override;
392 return publish(std::move(topic), std::move(payload),
411 const void* payload,
size_t n,
412 int qos,
bool retained,
/**
 * Publishes a message to a topic on the server.
 * @param msg The message to publish.
 * @return A delivery token for the publish operation.
 */
422 delivery_token_ptr
publish(const_message_ptr msg)
override;
434 delivery_token_ptr
publish(const_message_ptr msg,
460 token_ptr
subscribe(const_string_collection_ptr topicFilters,
476 token_ptr
subscribe(const_string_collection_ptr topicFilters,
/**
 * Subscribes to a single topic filter.
 * @param topicFilter The topic filter to subscribe to.
 * @param qos The QoS value for the subscription.
 * @return A token for tracking the completion of the asynchronous action.
 */
488 token_ptr
subscribe(
const string& topicFilter,
int qos)
override;
504 token_ptr
subscribe(
const string& topicFilter,
int qos,
/**
 * Requests the server unsubscribe the client from a topic.
 * @param topicFilter The topic filter to unsubscribe from.
 * @return A token for tracking the completion of the asynchronous action.
 */
513 token_ptr
unsubscribe(
const string& topicFilter)
override;
/**
 * Requests the server unsubscribe the client from a collection of topic
 * filters.
 * @param topicFilters The collection of topic filters to unsubscribe from.
 * @return A token for tracking the completion of the asynchronous action.
 */
522 token_ptr
unsubscribe(const_string_collection_ptr topicFilters)
override;
533 token_ptr
unsubscribe(const_string_collection_ptr topicFilters,
574 return que_->try_get(msg);
583 template <
typename Rep,
class Period>
585 const std::chrono::duration<Rep, Period>& relTime) {
586 return que_->try_get_for(msg, relTime);
595 template <
class Clock,
class Duration>
597 const std::chrono::time_point<Clock,Duration>& absTime) {
598 return que_->try_get_until(msg, absTime);
610 #endif // __mqtt_async_client_h
token_ptr disconnect(void *userContext, iaction_listener &cb) override
Disconnects from the server.
Definition: async_client.h:317
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
Returns the delivery tokens for any outstanding publish operations.
Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation to run in the background.
Definition: async_client.h:60
string get_server_uri() const override
Returns the address of the server used by this client.
Definition: async_client.h:339
std::shared_ptr< async_client > ptr_t
Smart/shared pointer for an object of this class.
Definition: async_client.h:64
Basic types and type conversions for the Paho MQTT C++ library.
token_ptr reconnect() override
Reconnects the client using options from the previous connect.
Provides a mechanism for tracking the completion of an asynchronous action.
Definition: iaction_listener.h:48
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition: async_client.h:220
const_message_ptr consume_message()
Read the next message from the queue.
Definition: async_client.h:566
Implementation of the template class 'thread_queue', a thread-safe, blocking queue for passing data between threads.
delivery_token_ptr get_pending_delivery_token(int msgID) const override
Returns the delivery token for the specified message ID.
bool is_connected() const override
Determines if this client is currently connected to the server.
Definition: async_client.h:344
token_ptr connect() override
Connects to an MQTT server using the default options.
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Disconnects from the server.
Definition: async_client.h:266
Definition of the string_collection class for the Paho MQTT C++ library.
Holds the set of options that control how the client connects to a server.
Definition: connect_options.h:46
Represents a topic destination, used for publish/subscribe messaging.
Definition: topic.h:42
Declaration of MQTT message class.
Declaration of MQTT callback class.
Declaration of MQTT exception class.
void start_consuming()
Start consuming messages.
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained) override
Publishes a message to a topic on the server.
Enables an application to communicate with an MQTT server using non-blocking methods.
Definition: iasync_client.h:57
void disable_callbacks() override
Stops callbacks.
bool try_consume_message(const_message_ptr *msg)
Try to read the next message from the queue without blocking.
Definition: async_client.h:573
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Waits a limited time for a message to arrive.
Definition: async_client.h:584
~async_client() override
Destructor.
void stop_consuming()
Stop consuming messages.
Declaration of MQTT iaction_listener class.
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos) override
Subscribe to multiple topics, each of which may include wildcards.
static constexpr bool DFLT_RETAINED
The default retained flag.
Definition: message.h:58
Provides a mechanism for tracking the completion of an asynchronous action.
Definition: callback.h:41
token_ptr unsubscribe(const string &topicFilter) override
Requests the server unsubscribe the client from a topic.
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Waits until a specific time for a message to arrive.
Definition: async_client.h:596
Declaration of MQTT delivery_token class.
string get_client_id() const override
Returns the client ID used by this client.
Definition: async_client.h:334
Declaration of MQTT iclient_persistence interface.
Provides a mechanism for tracking the completion of an asynchronous action.
Definition: token.h:49
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Publishes a message to a topic on the server.
Definition: async_client.h:391
std::unique_ptr< thread_queue< const_message_ptr >> consumer_queue_type
Type for a thread-safe queue to consume messages synchronously.
Definition: async_client.h:66
std::vector< int > qos_collection
Type for a collection of QOS values.
Definition: iasync_client.h:64
token_ptr disconnect() override
Disconnects from the server.
Definition: async_client.h:235
void set_callback(callback &cb) override
Sets a callback listener to use for events that happen asynchronously.
static constexpr int DFLT_QOS
The default QoS for a message.
Definition: message.h:56
Options for disconnecting from an MQTT broker.
Definition: disconnect_options.h:37
Implementation of the interface for the asynchronous clients, 'iasync_client'.
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Publishes a message to a topic on the server.
Definition: async_client.h:367
Declaration of MQTT token class.
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Disconnects from the server.
Definition: async_client.h:301