Document ConcurrentQueue and de-inline its implementation

ConcurrentQueue is currently used only by two classes and a test, but
modifying concurrent_queue.h requires recompiling 30 source files. None
of the member functions is so lightweight as to make it worth inlining.

An alternative to `@note ConcurrentQueue is unable to execute jobs if
@p threadCount == 0.` is `assert(threadCount != 0);`. But this would
force classes that contain a ConcurrentQueue data member to always start
a thread, even if they detect at runtime that they are never going to
enqueue a job.

Add Job type alias to avoid repeating the type.

Use default member initializers instead of the member initializer list
to make it clear [to the reader of the header] that no data member is
left uninitialized.
This commit is contained in:
Igor Kushnir 2021-03-18 09:10:51 +02:00 committed by Luis Ángel San Martín
parent 2655613543
commit 61cd245037
4 changed files with 152 additions and 120 deletions

View File

@ -160,6 +160,7 @@ HEADERS += comic_flow.h \
} }
SOURCES += comic_flow.cpp \ SOURCES += comic_flow.cpp \
../common/concurrent_queue.cpp \
create_library_dialog.cpp \ create_library_dialog.cpp \
db/comic_query_result_processor.cpp \ db/comic_query_result_processor.cpp \
db/folder_query_result_processor.cpp \ db/folder_query_result_processor.cpp \

127
common/concurrent_queue.cpp Normal file
View File

@ -0,0 +1,127 @@
#include "concurrent_queue.h"
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
using namespace YACReader;
ConcurrentQueue::ConcurrentQueue(std::size_t threadCount)
{
threads.reserve(threadCount);
for (; threadCount != 0; --threadCount)
threads.emplace_back(&ConcurrentQueue::nextJob, this);
}
ConcurrentQueue::~ConcurrentQueue()
{
joinAll();
}
void ConcurrentQueue::enqueue(Job job)
{
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
++jobsLeft;
}
{
std::lock_guard<std::mutex> lock(queueMutex);
_queue.emplace(std::move(job));
}
jobAvailableVar.notify_one();
}
std::size_t ConcurrentQueue::cancelPending()
{
decltype(_queue) oldQueue;
{
const std::lock_guard<std::mutex> lock(queueMutex);
// The mutex locking time is lower with swap() compared to assigning a
// temporary (which destroys _queue's elements and deallocates memory).
_queue.swap(oldQueue);
}
const auto size = oldQueue.size();
if (size != 0)
finalizeJobs(size);
return size;
}
void ConcurrentQueue::waitAll()
{
std::unique_lock<std::mutex> lock(jobsLeftMutex);
if (jobsLeft > 0) {
_waitVar.wait(lock, [this] {
return jobsLeft == 0;
});
}
}
void ConcurrentQueue::nextJob()
{
while (true) {
Job job;
{
std::unique_lock<std::mutex> lock(queueMutex);
if (bailout) {
return;
}
jobAvailableVar.wait(lock, [this] {
return _queue.size() > 0 || bailout;
});
if (bailout) {
return;
}
job = std::move(_queue.front());
_queue.pop();
}
job();
finalizeJobs(1);
}
}
void ConcurrentQueue::finalizeJobs(std::size_t count)
{
assert(count > 0);
std::size_t remainingJobs;
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
assert(jobsLeft >= count);
jobsLeft -= count;
remainingJobs = jobsLeft;
}
if (remainingJobs == 0)
_waitVar.notify_all();
}
void ConcurrentQueue::joinAll()
{
{
std::lock_guard<std::mutex> lock(queueMutex);
if (bailout) {
return;
}
bailout = true;
}
jobAvailableVar.notify_all();
for (auto &x : threads) {
if (x.joinable()) {
x.join();
}
}
}

View File

