From 4cb542c8cca6aea8698491743ed65bb3cc8ed239 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 18 Mar 2021 09:51:59 +0200 Subject: [PATCH] Remove ConcurrentQueue::joinAll() This function is called only from ~ConcurrentQueue(). joinAll() is not thread-safe and it cannot be called earlier without introducing a null state. Moving the function's implementation into the definition of ~ConcurrentQueue() makes the code clearer. Removing joinAll() also allows to establish and document invariants for two data members. Assert consistency between jobsLeft and _queue in ~ConcurrentQueue(). --- common/concurrent_queue.cpp | 32 ++++++++++---------------------- common/concurrent_queue.h | 4 ++-- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/common/concurrent_queue.cpp b/common/concurrent_queue.cpp index 7f73c2d0..28a0efe1 100644 --- a/common/concurrent_queue.cpp +++ b/common/concurrent_queue.cpp @@ -16,7 +16,16 @@ ConcurrentQueue::ConcurrentQueue(std::size_t threadCount) ConcurrentQueue::~ConcurrentQueue() { - joinAll(); + { + std::lock_guard lock(queueMutex); + assert(!bailout); + bailout = true; + } + jobAvailableVar.notify_all(); + + for (auto &x : threads) + x.join(); + assert(jobsLeft == _queue.size() && "Only not yet started jobs are left."); } void ConcurrentQueue::enqueue(Job job) @@ -104,24 +113,3 @@ void ConcurrentQueue::finalizeJobs(std::size_t count) if (remainingJobs == 0) _waitVar.notify_all(); } - -void ConcurrentQueue::joinAll() -{ - { - std::lock_guard lock(queueMutex); - - if (bailout) { - return; - } - - bailout = true; - } - - jobAvailableVar.notify_all(); - - for (auto &x : threads) { - if (x.joinable()) { - x.join(); - } - } -} diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index a27c505e..4d3c4f23 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -36,10 +36,11 @@ public: void waitAll(); private: + //! @invariant all worker threads are joinable until the destructor is called. std::vector threads; std::queue _queue; std::size_t jobsLeft = 0; //!< @invariant jobsLeft >= _queue.size() - bool bailout = false; + bool bailout = false; //!< @invariant is false until the destructor is called. std::condition_variable jobAvailableVar; std::condition_variable _waitVar; std::mutex jobsLeftMutex; @@ -47,7 +48,6 @@ private: void nextJob(); void finalizeJobs(std::size_t count); - void joinAll(); }; }