#ifndef CONCURRENT_QUEUE_H
#define CONCURRENT_QUEUE_H

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

namespace YACReader {

/// @brief Fixed-size pool of worker threads that executes queued jobs in FIFO order.
///
/// Jobs are `std::function<void(void)>` callables. `jobsLeft` counts every job
/// that has been enqueued and has not finished yet (pending + currently running),
/// which is what `waitAll()` blocks on.
///
/// Locking discipline: whenever both mutexes are held, `queueMutex` is acquired
/// before `jobsLeftMutex`. While `queueMutex` is held, `jobsLeft` is never
/// smaller than the number of pending jobs in `_queue`.
///
/// The destructor stops the workers and joins them; jobs that are still pending
/// at that point are dropped without being run.
class ConcurrentQueue
{
public:
    /// @brief Starts `threadCount` worker threads.
    /// @param threadCount Number of workers; values <= 0 create a pool with no
    ///        workers (enqueued jobs are then never executed).
    /// @throws std::system_error if a thread cannot be started. Workers that
    ///         were already started are stopped and joined before rethrowing.
    explicit ConcurrentQueue(int threadCount)
        : jobsLeft(0),
          bailout(false)
    {
        const std::size_t workerCount = threadCount > 0 ? static_cast<std::size_t>(threadCount) : 0;
        threads.reserve(workerCount);
        try {
            for (std::size_t index = 0; index < workerCount; ++index) {
                threads.emplace_back([this] { nextJob(); });
            }
        } catch (...) {
            // The destructor does not run when the constructor throws, and
            // destroying a joinable std::thread calls std::terminate().
            joinAll();
            throw;
        }
    }

    ConcurrentQueue(const ConcurrentQueue &) = delete;
    ConcurrentQueue &operator=(const ConcurrentQueue &) = delete;

    ~ConcurrentQueue()
    {
        joinAll();
    }

    /// @brief Appends a job to the queue and wakes one idle worker.
    /// @param job Callable to run on a worker thread. An empty callable is
    ///        accepted and simply counted as finished when a worker reaches it.
    void enqueue(std::function<void(void)> job)
    {
        {
            std::lock_guard<std::mutex> lockQueue(queueMutex);
            _queue.push(std::move(job));
            // Count the job before any worker can pop it (workers need
            // queueMutex), so jobsLeft can never be decremented below zero.
            std::lock_guard<std::mutex> lockJobsLeft(jobsLeftMutex);
            ++jobsLeft;
        }

        jobAvailableVar.notify_one();
    }

    /// @brief Discards every job that has not been started yet.
    ///
    /// Jobs that are already running are not interrupted and remain counted,
    /// so a subsequent `waitAll()` still waits for them.
    void cancellPending()
    {
        // Declared outside the locked scope so the discarded callables are
        // destroyed after both mutexes have been released.
        std::queue<std::function<void(void)>> discarded;
        bool nothingLeft = false;
        {
            std::lock_guard<std::mutex> lockQueue(queueMutex);
            std::lock_guard<std::mutex> lockJobsLeft(jobsLeftMutex);
            // Only the pending jobs disappear; running ones will decrement
            // jobsLeft themselves when they finish.
            jobsLeft -= static_cast<int>(_queue.size());
            _queue.swap(discarded);
            nothingLeft = (jobsLeft == 0);
        }

        if (nothingLeft) {
            _waitVar.notify_all();
        }
    }

    /// @brief Blocks until no job is pending or running.
    void waitAll()
    {
        std::unique_lock<std::mutex> lock(jobsLeftMutex);
        _waitVar.wait(lock, [this] { return jobsLeft == 0; });
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void(void)>> _queue;

    int jobsLeft; // pending + running jobs; guarded by jobsLeftMutex
    bool bailout; // set once by joinAll(); guarded by queueMutex
    std::condition_variable jobAvailableVar; // signalled on enqueue and on shutdown
    std::condition_variable _waitVar; // signalled when jobsLeft reaches zero
    std::mutex jobsLeftMutex;
    std::mutex queueMutex;

    /// Worker loop: pops and runs jobs until `bailout` is set.
    void nextJob()
    {
        while (true) {
            std::function<void(void)> job;

            {
                std::unique_lock<std::mutex> lock(queueMutex);
                jobAvailableVar.wait(lock, [this] { return bailout || !_queue.empty(); });

                // Shutdown takes priority over pending jobs.
                if (bailout) {
                    return;
                }

                job = std::move(_queue.front());
                _queue.pop();
            }

            if (job) {
                job();
            }
            // Release whatever the job captured before reporting it as done,
            // so waitAll() does not return while captures are still alive.
            job = nullptr;

            bool nothingLeft = false;
            {
                std::lock_guard<std::mutex> lock(jobsLeftMutex);
                --jobsLeft;
                nothingLeft = (jobsLeft == 0);
            }

            // notify_all: several threads may be blocked in waitAll().
            if (nothingLeft) {
                _waitVar.notify_all();
            }
        }
    }

    /// Requests shutdown and joins every worker. Safe to call more than once.
    void joinAll()
    {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            if (bailout) {
                return;
            }
            bailout = true;
        }

        jobAvailableVar.notify_all();
        for (auto &worker : threads) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
};

}

#endif // CONCURRENT_QUEUE_H