ConcurrentQueue::cancelPending: notify _waitVar

This allows to call cancelPending() and waitAll() concurrently with no
additional synchronization. While calling these two functions
concurrently may not be useful often, supporting it costs little in
terms of performance and code complexity. Furthermore, a completely
thread-safe class is easier to document and use correctly.

Optimize job finalization by notifying _waitVar only if jobsLeft is
reduced to 0.

Optimize cancelPending() by not locking jobsLeftMutex when no job is
canceled (if the queue is empty when this function is called).

Add assertions that verify invariants.

The output of ConcurrentQueueTest::randomCalls() reflects the fact that
a caller of waitAll() can be blocked indefinitely when another thread
cancels all queued jobs while no job is being executed. The test output
snippet below omits repetitive information: the
"QINFO  : ConcurrentQueueTest::randomCalls(queue{1}; 2 user thread(s))"
prefix of each line, current thread id and the common hh:mm:ss current
time part (leaving only the millisecond part). Note that thread #1
begins waiting at the 1st line of the snippet. "=> -8" means that total
equals -8 and thus ConcurrentQueue::jobsLeft equals 8. Thread #0 then
cancels all 8 queued jobs, then enqueues and cancels 3 jobs twice, then
momentarily waits 3 times. #1 is not waked until #0 enqueues 8 more jobs
and starts waiting for them too. Once these new 8 jobs are completed,
both #0 and #1 end waiting.

 1. [ms] 311 | #1 begin waiting for 1 thread => -8
 2. [ms] 311 | #0 enqueuing complete.
 3. [ms] 311 | #0 canceled 8 jobs => -8
 4. [ms] 311 | #0 enqueuing 3 jobs...
 5. [ms] 312 | #0 enqueuing complete.
 6. [ms] 312 | #0 canceled 3 jobs => -3
 7. [ms] 312 | #0 enqueuing 3 jobs...
 8. [ms] 312 | #0 enqueuing complete.
 9. [ms] 312 | #0 canceled 3 jobs => -3
10. [ms] 312 | #0 begin waiting for 1 thread => 0
11. [ms] 312 | #0 end waiting for 1 thread => 0
12. [ms] 312 | #0 begin waiting for 1 thread => 0
13. [ms] 312 | #0 end waiting for 1 thread => 0
14. [ms] 312 | #0 begin waiting for 1 thread => 0
15. [ms] 312 | #0 end waiting for 1 thread => 0
16. [ms] 312 | #0 canceled 0 jobs => 0
17. [ms] 312 | #0 canceled 0 jobs => 0
18. [ms] 312 | #0 enqueuing 3 jobs...
19. [ms] 312 | #0 enqueuing complete.
20. [ms] 312 | #0 enqueuing 3 jobs...
21. [ms] 312 | #0 enqueuing complete.
22. [ms] 312 | #0 enqueuing 2 jobs...
23. [ms] 312 | #0 enqueuing complete.
24. [ms] 312 | #0 begin waiting for 1 thread => -8
25. [ms] 312 | [0.1] sleep 0.003 ms...
26. [ms] 312 | [0.1] +1 => -7
27. [ms] 312 | [0.2] sleep 0.003 ms...
28. [ms] 312 | [0.2] +1 => -6
29. [ms] 312 | [0.3] sleep 0.003 ms...
30. [ms] 312 | [0.3] +1 => -5
31. [ms] 312 | [0.1] sleep 0 ms...
32. [ms] 313 | [0.1] +1 => -4
33. [ms] 313 | [0.2] sleep 0.005 ms...
34. [ms] 313 | [0.2] +1 => -3
35. [ms] 313 | [0.3] sleep 0 ms...
36. [ms] 313 | [0.3] +1 => -2
37. [ms] 313 | [0.1] sleep 0.001 ms...
38. [ms] 313 | [0.1] +1 => -1
39. [ms] 313 | [0.2] sleep 0.001 ms...
40. [ms] 313 | [0.2] +1 => 0
41. [ms] 313 | #0 end waiting for 1 thread => 0
42. [ms] 313 | #1 end waiting for 1 thread => 0
This commit is contained in:
Igor Kushnir 2021-03-16 10:25:48 +02:00 committed by Luis Ángel San Martín
parent b514ba1270
commit 57eb8d0171

View File

@ -1,11 +1,14 @@
#ifndef CONCURRENT_QUEUE_H
#define CONCURRENT_QUEUE_H
#include <cassert>
#include <thread>
#include <limits>
#include <mutex>
#include <functional>
#include <condition_variable>
#include <queue>
#include <vector>
namespace YACReader {
class ConcurrentQueue
@ -54,11 +57,11 @@ public:
// temporary (which destroys _queue's elements and deallocates memory).
_queue.swap(oldQueue);
}
const auto size = oldQueue.size();
{
const std::lock_guard<std::mutex> lock(jobsLeftMutex);
jobsLeft -= size;
}
assert(size <= std::numeric_limits<int>::max());
if (size != 0)
finalizeJobs(static_cast<int>(size));
return size;
}
@ -107,16 +110,27 @@ private:
}
job();
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
--jobsLeft;
}
_waitVar.notify_all();
finalizeJobs(1);
}
}
void finalizeJobs(int count)
{
assert(count > 0);
int remainingJobs;
{
std::lock_guard<std::mutex> lock(jobsLeftMutex);
assert(jobsLeft >= count);
jobsLeft -= count;
remainingJobs = jobsLeft;
}
assert(remainingJobs >= 0);
if (remainingJobs == 0)
_waitVar.notify_all();
}
void joinAll()
{
{