This member function does not affect the logical state of the class.
Making std::condition_variable and std::mutex data members mutable is
idiomatic as these classes are thread-safe synchronization primitives.
This function is called only from ~ConcurrentQueue(). joinAll() is not
thread-safe and it cannot be called earlier without introducing a null
state. Moving the function's implementation into the definition of
~ConcurrentQueue() makes the code clearer. Removing joinAll() also
allows to establish and document invariants for two data members.
Assert consistency between jobsLeft and _queue in ~ConcurrentQueue().
ConcurrentQueue is currently used only by two classes and a test, but
modifying concurrent_queue.h requires recompiling 30 source files. None
of the member functions is so lightweight as to make it worth inlining.
An alternative to `@note ConcurrentQueue is unable to execute jobs if
@p threadCount == 0.` is `assert(threadCount != 0);`. But this would
force classes that contain a ConcurrentQueue data member to always start
a thread, even if they detect at runtime that they are never going to
enqueue a job.
Add Job type alias to avoid repeating the type.
Use default member initializers instead of the member initializer list
to make it clear [to the reader of the header] that no data member is
left uninitialized.
* threadCount argument: int => std::size_t to avoid implicit casting;
* eliminate temporary empty std::thread objects;
* replace a trivial lambda with a function pointer and its argument;
* get rid of the unused dedicated loop counter.
This data member's type can be unsigned because its value is never
negative now. Matching std::queue::size_type allows to improve type
safety, get rid of a static_cast and remove two assertions. The only
downside is a slight increase of sizeof(ConcurrentQueue).
Moving a std::function can be faster than copying it. Correcting these
normally minor inefficiencies is important here because they occur under
a mutex lock.
ConcurrentQueueTest::randomCalls() built in Debug mode often crashes
when concurrent_queue_test.cpp is modified and ConcurrentQueueTest is
launched from Qt Creator so that the test is built and immediately run.
The crash is an assertion failure that occurs most of the time under
the described circumstances at different test data rows:
void YACReader::ConcurrentQueue::finalizeJobs(int): Assertion `jobsLeft >= count' failed.
The assertion fails because ConcurrentQueue::enqueue() adds a job into
the queue first and then increments jobsLeft. If the job is immediately
picked up and executed very fast, ConcurrentQueue::nextJob() can try to
finalize it before enqueue() increments jobsLeft.
Simply reordering the modifications of jobsLeft and _queue in enqueue()
ensures that jobsLeft is always non-negative and eliminates the
assertion failures. Note that ConcurrentQueue::finalizeJobs() is the
only other function that modifies (decreases) jobsLeft. finalizeJobs()
is always called *after* the queue's size is reduced. So the following
invariant is now maintained at all times and documented:
jobsLeft >= _queue.size().
This allows to call cancelPending() and waitAll() concurrently with no
additional synchronization. While calling these two functions
concurrently may not be useful often, supporting it costs little in
terms of performance and code complexity. Furthermore, a completely
thread-safe class is easier to document and use correctly.
Optimize job finalization by notifying _waitVar only if jobsLeft is
reduced to 0.
Optimize cancelPending() by not locking jobsLeftMutex when no job is
canceled (if the queue is empty when this function is called).
Add assertions that verify invariants.
The output of ConcurrentQueueTest::randomCalls() reflects the fact that
a caller of waitAll() can be blocked indefinitely when another thread
cancels all queued jobs while no job is being executed. The test output
snippet below omits repetitive information: the
"QINFO : ConcurrentQueueTest::randomCalls(queue{1}; 2 user thread(s))"
prefix of each line, current thread id and the common hh:mm:ss current
time part (leaving only the millisecond part). Note that thread #1
begins waiting at the 1st line of the snippet. "=> -8" means that total
equals -8 and thus ConcurrentQueue::jobsLeft equals 8. Thread #0 then
cancels all 8 queued jobs, then enqueues and cancels 3 jobs twice, then
momentarily waits 3 times. #1 is not waked until #0 enqueues 8 more jobs
and starts waiting for them too. Once these new 8 jobs are completed,
both #0 and #1 end waiting.
1. [ms] 311 | #1 begin waiting for 1 thread => -8
2. [ms] 311 | #0 enqueuing complete.
3. [ms] 311 | #0 canceled 8 jobs => -8
4. [ms] 311 | #0 enqueuing 3 jobs...
5. [ms] 312 | #0 enqueuing complete.
6. [ms] 312 | #0 canceled 3 jobs => -3
7. [ms] 312 | #0 enqueuing 3 jobs...
8. [ms] 312 | #0 enqueuing complete.
9. [ms] 312 | #0 canceled 3 jobs => -3
10. [ms] 312 | #0 begin waiting for 1 thread => 0
11. [ms] 312 | #0 end waiting for 1 thread => 0
12. [ms] 312 | #0 begin waiting for 1 thread => 0
13. [ms] 312 | #0 end waiting for 1 thread => 0
14. [ms] 312 | #0 begin waiting for 1 thread => 0
15. [ms] 312 | #0 end waiting for 1 thread => 0
16. [ms] 312 | #0 canceled 0 jobs => 0
17. [ms] 312 | #0 canceled 0 jobs => 0
18. [ms] 312 | #0 enqueuing 3 jobs...
19. [ms] 312 | #0 enqueuing complete.
20. [ms] 312 | #0 enqueuing 3 jobs...
21. [ms] 312 | #0 enqueuing complete.
22. [ms] 312 | #0 enqueuing 2 jobs...
23. [ms] 312 | #0 enqueuing complete.
24. [ms] 312 | #0 begin waiting for 1 thread => -8
25. [ms] 312 | [0.1] sleep 0.003 ms...
26. [ms] 312 | [0.1] +1 => -7
27. [ms] 312 | [0.2] sleep 0.003 ms...
28. [ms] 312 | [0.2] +1 => -6
29. [ms] 312 | [0.3] sleep 0.003 ms...
30. [ms] 312 | [0.3] +1 => -5
31. [ms] 312 | [0.1] sleep 0 ms...
32. [ms] 313 | [0.1] +1 => -4
33. [ms] 313 | [0.2] sleep 0.005 ms...
34. [ms] 313 | [0.2] +1 => -3
35. [ms] 313 | [0.3] sleep 0 ms...
36. [ms] 313 | [0.3] +1 => -2
37. [ms] 313 | [0.1] sleep 0.001 ms...
38. [ms] 313 | [0.1] +1 => -1
39. [ms] 313 | [0.2] sleep 0.001 ms...
40. [ms] 313 | [0.2] +1 => 0
41. [ms] 313 | #0 end waiting for 1 thread => 0
42. [ms] 313 | #1 end waiting for 1 thread => 0
Replace _waitVar.notify_one() with _waitVar.notify_all(). This was the
only hurdle to documenting ConcurrentQueue::waitAll() as thread-safe.
ConcurrentQueueTest::waitAllFromMultipleThreads() passes instead of
hanging now.
The return value can be used to make
ConcurrentQueueTest::cancelPending1UserThread() non-flaky. It may also
be useful to non-testing code.
Improve the performance of cancelPending() by locking the two mutexes
separately and minimizing the locking time.
Worker threads may well be executing jobs while this function is being
called. If ConcurrentQueue::waitAll() is called soon enough after
cancelPending(), the worker threads may still be running, but waitAll()
would return immediately as jobsLeft would be nonpositive.
Subtracting _queue.size() from jobsLeft sets this variable to the number
of worker threads that are executing jobs at the moment.
ConcurrentQueueTest::cancelPending1UserThread() passes most of the time
now. But it still fails occasionally because it depends on the timing of
thread scheduling, which is unreliable.