@ -1,151 +1,53 @@
#ifndef CONCURRENT_QUEUE_H #ifndef CONCURRENT_QUEUE_H
#define CONCURRENT_QUEUE_H #define CONCURRENT_QUEUE_H
#include <cassert>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <functional> #include <functional>
#include <condition_variable> #include <condition_variable>
#include <queue> #include <queue>
#include <utility>
#include <vector> #include <vector>
namespace YACReader { namespace YACReader {
//! All functions in this class are thread-safe in the Qt documentation sense.
class ConcurrentQueue class ConcurrentQueue
{ {
public: public:
explicit ConcurrentQueue(std::size_t threadCount) //! @brief Creates and starts executing @p threadCount worker threads.
: jobsLeft(0), //! @note ConcurrentQueue is unable to execute jobs if @p threadCount == 0.
bailout(false) explicit ConcurrentQueue(std::size_t threadCount);
{
threads.reserve(threadCount);
for (; threadCount != 0; --threadCount)
threads.emplace_back(&ConcurrentQueue::nextJob, this);
}
~ConcurrentQueue() //! Cancels all jobs that have not been picked up by worker threads yet,
{ //! waits for all worker threads to complete their jobs and joins them.
joinAll(); ~ConcurrentQueue();
}
void enqueue(std::function<void(void)> job) using Job = std::function<void()>;
{
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
++jobsLeft;
}
{ //! @brief Adds @p job to the queue.
std::lock_guard<std::mutex> lock(queueMutex); //! @note A worker thread may start executing @p job immediately if it is idle.
_queue.emplace(std::move(job)); //! Worker threads start executing jobs in the same order as they are enqueued.
} void enqueue(Job job);
jobAvailableVar.notify_one();
}
//! @brief Cancels all jobs that have not been picked up by worker threads yet. //! @brief Cancels all jobs that have not been picked up by worker threads yet.
//! @return The number of jobs that were canceled. //! @return The number of jobs that were canceled.
std::size_t cancelPending() std::size_t cancelPending();
{
decltype(_queue) oldQueue;
{
const std::lock_guard<std::mutex> lock(queueMutex);
// The mutex locking time is lower with swap() compared to assigning a
// temporary (which destroys _queue's elements and deallocates memory).
_queue.swap(oldQueue);
}
const auto size = oldQueue.size(); //! @brief Blocks the current thread until all enqueued jobs are completed.
if (size != 0) void waitAll();
finalizeJobs(size);
return size;
}
void waitAll()
{
std::unique_lock<std::mutex> lock(jobsLeftMutex);
if (jobsLeft > 0) {
_waitVar.wait(lock, [this] {
return jobsLeft == 0;
});
}
}
private: private:
std::vector<std::thread> threads; std::vector<std::thread> threads;
std::queue<std::function<void(void)>> _queue; std::queue<Job> _queue;
std::size_t jobsLeft; //!< @invariant jobsLeft >= _queue.size() std::size_t jobsLeft = 0; //!< @invariant jobsLeft >= _queue.size()
bool bailout; bool bailout = false;
std::condition_variable jobAvailableVar; std::condition_variable jobAvailableVar;
std::condition_variable _waitVar; std::condition_variable _waitVar;
std::mutex jobsLeftMutex; std::mutex jobsLeftMutex;
std::mutex queueMutex; std::mutex queueMutex;
void nextJob() void nextJob();
{ void finalizeJobs(std::size_t count);
while (true) { void joinAll();
std::function<void(void)> job;
{
std::unique_lock<std::mutex> lock(queueMutex);
if (bailout) {
return;
}
jobAvailableVar.wait(lock, [this] {
return _queue.size() > 0 || bailout;
});
if (bailout) {
return;
}
job = std::move(_queue.front());
_queue.pop();
}
job();
finalizeJobs(1);
}
}
void finalizeJobs(std::size_t count)
{
assert(count > 0);
std::size_t remainingJobs;
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
assert(jobsLeft >= count);
jobsLeft -= count;
remainingJobs = jobsLeft;
}
if (remainingJobs == 0)
_waitVar.notify_all();
}
void joinAll()
{
{
std::lock_guard<std::mutex> lock(queueMutex);
if (bailout) {
return;
}
bailout = true;
}
jobAvailableVar.notify_all();
for (auto &x : threads) {
if (x.joinable()) {
x.join();
}
}
}
}; };
} }

View File

@ -4,4 +4,6 @@ PATH_TO_common = ../../common
INCLUDEPATH += $$PATH_TO_common INCLUDEPATH += $$PATH_TO_common
HEADERS += $${PATH_TO_common}/concurrent_queue.h HEADERS += $${PATH_TO_common}/concurrent_queue.h
SOURCES += concurrent_queue_test.cpp SOURCES += \
$${PATH_TO_common}/concurrent_queue.cpp \
concurrent_queue_test.cpp