Paho C++  1.0
The Paho MQTT C++ Client Library
 All Classes Files Functions Variables Typedefs Friends
thread_queue.h
Go to the documentation of this file.
1 
9 /*******************************************************************************
10  * Copyright (c) 2017 Frank Pagliughi <fpagliughi@mindspring.com>
11  *
12  * All rights reserved. This program and the accompanying materials
13  * are made available under the terms of the Eclipse Public License v1.0
14  * and Eclipse Distribution License v1.0 which accompany this distribution.
15  *
16  * The Eclipse Public License is available at
17  * http://www.eclipse.org/legal/epl-v10.html
18  * and the Eclipse Distribution License is available at
19  * http://www.eclipse.org/org/documents/edl-v10.php.
20  *
21  * Contributors:
22  * Frank Pagliughi - initial implementation and documentation
23  *******************************************************************************/
24 
25 #ifndef __mqtt_thread_queue_h
26 #define __mqtt_thread_queue_h
27 
28 #include <thread>
29 #include <mutex>
30 #include <condition_variable>
31 #include <limits>
32 #include <deque>
33 #include <queue>
34 
35 namespace mqtt {
36 
38 
66 template <typename T, class Container=std::deque<T>>
68 {
69 public:
71  using container_type = Container;
73  using value_type = T;
75  using size_type = typename Container::size_type;
76 
78  static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();
79 
80 private:
82  mutable std::mutex lock_;
84  std::condition_variable notEmptyCond_;
86  std::condition_variable notFullCond_;
88  size_type cap_;
90  std::queue<T,Container> que_;
91 
93  using guard = std::lock_guard<std::mutex>;
95  using unique_guard = std::unique_lock<std::mutex>;
96 
97 public:
107  explicit thread_queue(size_t cap) : cap_(cap) {}
113  bool empty() const {
114  guard g(lock_);
115  return que_.empty();
116  }
121  size_type capacity() const {
122  guard g(lock_);
123  return cap_;
124  }
131  void capacity(size_type cap) {
132  guard g(lock_);
133  cap_ = cap;
134  }
139  size_type size() const {
140  guard g(lock_);
141  return que_.size();
142  }
149  void put(value_type val) {
150  unique_guard g(lock_);
151  size_type n = que_.size();
152  if (n >= cap_)
153  notFullCond_.wait(g, [=]{return que_.size() < cap_;});
154  que_.emplace(std::move(val));
155  if (n == 0) {
156  g.unlock();
157  notEmptyCond_.notify_one();
158  }
159  }
166  bool try_put(value_type val) {
167  unique_guard g(lock_);
168  size_type n = que_.size();
169  if (n >= cap_)
170  return false;
171  que_.emplace(std::move(val));
172  if (n == 0) {
173  g.unlock();
174  notEmptyCond_.notify_one();
175  }
176  }
186  template <typename Rep, class Period>
187  bool try_put_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
188  unique_guard g(lock_);
189  size_type n = que_.size();
190  if (n >= cap_ && !notFullCond_.wait_for(g, relTime, [=]{return que_.size() < cap_;}))
191  return false;
192  que_.emplace(std::move(val));
193  if (n == 0) {
194  g.unlock();
195  notEmptyCond_.notify_one();
196  }
197  return true;
198  }
209  template <class Clock, class Duration>
210  bool try_put_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
211  unique_guard g(lock_);
212  size_type n = que_.size();
213  if (n >= cap_ && !notFullCond_.wait_until(g, absTime, [=]{return que_.size() < cap_;}))
214  return false;
215  que_.emplace(std::move(val));
216  if (n == 0) {
217  g.unlock();
218  notEmptyCond_.notify_one();
219  }
220  return true;
221  }
228  void get(value_type* val) {
229  unique_guard g(lock_);
230  auto n = que_.size();
231  if (n == 0)
232  notEmptyCond_.wait(g, [=]{return !que_.empty();});
233  *val = std::move(que_.front());
234  que_.pop();
235  if (n == cap_) {
236  g.unlock();
237  notFullCond_.notify_one();
238  }
239  }
246  value_type get() {
247  unique_guard g(lock_);
248  auto n = que_.size();
249  if (n == 0)
250  notEmptyCond_.wait(g, [=]{return !que_.empty();});
251  value_type val = std::move(que_.front());
252  que_.pop();
253  if (n == cap_) {
254  g.unlock();
255  notFullCond_.notify_one();
256  }
257  return val;
258  }
267  bool try_get(value_type* val) {
268  unique_guard g(lock_);
269  auto n = que_.size();
270  if (n == 0)
271  return false;
272  *val = std::move(que_.front());
273  que_.pop();
274  if (n == cap_) {
275  g.unlock();
276  notFullCond_.notify_one();
277  }
278  return true;
279  }
290  template <typename Rep, class Period>
291  bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
292  unique_guard g(lock_);
293  auto n = que_.size();
294  if (n == 0 && !notEmptyCond_.wait_for(g, relTime, [=]{return !que_.empty();}))
295  return false;
296  *val = std::move(que_.front());
297  que_.pop();
298  if (n == cap_) {
299  g.unlock();
300  notFullCond_.notify_one();
301  }
302  return true;
303  }
314  template <class Clock, class Duration>
315  bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
316  unique_guard g(lock_);
317  auto n = que_.size();
318  if (n == 0 && !notEmptyCond_.wait_until(g, absTime, [=]{return !que_.empty();}))
319  return false;
320  *val = std::move(que_.front());
321  que_.pop();
322  if (n == cap_) {
323  g.unlock();
324  notFullCond_.notify_one();
325  }
326  return true;
327  }
328 };
329 
331 // end namespace mqtt
332 }
333 
334 #endif // __mqtt_thread_queue_h
335 
size_type size() const
Gets the number of items in the queue.
Definition: thread_queue.h:139
void capacity(size_type cap)
Sets the capacity of the queue.
Definition: thread_queue.h:131
thread_queue(size_t cap)
Constructs a queue with the specified capacity.
Definition: thread_queue.h:107
bool try_put_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Attempt to place an item in the queue with a bounded wait to an absolute time point.
Definition: thread_queue.h:210
bool try_get(value_type *val)
Attempts to remove a value from the queue without blocking.
Definition: thread_queue.h:267
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Attempt to remove an item from the queue for a bounded amout of time.
Definition: thread_queue.h:315
Container container_type
The underlying container type to use for the queue.
Definition: thread_queue.h:71
static constexpr size_type MAX_CAPACITY
The maximum capacity of the queue.
Definition: thread_queue.h:78
typename Container::size_type size_type
The type used to specify number of items in the container.
Definition: thread_queue.h:75
thread_queue()
Constructs a queue with the maximum capacity.
Definition: thread_queue.h:101
A thread-safe queue for inter-thread communication.
Definition: thread_queue.h:67
size_type capacity() const
Gets the capacity of the queue.
Definition: thread_queue.h:121
bool empty() const
Determine if the queue is empty.
Definition: thread_queue.h:113
bool try_put(value_type val)
Non-blocking attempt to place an item into the queue.
Definition: thread_queue.h:166
void put(value_type val)
Put an item into the queue.
Definition: thread_queue.h:149
bool try_put_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Attempt to place an item in the queue with a bounded wait.
Definition: thread_queue.h:187
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Attempt to remove an item from the queue for a bounded amout of time.
Definition: thread_queue.h:291
T value_type
The type of items to be held in the queue.
Definition: thread_queue.h:73