From fa5ce254253041e9d3e506a568f64a2db7c0c07d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20=C3=81ngel=20San=20Mart=C3=ADn?= Date: Thu, 14 Jan 2021 09:03:17 +0100 Subject: [PATCH] Add concurrent queue based on lambdas --- YACReaderLibrary/YACReaderLibrary.pro | 1 + common/concurrent_queue.h | 132 ++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 common/concurrent_queue.h diff --git a/YACReaderLibrary/YACReaderLibrary.pro b/YACReaderLibrary/YACReaderLibrary.pro index 156056df..6a81ad51 100644 --- a/YACReaderLibrary/YACReaderLibrary.pro +++ b/YACReaderLibrary/YACReaderLibrary.pro @@ -76,6 +76,7 @@ QT += sql network widgets script # Input HEADERS += comic_flow.h \ + ../common/concurrent_queue.h \ create_library_dialog.h \ db/query_lexer.h \ library_creator.h \ diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h new file mode 100644 index 00000000..5b92f6f1 --- /dev/null +++ b/common/concurrent_queue.h @@ -0,0 +1,132 @@ +#ifndef CONCURRENT_QUEUE_H +#define CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace YACReader { +class ConcurrentQueue +{ +public: + explicit ConcurrentQueue(int threadCount) + : jobsLeft(0), + bailout(false) + { + threads = std::vector(threadCount); + for (int index = 0; index < threadCount; ++index) { + threads[index] = std::thread([this] { + this->nextJob(); + }); + } + } + + ~ConcurrentQueue() + { + joinAll(); + } + + void enqueue(std::function job) + { + { + std::lock_guard lock(queueMutex); + _queue.emplace(job); + } + + { + std::lock_guard lock(jobsLeftMutex); + ++jobsLeft; + } + + jobAvailableVar.notify_one(); + } + + void cancellPending() + { + std::unique_lock lock(jobsLeftMutex); + _queue = std::queue>(); + jobsLeft = 0; + } + + void waitAll() + { + std::unique_lock lock(jobsLeftMutex); + if (jobsLeft > 0) { + _waitVar.wait(lock, [this] { + return jobsLeft == 0; + }); + } + } + +private: + std::vector threads; + std::queue> _queue; + int jobsLeft; + bool bailout; + std::condition_variable jobAvailableVar; + std::condition_variable _waitVar; + std::mutex jobsLeftMutex; + std::mutex queueMutex; + + void nextJob() + { + while (true) { + std::function job; + + { + std::unique_lock lock(queueMutex); + + if (bailout) { + return; + } + + jobAvailableVar.wait(lock, [this] { + return _queue.size() > 0 || bailout; + }); + + if (bailout) { + return; + } + + job = _queue.front(); + _queue.pop(); + } + + job(); + + { + std::lock_guard lock(jobsLeftMutex); + --jobsLeft; + } + + _waitVar.notify_one(); + } + } + + void joinAll() + { + { + std::lock_guard lock(queueMutex); + + if (bailout) { + return; + } + + bailout = true; + } + + jobAvailableVar.notify_all(); + + for (auto &x : threads) { + if (x.joinable()) { + x.join(); + } + } + } +}; + +} + +#endif // CONCURRENT_QUEUE_H