7#include <condition_variable>
19 mutable std::mutex _mutex;
20 std::condition_variable _deq_cv;
21 std::condition_variable _enq_cv;
23 unsigned int const _cap;
26 std::function<void(T
const &)>
const _on_drop_callback;
30 std::function<
void( T
const & ) > on_drop_callback =
nullptr )
33 , _on_drop_callback( on_drop_callback )
41 std::unique_lock<std::mutex> lock(_mutex);
44 if( _on_drop_callback )
45 _on_drop_callback( item );
49 _queue.push_back(std::move(item));
51 if( _queue.size() > _cap )
53 if( _on_drop_callback )
54 _on_drop_callback( _queue.front() );
71 std::unique_lock<std::mutex> lock(_mutex);
72 _enq_cv.wait( lock, [
this]() {
73 return _queue.size() < _cap; } );
77 if( _on_drop_callback )
78 _on_drop_callback( item );
82 _queue.push_back(std::move(item));
94 bool dequeue( T * item,
unsigned int timeout_ms )
96 std::unique_lock<std::mutex> lock(_mutex);
97 if( ! _deq_cv.wait_for( lock,
98 std::chrono::milliseconds( timeout_ms ),
99 [
this]() { return ! _accepting || ! _queue.empty(); } )
105 *item = std::move(_queue.front());
109 _enq_cv.notify_one();
118 std::lock_guard< std::mutex > lock( _mutex );
122 *item = std::move(_queue.front());
126 _enq_cv.notify_one();
134 std::lock_guard< std::mutex > lock( _mutex );
137 fn( _queue.front() );
144 std::lock_guard< std::mutex > lock( _mutex );
147 fn( _queue.front() );
153 std::lock_guard< std::mutex > lock( _mutex );
163 std::lock_guard< std::mutex > lock( _mutex );
173 _enq_cv.notify_all();
174 _deq_cv.notify_all();
180 std::lock_guard< std::mutex > lock( _mutex );
189 std::lock_guard< std::mutex > lock( _mutex );
190 return _queue.size();
204 std::function<
void( T
const & ) > on_drop_callback =
nullptr )
205 : _queue( cap, on_drop_callback )
211 if( item->is_blocking() )
214 return _queue.
enqueue( std::move( item ) );
217 bool dequeue(T* item,
unsigned int timeout_ms)
219 return _queue.
dequeue(item, timeout_ms);
230 return _queue.
peek( fn );
236 return _queue.
peek( fn );
256 return _queue.
size();
261 return _queue.
empty();
295 template<
class Duration >
298 using namespace std::chrono;
300 std::unique_lock<std::mutex> lock(_owner->_was_stopped_mutex);
306 _owner->_was_stopped_cv.wait_for( lock, sleep_time, [&]() {
return was_stopped(); } ) );
318 std::function<
void(
action ) > on_drop_callback =
nullptr );
332 void invoke(T item,
bool is_blocking =
false)
339 _queue.
enqueue(std::move(item));
346 void invoke_and_wait(T item, std::function<
bool()> exit_condition,
bool is_blocking =
false)
351 auto func = std::move(item);
354 std::lock_guard<std::mutex> lk(_blocking_invoke_mutex);
358 _blocking_invoke_cv.notify_one();
362 std::unique_lock<std::mutex> lk(_blocking_invoke_mutex);
363 _blocking_invoke_cv.wait(lk, [&](){
return done || exit_condition(); });
379 bool flush(std::chrono::steady_clock::duration timeout = std::chrono::seconds(10) );
386 bool _wait_for_start(
int timeout_ms );
393 std::atomic<bool> _was_stopped;
394 std::condition_variable _was_stopped_cv;
395 std::mutex _was_stopped_mutex;
397 std::mutex _dispatch_mutex;
399 std::condition_variable _blocking_invoke_cv;
400 std::mutex _blocking_invoke_mutex;
402 std::atomic<bool> _is_alive;
405template<
class T = std::function<
void(dispatcher::cancellable_timer)>>
410 : _operation(std::move(operation)), _dispatcher(1), _stopped(true)
424 if (!_stopped.load()) {
454 std::atomic<bool> _stopped;
460 watchdog(std::function<
void()> operation, uint64_t timeout_ms) :
461 _timeout_ms(timeout_ms), _operation(std::move(operation))
465 if(cancellable_timer.try_sleep( std::chrono::milliseconds( _timeout_ms )))
469 std::lock_guard<std::mutex> lk(_m);
481 void start() { std::lock_guard<std::mutex> lk(_m); _watcher->start(); _running =
true; }
482 void stop() { { std::lock_guard<std::mutex> lk(_m); _running =
false; } _watcher->stop(); }
483 bool running() { std::lock_guard<std::mutex> lk(_m);
return _running; }
484 void set_timeout(uint64_t timeout_ms) { std::lock_guard<std::mutex> lk(_m); _timeout_ms = timeout_ms; }
485 void kick() { std::lock_guard<std::mutex> lk(_m); _kicked =
true; }
489 uint64_t _timeout_ms;
490 bool _kicked =
false;
491 bool _running =
false;
492 std::function<void()> _operation;
493 std::shared_ptr<active_object<>> _watcher;
Definition: concurrency.h:407
active_object(T operation)
Definition: concurrency.h:409
void start()
Definition: concurrency.h:414
~active_object()
Definition: concurrency.h:430
void stop()
Definition: concurrency.h:422
bool is_active() const
Definition: concurrency.h:435
Definition: concurrency.h:280
bool was_stopped() const
Definition: concurrency.h:288
cancellable_timer(dispatcher *owner)
Definition: concurrency.h:284
bool try_sleep(Duration sleep_time)
Definition: concurrency.h:296
Definition: concurrency.h:273
std::function< void(cancellable_timer const &)> action
Definition: concurrency.h:311
dispatcher(unsigned int queue_capacity, std::function< void(action) > on_drop_callback=nullptr)
void invoke_and_wait(T item, std::function< bool()> exit_condition, bool is_blocking=false)
Definition: concurrency.h:346
void invoke(T item, bool is_blocking=false)
Definition: concurrency.h:332
bool flush(std::chrono::steady_clock::duration timeout=std::chrono::seconds(10))
bool empty() const
Definition: concurrency.h:322
Definition: concurrency.h:199
void start()
Definition: concurrency.h:249
bool empty() const
Definition: concurrency.h:259
bool try_dequeue(T *item)
Definition: concurrency.h:222
size_t size() const
Definition: concurrency.h:254
void clear()
Definition: concurrency.h:239
bool enqueue(T &&item)
Definition: concurrency.h:209
bool peek(Fn fn)
Definition: concurrency.h:234
bool stopped() const
Definition: concurrency.h:265
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:217
bool started() const
Definition: concurrency.h:264
void stop()
Definition: concurrency.h:244
bool peek(Fn fn) const
Definition: concurrency.h:228
Definition: concurrency.h:17
void _clear()
Definition: concurrency.h:168
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:94
bool blocking_enqueue(T &&item)
Definition: concurrency.h:69
bool peek(Fn fn)
Definition: concurrency.h:142
void start()
Definition: concurrency.h:178
bool peek(Fn fn) const
Definition: concurrency.h:132
bool stopped() const
Definition: concurrency.h:185
bool try_dequeue(T *item)
Definition: concurrency.h:116
void stop()
Definition: concurrency.h:151
bool enqueue(T &&item)
Definition: concurrency.h:39
bool started() const
Definition: concurrency.h:184
size_t size() const
Definition: concurrency.h:187
bool empty() const
Definition: concurrency.h:193
void clear()
Definition: concurrency.h:161
Definition: concurrency.h:458
void stop()
Definition: concurrency.h:482
watchdog(std::function< void()> operation, uint64_t timeout_ms)
Definition: concurrency.h:460
void kick()
Definition: concurrency.h:485
void set_timeout(uint64_t timeout_ms)
Definition: concurrency.h:484
~watchdog()
Definition: concurrency.h:475
void start()
Definition: concurrency.h:481
bool running()
Definition: concurrency.h:483
const int QUEUE_MAX_SIZE
Definition: concurrency.h:13