#include "concurrent_queue.h" #include #include #include #include using namespace YACReader; ConcurrentQueue::ConcurrentQueue(std::size_t threadCount) { threads.reserve(threadCount); for (; threadCount != 0; --threadCount) threads.emplace_back(&ConcurrentQueue::nextJob, this); } ConcurrentQueue::~ConcurrentQueue() { { std::lock_guard lock(queueMutex); assert(!bailout); bailout = true; } jobAvailableVar.notify_all(); for (auto &x : threads) x.join(); assert(jobsLeft == _queue.size() && "Only not yet started jobs are left."); } void ConcurrentQueue::enqueue(Job job) { { std::lock_guard lock(jobsLeftMutex); ++jobsLeft; } { std::lock_guard lock(queueMutex); _queue.emplace(std::move(job)); } jobAvailableVar.notify_one(); } std::size_t ConcurrentQueue::cancelPending() { decltype(_queue) oldQueue; { const std::lock_guard lock(queueMutex); // The mutex locking time is lower with swap() compared to assigning a // temporary (which destroys _queue's elements and deallocates memory). _queue.swap(oldQueue); } const auto size = oldQueue.size(); if (size != 0) finalizeJobs(size); return size; } void ConcurrentQueue::waitAll() const { std::unique_lock lock(jobsLeftMutex); _waitVar.wait(lock, [this] { return jobsLeft == 0; }); } void ConcurrentQueue::nextJob() { while (true) { Job job; { std::unique_lock lock(queueMutex); jobAvailableVar.wait(lock, [this] { return _queue.size() > 0 || bailout; }); if (bailout) { return; } job = std::move(_queue.front()); _queue.pop(); } job(); finalizeJobs(1); } } void ConcurrentQueue::finalizeJobs(std::size_t count) { assert(count > 0); std::size_t remainingJobs; { std::lock_guard lock(jobsLeftMutex); assert(jobsLeft >= count); jobsLeft -= count; remainingJobs = jobsLeft; } if (remainingJobs == 0) _waitVar.notify_all(); }