yacreader/common/concurrent_queue.h
Igor Kushnir d869e1230b ConcurrentQueue::enqueue: increment jobsLeft before adding a job
ConcurrentQueueTest::randomCalls() built in Debug mode often crashes
when concurrent_queue_test.cpp is modified and ConcurrentQueueTest is
launched from Qt Creator so that the test is built and immediately run.
The crash is an assertion failure that occurs most of the time under
the described circumstances at different test data rows:
void YACReader::ConcurrentQueue::finalizeJobs(int): Assertion `jobsLeft >= count' failed.

The assertion fails because ConcurrentQueue::enqueue() adds a job into
the queue first and then increments jobsLeft. If the job is immediately
picked up and executed very fast, ConcurrentQueue::nextJob() can try to
finalize it before enqueue() increments jobsLeft.

Simply reordering the modifications of jobsLeft and _queue in enqueue()
ensures that jobsLeft is always non-negative and eliminates the
assertion failures. Note that ConcurrentQueue::finalizeJobs() is the
only other function that modifies (decreases) jobsLeft. finalizeJobs()
is always called *after* the queue's size is reduced. So the following
invariant is now maintained at all times and documented:
jobsLeft >= _queue.size().
2021-12-29 09:36:44 +01:00

159 lines
3.5 KiB
C++

#ifndef CONCURRENT_QUEUE_H
#define CONCURRENT_QUEUE_H
#include <cassert>
#include <thread>
#include <limits>
#include <mutex>
#include <functional>
#include <condition_variable>
#include <queue>
#include <vector>
namespace YACReader {
class ConcurrentQueue
{
public:
explicit ConcurrentQueue(int threadCount)
: jobsLeft(0),
bailout(false)
{
threads = std::vector<std::thread>(threadCount);
for (int index = 0; index < threadCount; ++index) {
threads[index] = std::thread([this] {
this->nextJob();
});
}
}
~ConcurrentQueue()
{
joinAll();
}
void enqueue(std::function<void(void)> job)
{
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
++jobsLeft;
}
{
std::lock_guard<std::mutex> lock(queueMutex);
_queue.emplace(job);
}
jobAvailableVar.notify_one();
}
//! @brief Cancels all jobs that have not been picked up by worker threads yet.
//! @return The number of jobs that were canceled.
std::size_t cancelPending()
{
decltype(_queue) oldQueue;
{
const std::lock_guard<std::mutex> lock(queueMutex);
// The mutex locking time is lower with swap() compared to assigning a
// temporary (which destroys _queue's elements and deallocates memory).
_queue.swap(oldQueue);
}
const auto size = oldQueue.size();
assert(size <= std::numeric_limits<int>::max());
if (size != 0)
finalizeJobs(static_cast<int>(size));
return size;
}
void waitAll()
{
std::unique_lock<std::mutex> lock(jobsLeftMutex);
if (jobsLeft > 0) {
_waitVar.wait(lock, [this] {
return jobsLeft == 0;
});
}
}
private:
std::vector<std::thread> threads;
std::queue<std::function<void(void)>> _queue;
int jobsLeft; //!< @invariant jobsLeft >= _queue.size()
bool bailout;
std::condition_variable jobAvailableVar;
std::condition_variable _waitVar;
std::mutex jobsLeftMutex;
std::mutex queueMutex;
void nextJob()
{
while (true) {
std::function<void(void)> job;
{
std::unique_lock<std::mutex> lock(queueMutex);
if (bailout) {
return;
}
jobAvailableVar.wait(lock, [this] {
return _queue.size() > 0 || bailout;
});
if (bailout) {
return;
}
job = _queue.front();
_queue.pop();
}
job();
finalizeJobs(1);
}
}
void finalizeJobs(int count)
{
assert(count > 0);
int remainingJobs;
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
assert(jobsLeft >= count);
jobsLeft -= count;
remainingJobs = jobsLeft;
}
assert(remainingJobs >= 0);
if (remainingJobs == 0)
_waitVar.notify_all();
}
void joinAll()
{
{
std::lock_guard<std::mutex> lock(queueMutex);
if (bailout) {
return;
}
bailout = true;
}
jobAvailableVar.notify_all();
for (auto &x : threads) {
if (x.joinable()) {
x.join();
}
}
}
};
}
#endif // CONCURRENT_QUEUE_H