25 #ifndef __mqtt_thread_queue_h
26 #define __mqtt_thread_queue_h
30 #include <condition_variable>
66 template <
typename T,
class Container=std::deque<T>>
82 mutable std::mutex lock_;
84 std::condition_variable notEmptyCond_;
86 std::condition_variable notFullCond_;
90 std::queue<T,Container> que_;
93 using guard = std::lock_guard<std::mutex>;
95 using unique_guard = std::unique_lock<std::mutex>;
150 unique_guard g(lock_);
153 notFullCond_.wait(g, [=]{
return que_.size() < cap_;});
154 que_.emplace(std::move(val));
157 notEmptyCond_.notify_one();
167 unique_guard g(lock_);
171 que_.emplace(std::move(val));
174 notEmptyCond_.notify_one();
186 template <
typename Rep,
class Period>
188 unique_guard g(lock_);
190 if (n >= cap_ && !notFullCond_.wait_for(g, relTime, [=]{
return que_.size() < cap_;}))
192 que_.emplace(std::move(val));
195 notEmptyCond_.notify_one();
209 template <
class Clock,
class Duration>
211 unique_guard g(lock_);
213 if (n >= cap_ && !notFullCond_.wait_until(g, absTime, [=]{
return que_.size() < cap_;}))
215 que_.emplace(std::move(val));
218 notEmptyCond_.notify_one();
229 unique_guard g(lock_);
230 auto n = que_.size();
232 notEmptyCond_.wait(g, [=]{
return !que_.empty();});
233 *val = std::move(que_.front());
237 notFullCond_.notify_one();
247 unique_guard g(lock_);
248 auto n = que_.size();
250 notEmptyCond_.wait(g, [=]{
return !que_.empty();});
255 notFullCond_.notify_one();
268 unique_guard g(lock_);
269 auto n = que_.size();
272 *val = std::move(que_.front());
276 notFullCond_.notify_one();
290 template <
typename Rep,
class Period>
292 unique_guard g(lock_);
293 auto n = que_.size();
294 if (n == 0 && !notEmptyCond_.wait_for(g, relTime, [=]{
return !que_.empty();}))
296 *val = std::move(que_.front());
300 notFullCond_.notify_one();
314 template <
class Clock,
class Duration>
316 unique_guard g(lock_);
317 auto n = que_.size();
318 if (n == 0 && !notEmptyCond_.wait_until(g, absTime, [=]{
return !que_.empty();}))
320 *val = std::move(que_.front());
324 notFullCond_.notify_one();
334 #endif // __mqtt_thread_queue_h
size_type size() const
Gets the number of items in the queue.
Definition: thread_queue.h:139
void capacity(size_type cap)
Sets the capacity of the queue.
Definition: thread_queue.h:131
thread_queue(size_t cap)
Constructs a queue with the specified capacity.
Definition: thread_queue.h:107
bool try_put_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Attempt to place an item in the queue with a bounded wait to an absolute time point.
Definition: thread_queue.h:210
bool try_get(value_type *val)
Attempts to remove a value from the queue without blocking.
Definition: thread_queue.h:267
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Attempt to remove an item from the queue with a bounded wait to an absolute time point.
Definition: thread_queue.h:315
Container container_type
The underlying container type to use for the queue.
Definition: thread_queue.h:71
static constexpr size_type MAX_CAPACITY
The maximum capacity of the queue.
Definition: thread_queue.h:78
typename Container::size_type size_type
The type used to specify number of items in the container.
Definition: thread_queue.h:75
thread_queue()
Constructs a queue with the maximum capacity.
Definition: thread_queue.h:101
A thread-safe queue for inter-thread communication.
Definition: thread_queue.h:67
size_type capacity() const
Gets the capacity of the queue.
Definition: thread_queue.h:121
bool empty() const
Determine if the queue is empty.
Definition: thread_queue.h:113
bool try_put(value_type val)
Non-blocking attempt to place an item into the queue.
Definition: thread_queue.h:166
void put(value_type val)
Put an item into the queue.
Definition: thread_queue.h:149
bool try_put_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Attempt to place an item in the queue with a bounded wait.
Definition: thread_queue.h:187
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Attempt to remove an item from the queue for a bounded amount of time.
Definition: thread_queue.h:291
T value_type
The type of items to be held in the queue.
Definition: thread_queue.h:73