diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 0d8384fa..5be10b0d 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -1,11 +1,14 @@ #ifndef CONCURRENT_QUEUE_H #define CONCURRENT_QUEUE_H +#include #include +#include #include #include #include #include +#include namespace YACReader { class ConcurrentQueue @@ -54,11 +57,11 @@ public: // temporary (which destroys _queue's elements and deallocates memory). _queue.swap(oldQueue); } + const auto size = oldQueue.size(); - { - const std::lock_guard lock(jobsLeftMutex); - jobsLeft -= size; - } + assert(size <= std::numeric_limits::max()); + if (size != 0) + finalizeJobs(static_cast(size)); return size; } @@ -107,16 +110,27 @@ private: } job(); - - { - std::lock_guard lock(jobsLeftMutex); - --jobsLeft; - } - - _waitVar.notify_all(); + finalizeJobs(1); } } + void finalizeJobs(int count) + { + assert(count > 0); + + int remainingJobs; + { + std::lock_guard lock(jobsLeftMutex); + assert(jobsLeft >= count); + jobsLeft -= count; + remainingJobs = jobsLeft; + } + + assert(remainingJobs >= 0); + if (remainingJobs == 0) + _waitVar.notify_all(); + } + void joinAll() { {