Commit Graph

15 Commits

Author SHA1 Message Date
Igor Kushnir
05b384ed6d Make ConcurrentQueue::waitAll() const
This member function does not affect the logical state of the class.
Making std::condition_variable and std::mutex data members mutable is
idiomatic as these classes are thread-safe synchronization primitives.
2021-12-29 09:36:44 +01:00
Igor Kushnir
4cb542c8cc Remove ConcurrentQueue::joinAll()
This function is called only from ~ConcurrentQueue(). joinAll() is not
thread-safe and it cannot be called earlier without introducing a null
state. Moving the function's implementation into the definition of
~ConcurrentQueue() makes the code clearer. Removing joinAll() also
allows to establish and document invariants for two data members.

Assert consistency between jobsLeft and _queue in ~ConcurrentQueue().
2021-12-29 09:36:44 +01:00
Igor Kushnir
61cd245037 Document ConcurrentQueue and de-inline its implementation
ConcurrentQueue is currently used only by two classes and a test, but
modifying concurrent_queue.h requires recompiling 30 source files. None
of the member functions is so lightweight as to make it worth inlining.

An alternative to `@note ConcurrentQueue is unable to execute jobs if
@p threadCount == 0.` is `assert(threadCount != 0);`. But this would
force classes that contain a ConcurrentQueue data member to always start
a thread, even if they detect at runtime that they are never going to
enqueue a job.

Add Job type alias to avoid repeating the type.

