yacreader/common/concurrent_queue.cpp
Igor Kushnir 05b384ed6d Make ConcurrentQueue::waitAll() const
This member function does not affect the logical state of the class.
Making std::condition_variable and std::mutex data members mutable is
idiomatic as these classes are thread-safe synchronization primitives.
2021-12-29 09:36:44 +01:00

108 lines
2.3 KiB
C++

#include "concurrent_queue.h"
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
using namespace YACReader;
ConcurrentQueue::ConcurrentQueue(std::size_t threadCount)
{
threads.reserve(threadCount);
for (; threadCount != 0; --threadCount)
threads.emplace_back(&ConcurrentQueue::nextJob, this);
}
ConcurrentQueue::~ConcurrentQueue()
{
{
std::lock_guard<std::mutex> lock(queueMutex);
assert(!bailout);
bailout = true;
}
jobAvailableVar.notify_all();
for (auto &x : threads)
x.join();
assert(jobsLeft == _queue.size() && "Only not yet started jobs are left.");
}
void ConcurrentQueue::enqueue(Job job)
{
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
++jobsLeft;
}
{
std::lock_guard<std::mutex> lock(queueMutex);
_queue.emplace(std::move(job));
}
jobAvailableVar.notify_one();
}
std::size_t ConcurrentQueue::cancelPending()
{
decltype(_queue) oldQueue;
{
const std::lock_guard<std::mutex> lock(queueMutex);
// The mutex locking time is lower with swap() compared to assigning a
// temporary (which destroys _queue's elements and deallocates memory).
_queue.swap(oldQueue);
}
const auto size = oldQueue.size();
if (size != 0)
finalizeJobs(size);
return size;
}
void ConcurrentQueue::waitAll() const
{
std::unique_lock<std::mutex> lock(jobsLeftMutex);
_waitVar.wait(lock, [this] { return jobsLeft == 0; });
}
void ConcurrentQueue::nextJob()
{
while (true) {
Job job;
{
std::unique_lock<std::mutex> lock(queueMutex);
jobAvailableVar.wait(lock, [this] {
return _queue.size() > 0 || bailout;
});
if (bailout) {
return;
}
job = std::move(_queue.front());
_queue.pop();
}
job();
finalizeJobs(1);
}
}
void ConcurrentQueue::finalizeJobs(std::size_t count)
{
assert(count > 0);
std::size_t remainingJobs;
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
assert(jobsLeft >= count);
jobsLeft -= count;
remainingJobs = jobsLeft;
}
if (remainingJobs == 0)
_waitVar.notify_all();
}