From 57eb8d0171f3beb235808734d3edeaf11b626550 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Tue, 16 Mar 2021 10:25:48 +0200 Subject: [PATCH] ConcurrentQueue::cancelPending: notify _waitVar This allows to call cancelPending() and waitAll() concurrently with no additional synchronization. While calling these two functions concurrently may not be useful often, supporting it costs little in terms of performance and code complexity. Furthermore, a completely thread-safe class is easier to document and use correctly. Optimize job finalization by notifying _waitVar only if jobsLeft is reduced to 0. Optimize cancelPending() by not locking jobsLeftMutex when no job is canceled (if the queue is empty when this function is called). Add assertions that verify invariants. The output of ConcurrentQueueTest::randomCalls() reflects the fact that a caller of waitAll() can be blocked indefinitely when another thread cancels all queued jobs while no job is being executed. The test output snippet below omits repetitive information: the "QINFO : ConcurrentQueueTest::randomCalls(queue{1}; 2 user thread(s))" prefix of each line, current thread id and the common hh:mm:ss current time part (leaving only the millisecond part). Note that thread #1 begins waiting at the 1st line of the snippet. "=> -8" means that total equals -8 and thus ConcurrentQueue::jobsLeft equals 8. Thread #0 then cancels all 8 queued jobs, then enqueues and cancels 3 jobs twice, then momentarily waits 3 times. #1 is not waked until #0 enqueues 8 more jobs and starts waiting for them too. Once these new 8 jobs are completed, both #0 and #1 end waiting. 1. [ms] 311 | #1 begin waiting for 1 thread => -8 2. [ms] 311 | #0 enqueuing complete. 3. [ms] 311 | #0 canceled 8 jobs => -8 4. [ms] 311 | #0 enqueuing 3 jobs... 5. [ms] 312 | #0 enqueuing complete. 6. [ms] 312 | #0 canceled 3 jobs => -3 7. [ms] 312 | #0 enqueuing 3 jobs... 8. [ms] 312 | #0 enqueuing complete. 9. [ms] 312 | #0 canceled 3 jobs => -3 10. [ms] 312 | #0 begin waiting for 1 thread => 0 11. [ms] 312 | #0 end waiting for 1 thread => 0 12. [ms] 312 | #0 begin waiting for 1 thread => 0 13. [ms] 312 | #0 end waiting for 1 thread => 0 14. [ms] 312 | #0 begin waiting for 1 thread => 0 15. [ms] 312 | #0 end waiting for 1 thread => 0 16. [ms] 312 | #0 canceled 0 jobs => 0 17. [ms] 312 | #0 canceled 0 jobs => 0 18. [ms] 312 | #0 enqueuing 3 jobs... 19. [ms] 312 | #0 enqueuing complete. 20. [ms] 312 | #0 enqueuing 3 jobs... 21. [ms] 312 | #0 enqueuing complete. 22. [ms] 312 | #0 enqueuing 2 jobs... 23. [ms] 312 | #0 enqueuing complete. 24. [ms] 312 | #0 begin waiting for 1 thread => -8 25. [ms] 312 | [0.1] sleep 0.003 ms... 26. [ms] 312 | [0.1] +1 => -7 27. [ms] 312 | [0.2] sleep 0.003 ms... 28. [ms] 312 | [0.2] +1 => -6 29. [ms] 312 | [0.3] sleep 0.003 ms... 30. [ms] 312 | [0.3] +1 => -5 31. [ms] 312 | [0.1] sleep 0 ms... 32. [ms] 313 | [0.1] +1 => -4 33. [ms] 313 | [0.2] sleep 0.005 ms... 34. [ms] 313 | [0.2] +1 => -3 35. [ms] 313 | [0.3] sleep 0 ms... 36. [ms] 313 | [0.3] +1 => -2 37. [ms] 313 | [0.1] sleep 0.001 ms... 38. [ms] 313 | [0.1] +1 => -1 39. [ms] 313 | [0.2] sleep 0.001 ms... 40. [ms] 313 | [0.2] +1 => 0 41. [ms] 313 | #0 end waiting for 1 thread => 0 42. [ms] 313 | #1 end waiting for 1 thread => 0 --- common/concurrent_queue.h | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 0d8384fa..5be10b0d 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -1,11 +1,14 @@ #ifndef CONCURRENT_QUEUE_H #define CONCURRENT_QUEUE_H +#include #include +#include #include #include #include #include +#include namespace YACReader { class ConcurrentQueue @@ -54,11 +57,11 @@ public: // temporary (which destroys _queue's elements and deallocates memory). _queue.swap(oldQueue); } + const auto size = oldQueue.size(); - { - const std::lock_guard lock(jobsLeftMutex); - jobsLeft -= size; - } + assert(size <= std::numeric_limits::max()); + if (size != 0) + finalizeJobs(static_cast(size)); return size; } @@ -107,16 +110,27 @@ private: } job(); - - { - std::lock_guard lock(jobsLeftMutex); - --jobsLeft; - } - - _waitVar.notify_all(); + finalizeJobs(1); } } + void finalizeJobs(int count) + { + assert(count > 0); + + int remainingJobs; + { + std::lock_guard lock(jobsLeftMutex); + assert(jobsLeft >= count); + jobsLeft -= count; + remainingJobs = jobsLeft; + } + + assert(remainingJobs >= 0); + if (remainingJobs == 0) + _waitVar.notify_all(); + } + void joinAll() { {