Use default member initializers instead of the member initializer list
to make it clear [to the reader of the header] that no data member is
left uninitialized.
2021-12-29 09:36:44 +01:00
Igor Kushnir
2655613543 ConcurrentQueue: simplify the constructor implementation
* threadCount argument: int => std::size_t to avoid implicit casting;
* eliminate temporary empty std::thread objects;
* replace a trivial lambda with a function pointer and its argument;
* get rid of the unused dedicated loop counter.
2021-12-29 09:36:44 +01:00
Igor Kushnir
d026050d49 ConcurrentQueue::jobsLeft: int => std::size_t
This data member's type can be unsigned because its value is never
negative now. Matching std::queue::size_type allows to improve type
safety, get rid of a static_cast and remove two assertions. The only
downside is a slight increase of sizeof(ConcurrentQueue).
2021-12-29 09:36:44 +01:00
Igor Kushnir
d8a6b7f432 ConcurrentQueue: std::move jobs
Moving a std::function can be faster than copying it. Correcting these
normally minor inefficiencies is important here because they occur under
a mutex lock.
2021-12-29 09:36:44 +01:00
Igor Kushnir
d869e1230b ConcurrentQueue::enqueue: increment jobsLeft before adding a job
ConcurrentQueueTest::randomCalls() built in Debug mode often crashes
when concurrent_queue_test.cpp is modified and ConcurrentQueueTest is
launched from Qt Creator so that the test is built and immediately run.
The crash is an assertion failure that occurs most of the time under
the described circumstances at different test data rows:
void YACReader::ConcurrentQueue::finalizeJobs(int): Assertion `jobsLeft >= count' failed.

The assertion fails because ConcurrentQueue::enqueue() adds a job into
the queue first and then increments jobsLeft. If the job is immediately
picked up and executed very fast, ConcurrentQueue::nextJob() can try to
finalize it before enqueue() increments jobsLeft.

Simply reordering the modifications of jobsLeft and _queue in enqueue()
ensures that jobsLeft is always non-negative and eliminates the
assertion failures. Note that ConcurrentQueue::finalizeJobs() is the
only other function that modifies (decreases) jobsLeft. finalizeJobs()
is always called *after* the queue's size is reduced. So the following
invariant is now maintained at all times and documented:
jobsLeft >= _queue.size().
2021-12-29 09:36:44 +01:00
Igor Kushnir
57eb8d0171 ConcurrentQueue::cancelPending: notify _waitVar
This allows to call cancelPending() and waitAll() concurrently with no
additional synchronization. While calling these two functions
concurrently may not be useful often, supporting it costs little in
terms of performance and code complexity. Furthermore, a completely
thread-safe class is easier to document and use correctly.

Optimize job finalization by notifying _waitVar only if jobsLeft is
reduced to 0.

Optimize cancelPending() by not locking jobsLeftMutex when no job is
canceled (if the queue is empty when this function is called).

Add assertions that verify invariants.

The output of ConcurrentQueueTest::randomCalls() reflects the fact that
a caller of waitAll() can be blocked indefinitely when another thread
cancels all queued jobs while no job is being executed. The test output
snippet below omits repetitive information: the
"QINFO  : ConcurrentQueueTest::randomCalls(queue{1}; 2 user thread(s))"
prefix of each line, current thread id and the common hh:mm:ss current
time part (leaving only the millisecond part). Note that thread #1
begins waiting at the 1st line of the snippet. "=> -8" means that total
equals -8 and thus ConcurrentQueue::jobsLeft equals 8. Thread #0 then
cancels all 8 queued jobs, then enqueues and cancels 3 jobs twice, then
momentarily waits 3 times. #1 is not waked until #0 enqueues 8 more jobs
and starts waiting for them too. Once these new 8 jobs are completed,
both #0 and #1 end waiting.

 1. [ms] 311 | #1 begin waiting for 1 thread => -8
 2. [ms] 311 | #0 enqueuing complete.
 3. [ms] 311 | #0 canceled 8 jobs => -8
 4. [ms] 311 | #0 enqueuing 3 jobs...
 5. [ms] 312 | #0 enqueuing complete.
 6. [ms] 312 | #0 canceled 3 jobs => -3
 7. [ms] 312 | #0 enqueuing 3 jobs...
 8. [ms] 312 | #0 enqueuing complete.
 9. [ms] 312 | #0 canceled 3 jobs => -3
10. [ms] 312 | #0 begin waiting for 1 thread => 0
11. [ms] 312 | #0 end waiting for 1 thread => 0
12. [ms] 312 | #0 begin waiting for 1 thread => 0
13. [ms] 312 | #0 end waiting for 1 thread => 0
14. [ms] 312 | #0 begin waiting for 1 thread => 0
15. [ms] 312 | #0 end waiting for 1 thread => 0
16. [ms] 312 | #0 canceled 0 jobs => 0
17. [ms] 312 | #0 canceled 0 jobs => 0
18. [ms] 312 | #0 enqueuing 3 jobs...
19. [ms] 312 | #0 enqueuing complete.
20. [ms] 312 | #0 enqueuing 3 jobs...
21. [ms] 312 | #0 enqueuing complete.
22. [ms] 312 | #0 enqueuing 2 jobs...
23. [ms] 312 | #0 enqueuing complete.
24. [ms] 312 | #0 begin waiting for 1 thread => -8
25. [ms] 312 | [0.1] sleep 0.003 ms...
26. [ms] 312 | [0.1] +1 => -7
27. [ms] 312 | [0.2] sleep 0.003 ms...
28. [ms] 312 | [0.2] +1 => -6
29. [ms] 312 | [0.3] sleep 0.003 ms...
30. [ms] 312 | [0.3] +1 => -5
31. [ms] 312 | [0.1] sleep 0 ms...
32. [ms] 313 | [0.1] +1 => -4
33. [ms] 313 | [0.2] sleep 0.005 ms...
34. [ms] 313 | [0.2] +1 => -3
35. [ms] 313 | [0.3] sleep 0 ms...
36. [ms] 313 | [0.3] +1 => -2
37. [ms] 313 | [0.1] sleep 0.001 ms...
38. [ms] 313 | [0.1] +1 => -1
39. [ms] 313 | [0.2] sleep 0.001 ms...
40. [ms] 313 | [0.2] +1 => 0
41. [ms] 313 | #0 end waiting for 1 thread => 0
42. [ms] 313 | #1 end waiting for 1 thread => 0
2021-12-29 09:36:44 +01:00
Igor Kushnir
e8b5f42e75 ConcurrentQueue::nextJob: notify all _waitVar threads
Replace _waitVar.notify_one() with _waitVar.notify_all(). This was the
only hurdle to documenting ConcurrentQueue::waitAll() as thread-safe.

ConcurrentQueueTest::waitAllFromMultipleThreads() passes instead of
hanging now.
2021-12-29 09:36:44 +01:00
Igor Kushnir
b43e3383d5 ConcurrentQueue::cancelPending: return the number of canceled jobs
The return value can be used to make
ConcurrentQueueTest::cancelPending1UserThread() non-flaky. It may also
be useful to non-testing code.

Improve the performance of cancelPending() by locking the two mutexes
separately and minimizing the locking time.
2021-12-29 09:36:44 +01:00
Igor Kushnir
a72fdb9ca2 ConcurrentQueue::cancelPending: don't reset jobsLeft to 0
Worker threads may well be executing jobs while this function is being
called. If ConcurrentQueue::waitAll() is called soon enough after
cancelPending(), the worker threads may still be running, but waitAll()
would return immediately as jobsLeft would be nonpositive.

Subtracting _queue.size() from jobsLeft sets this variable to the number
of worker threads that are executing jobs at the moment.

ConcurrentQueueTest::cancelPending1UserThread() passes most of the time
now. But it still fails occasionally because it depends on the timing of
thread scheduling, which is unreliable.
2021-12-29 09:36:44 +01:00
Igor Kushnir
228fe1284e Fix a typo in ConcurrentQueue::cancelPending function name 2021-12-29 09:36:44 +01:00
Luis Ángel San Martín
93596a4972 Restore needed lock 2021-02-08 09:02:19 +01:00
Luis Ángel San Martín
4c93c70de6 Use the right mutex when the queue is modified 2021-02-08 08:32:42 +01:00
Luis Ángel San Martín
fa5ce25425 Add concurrent queue based on lambdas 2021-01-14 09:03:17 +01:00