From b0b0849cbc7bf2435ff764794eee3af8c16ae975 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Wed, 3 Mar 2021 17:27:31 +0200 Subject: [PATCH 01/22] Extract DEFINES += NOMINMAX into common config.pri --- YACReader/YACReader.pro | 2 +- YACReaderLibrary/YACReaderLibrary.pro | 2 +- YACReaderLibraryServer/YACReaderLibraryServer.pro | 2 +- config.pri | 2 ++ 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/YACReader/YACReader.pro b/YACReader/YACReader.pro index 771ef583..d1996afd 100644 --- a/YACReader/YACReader.pro +++ b/YACReader/YACReader.pro @@ -6,7 +6,7 @@ QMAKE_TARGET_BUNDLE_PREFIX = "com.yacreader" DEPENDPATH += . \ release -DEFINES += NOMINMAX YACREADER +DEFINES += YACREADER #load default build flags include (../config.pri) diff --git a/YACReaderLibrary/YACReaderLibrary.pro b/YACReaderLibrary/YACReaderLibrary.pro index 3160ac8e..0ba002d6 100644 --- a/YACReaderLibrary/YACReaderLibrary.pro +++ b/YACReaderLibrary/YACReaderLibrary.pro @@ -12,7 +12,7 @@ INCLUDEPATH += . \ ./comic_vine \ ./comic_vine/model -DEFINES += SERVER_RELEASE NOMINMAX YACREADER_LIBRARY +DEFINES += SERVER_RELEASE YACREADER_LIBRARY # load default build flags include (../config.pri) diff --git a/YACReaderLibraryServer/YACReaderLibraryServer.pro b/YACReaderLibraryServer/YACReaderLibraryServer.pro index fd892f1d..67aedd06 100644 --- a/YACReaderLibraryServer/YACReaderLibraryServer.pro +++ b/YACReaderLibraryServer/YACReaderLibraryServer.pro @@ -10,7 +10,7 @@ INCLUDEPATH += ../YACReaderLibrary \ ../YACReaderLibrary/server \ ../YACReaderLibrary/db -DEFINES += SERVER_RELEASE NOMINMAX YACREADER_LIBRARY +DEFINES += SERVER_RELEASE YACREADER_LIBRARY # load default build flags # do a basic dependency check include(headless_config.pri) diff --git a/config.pri b/config.pri index 14af1b2e..660f80c0 100644 --- a/config.pri +++ b/config.pri @@ -5,6 +5,8 @@ CONFIG += c++17 win32:QMAKE_CXXFLAGS += /std:c++17 #enable c++17 explicitly in msvc +DEFINES += NOMINMAX + if(unix|mingw):QMAKE_CXXFLAGS_RELEASE += -DNDEBUG win32:msvc:QMAKE_CXXFLAGS_RELEASE += /DNDEBUG From ec938651c4f39cd3397c30bf42e9139471eb1836 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Wed, 3 Mar 2021 18:03:52 +0200 Subject: [PATCH 02/22] tests: add the first Qt Test - ConcurrentQueueTest Place common Qt Test qmake code into tests/qt_test.pri. Build tests as part of top-level YACReader project unless no_tests CONFIG option is set. This way the tests are built by default during development. Packagers can skip building tests by running `qmake "CONFIG+=no_tests"`. Both ConcurrentQueueTest::singleUserThread() and ConcurrentQueueTest::multipleUserThreads() pass. Evidently ConcurrentQueue::enqueue() can be safely called from multiple threads on the same ConcurrentQueue object with no additional synchronization. Once each thread enqueues all its jobs, one thread can safely call waitAll(). --- YACReader.pro | 1 + .../concurrent_queue_test.cpp | 268 ++++++++++++++++++ .../concurrent_queue_test.pro | 7 + tests/qt_test.pri | 9 + tests/tests.pro | 2 + 5 files changed, 287 insertions(+) create mode 100644 tests/concurrent_queue_test/concurrent_queue_test.cpp create mode 100644 tests/concurrent_queue_test/concurrent_queue_test.pro create mode 100644 tests/qt_test.pri create mode 100644 tests/tests.pro diff --git a/YACReader.pro b/YACReader.pro index f788b405..f3106c4f 100644 --- a/YACReader.pro +++ b/YACReader.pro @@ -1,3 +1,4 @@ TEMPLATE = subdirs SUBDIRS = YACReader YACReaderLibrary YACReaderLibraryServer YACReaderLibrary.depends = YACReader +!CONFIG(no_tests): SUBDIRS += tests diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp new file mode 100644 index 00000000..387b118b --- /dev/null +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -0,0 +1,268 @@ +#include "concurrent_queue.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace chrono = std::chrono; +using Clock = chrono::steady_clock; +using YACReader::ConcurrentQueue; + +namespace { +double toMilliseconds(Clock::duration duration) +{ + return chrono::duration_cast(duration).count() / 1000.0; +} + +QString currentThreadInfo() +{ + std::ostringstream os; + os << std::this_thread::get_id(); + return QString::fromStdString(os.str()); +} + +QDebug log() +{ + return qInfo().noquote() << currentThreadInfo() << '|' + << QTime::currentTime().toString(Qt::ISODateWithMs) << '|'; +} + +using Total = std::atomic; + +struct JobData { + int summand; + Clock::duration sleepingTime; +}; +using JobDataSet = QVector; + +int expectedTotal(const JobDataSet &jobs) +{ + return std::accumulate(jobs.cbegin(), jobs.cend(), 0, + [](int total, JobData job) { + return total + job.summand; + }); +} + +int expectedTotal(const QVector &jobs) +{ + return std::accumulate(jobs.cbegin(), jobs.cend(), 0, + [](int total, const JobDataSet &dataSet) { + return total + expectedTotal(dataSet); + }); +} + +class Id +{ +public: + explicit Id(int threadId, int jobId) + : threadId { threadId }, jobId { jobId } { } + + QString toString() const { return QStringLiteral("[%1.%2]").arg(threadId).arg(jobId); } + +private: + const int threadId; + const int jobId; +}; + +QDebug operator<<(QDebug debug, Id id) +{ + QDebugStateSaver saver(debug); + debug.noquote() << id.toString(); + return debug; +} + +class Job +{ +public: + explicit Job(Total &total, JobData data, Id id) + : total { total }, data { data }, id { id } { } + + void operator()() + { + log().nospace() << id << " sleep " << toMilliseconds(data.sleepingTime) << " ms..."; + std::this_thread::sleep_for(data.sleepingTime); + + const auto updatedTotal = (total += data.summand); + log().nospace() << id << " +" << data.summand << " => " << updatedTotal; + } + +private: + Total &total; + const JobData data; + const Id id; +}; + +class Enqueuer +{ +public: + explicit Enqueuer(ConcurrentQueue &queue, Total &total, const JobDataSet &jobs, int threadId) + : queue { queue }, total { total }, jobs { jobs }, threadId { threadId } { } + + void operator()() + { + const char *const jobStr = jobs.size() == 1 ? "job" : "jobs"; + log() << QStringLiteral("#%1 enqueuing %2 %3...").arg(threadId).arg(jobs.size()).arg(jobStr); + for (int i = 0; i < jobs.size(); ++i) + queue.enqueue(Job(total, jobs.at(i), Id(threadId, i + 1))); + log() << QStringLiteral("#%1 enqueuing complete.").arg(threadId); + } + +private: + ConcurrentQueue &queue; + Total &total; + const JobDataSet jobs; + const int threadId; +}; + +} + +Q_DECLARE_METATYPE(JobData) + +class ConcurrentQueueTest : public QObject +{ + Q_OBJECT +private slots: + void init(); + + void singleUserThread_data(); + void singleUserThread(); + + void multipleUserThreads_data(); + void multipleUserThreads(); + +private: + static constexpr int primaryThreadId { 0 }; + + QString messageFormatString(int threadCount) const + { + auto format = QStringLiteral("#%1 %5 %2 %3 => %4").arg(primaryThreadId); + const char *const threadStr = threadCount == 1 ? "thread" : "threads"; + return format.arg(threadCount).arg(threadStr).arg(total.load()); + } + + void printStartedMessage(int threadCount) const + { + log() << messageFormatString(threadCount).arg("started"); + } + void printWaitedMessage(int threadCount) const + { + log() << messageFormatString(threadCount).arg("waited for"); + } + + Total total { 0 }; +}; + +void ConcurrentQueueTest::init() +{ + total = 0; +} + +void ConcurrentQueueTest::singleUserThread_data() +{ + QTest::addColumn("threadCount"); + QTest::addColumn("jobs"); + + using ms = chrono::milliseconds; + + QTest::newRow("-") << 0 << JobDataSet {}; + QTest::newRow("0") << 7 << JobDataSet {}; + QTest::newRow("A") << 1 << JobDataSet { { 5, ms(0) } }; + QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } }; + QTest::newRow("C") << 1 << JobDataSet { { 1, ms(0) }, { 5, ms(2) }, { 3, ms(1) } }; + QTest::newRow("D") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } }; + QTest::newRow("E") << 2 << JobDataSet { { 1, ms(2) }, { 2, ms(1) } }; + QTest::newRow("F") << 3 << JobDataSet { { 8, ms(3) }, { 5, ms(4) }, { 2, ms(1) }, { 11, ms(1) }, { 100, ms(3) } }; +} + +void ConcurrentQueueTest::singleUserThread() +{ + QFETCH(const int, threadCount); + QFETCH(const JobDataSet, jobs); + + ConcurrentQueue queue(threadCount); + printStartedMessage(threadCount); + + Enqueuer(queue, total, jobs, primaryThreadId)(); + + queue.waitAll(); + printWaitedMessage(threadCount); + + QCOMPARE(total.load(), expectedTotal(jobs)); +} + +void ConcurrentQueueTest::multipleUserThreads_data() +{ + QTest::addColumn("threadCount"); + QTest::addColumn>("jobs"); + + using ms = chrono::milliseconds; + + JobDataSet jobs1 { { 1, ms(1) } }; + JobDataSet jobs2 { { 2, ms(4) } }; + QVector allJobs { jobs1, jobs2 }; + QTest::newRow("A1") << 1 << allJobs; + QTest::newRow("A2") << 2 << allJobs; + + jobs1.push_back({ 5, ms(3) }); + jobs2.push_back({ 10, ms(1) }); + allJobs = { jobs1, jobs2 }; + QTest::newRow("B1") << 2 << allJobs; + QTest::newRow("B2") << 3 << allJobs; + QTest::newRow("B3") << 8 << allJobs; + + jobs1.push_back({ 20, ms(0) }); + jobs2.push_back({ 40, ms(2) }); + allJobs = { jobs1, jobs2 }; + QTest::newRow("C") << 4 << allJobs; + + JobDataSet jobs3 { { 80, ms(0) }, { 160, ms(2) }, { 320, ms(1) }, { 640, ms(0) }, { 2000, ms(3) } }; + allJobs.push_back(jobs3); + QTest::newRow("D1") << 3 << allJobs; + QTest::newRow("D2") << 5 << allJobs; + + JobDataSet jobs4 { { 4000, ms(1) }, { 8000, ms(3) } }; + allJobs.push_back(jobs4); + QTest::newRow("E1") << 4 << allJobs; + QTest::newRow("E2") << 6 << allJobs; +} + +void ConcurrentQueueTest::multipleUserThreads() +{ + QFETCH(const int, threadCount); + QFETCH(const QVector, jobs); + + ConcurrentQueue queue(threadCount); + printStartedMessage(threadCount); + + if (!jobs.empty()) { + std::vector enqueuerThreads; + enqueuerThreads.reserve(jobs.size() - 1); + for (int i = 1; i < jobs.size(); ++i) + enqueuerThreads.emplace_back(Enqueuer(queue, total, jobs.at(i), i)); + + Enqueuer(queue, total, jobs.constFirst(), primaryThreadId)(); + for (auto &t : enqueuerThreads) + t.join(); + } + + queue.waitAll(); + printWaitedMessage(threadCount); + + QCOMPARE(total.load(), expectedTotal(jobs)); +} + +QTEST_APPLESS_MAIN(ConcurrentQueueTest) + +#include "concurrent_queue_test.moc" diff --git a/tests/concurrent_queue_test/concurrent_queue_test.pro b/tests/concurrent_queue_test/concurrent_queue_test.pro new file mode 100644 index 00000000..d6cb7f5e --- /dev/null +++ b/tests/concurrent_queue_test/concurrent_queue_test.pro @@ -0,0 +1,7 @@ +include(../qt_test.pri) + +PATH_TO_common = ../../common + +INCLUDEPATH += $$PATH_TO_common +HEADERS += $${PATH_TO_common}/concurrent_queue.h +SOURCES += concurrent_queue_test.cpp diff --git a/tests/qt_test.pri b/tests/qt_test.pri new file mode 100644 index 00000000..1005fa03 --- /dev/null +++ b/tests/qt_test.pri @@ -0,0 +1,9 @@ +QT += testlib +QT -= gui + +CONFIG += qt console warn_on testcase no_testcase_installs +CONFIG -= app_bundle + +TEMPLATE = app + +include(../config.pri) diff --git a/tests/tests.pro b/tests/tests.pro new file mode 100644 index 00000000..15317f23 --- /dev/null +++ b/tests/tests.pro @@ -0,0 +1,2 @@ +TEMPLATE = subdirs +SUBDIRS += concurrent_queue_test From 228fe1284eb8507052f3e34a9bd8b0b2e5841df4 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 4 Mar 2021 17:26:37 +0200 Subject: [PATCH 03/22] Fix a typo in ConcurrentQueue::cancelPending function name --- YACReaderLibrary/db/comic_query_result_processor.cpp | 2 +- YACReaderLibrary/db/folder_query_result_processor.cpp | 2 +- common/concurrent_queue.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/YACReaderLibrary/db/comic_query_result_processor.cpp b/YACReaderLibrary/db/comic_query_result_processor.cpp index 873ab3b5..b05acb5c 100644 --- a/YACReaderLibrary/db/comic_query_result_processor.cpp +++ b/YACReaderLibrary/db/comic_query_result_processor.cpp @@ -16,7 +16,7 @@ YACReader::ComicQueryResultProcessor::ComicQueryResultProcessor() void YACReader::ComicQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, const QString &databasePath) { - querySearchQueue.cancellPending(); + querySearchQueue.cancelPending(); querySearchQueue.enqueue([=] { QString connectionName = ""; diff --git a/YACReaderLibrary/db/folder_query_result_processor.cpp b/YACReaderLibrary/db/folder_query_result_processor.cpp index eafeacf6..2ecc83b4 100644 --- a/YACReaderLibrary/db/folder_query_result_processor.cpp +++ b/YACReaderLibrary/db/folder_query_result_processor.cpp @@ -22,7 +22,7 @@ YACReader::FolderQueryResultProcessor::FolderQueryResultProcessor(FolderModel *m void YACReader::FolderQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, bool includeComics) { - querySearchQueue.cancellPending(); + querySearchQueue.cancelPending(); querySearchQueue.enqueue([=] { QString connectionName = ""; diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 99505df7..c983b277 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -43,7 +43,7 @@ public: jobAvailableVar.notify_one(); } - void cancellPending() + void cancelPending() { std::unique_lock lockQueue(queueMutex); std::unique_lock lockJobsLeft(jobsLeftMutex); From 4bbd16c3b36d309f2d1f581f491bfa110c19b03d Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Fri, 12 Mar 2021 12:52:59 +0200 Subject: [PATCH 04/22] tests: add ConcurrentQueueTest::cancelPending1UserThread() This new test consistently fails because of a bug in ConcurrentQueue::cancelPending() described in the following comment: https://github.com/YACReader/yacreader/issues/201#issuecomment-774987383 --- .../concurrent_queue_test.cpp | 134 +++++++++++++++--- 1 file changed, 115 insertions(+), 19 deletions(-) diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp index 387b118b..6dd2bd69 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.cpp +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -126,8 +126,47 @@ private: const int threadId; }; +class QueueControlMessagePrinter +{ +public: + explicit QueueControlMessagePrinter(const Total &total, int threadId, int threadCount) + : total { total }, threadId { threadId }, threadCount { threadCount } { } + + void printStartedMessage() const + { + log() << messageFormatString().arg("started"); + } + void printCanceledMessage() const + { + log() << messageFormatString().arg("canceled"); + } + void printWaitedMessage() const + { + log() << messageFormatString().arg("waited for"); + } + +private: + QString messageFormatString() const + { + auto format = QStringLiteral("#%1 %5 %2 %3 => %4").arg(threadId); + const char *const threadStr = threadCount == 1 ? "thread" : "threads"; + return format.arg(threadCount).arg(threadStr).arg(total.load()); + } + + const Total &total; + const int threadId; + const int threadCount; +}; + +void waitAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) +{ + queue.waitAll(); + printer.printWaitedMessage(); } +} + +Q_DECLARE_METATYPE(Clock::duration) Q_DECLARE_METATYPE(JobData) class ConcurrentQueueTest : public QObject @@ -142,23 +181,15 @@ private slots: void multipleUserThreads_data(); void multipleUserThreads(); + void cancelPending1UserThread_data(); + void cancelPending1UserThread(); + private: static constexpr int primaryThreadId { 0 }; - QString messageFormatString(int threadCount) const + QueueControlMessagePrinter makeMessagePrinter(int threadCount) const { - auto format = QStringLiteral("#%1 %5 %2 %3 => %4").arg(primaryThreadId); - const char *const threadStr = threadCount == 1 ? "thread" : "threads"; - return format.arg(threadCount).arg(threadStr).arg(total.load()); - } - - void printStartedMessage(int threadCount) const - { - log() << messageFormatString(threadCount).arg("started"); - } - void printWaitedMessage(int threadCount) const - { - log() << messageFormatString(threadCount).arg("waited for"); + return QueueControlMessagePrinter(total, primaryThreadId, threadCount); } Total total { 0 }; @@ -191,13 +222,14 @@ void ConcurrentQueueTest::singleUserThread() QFETCH(const int, threadCount); QFETCH(const JobDataSet, jobs); + const auto printer = makeMessagePrinter(threadCount); + ConcurrentQueue queue(threadCount); - printStartedMessage(threadCount); + printer.printStartedMessage(); Enqueuer(queue, total, jobs, primaryThreadId)(); - queue.waitAll(); - printWaitedMessage(threadCount); + waitAndPrint(queue, printer); QCOMPARE(total.load(), expectedTotal(jobs)); } @@ -243,8 +275,10 @@ void ConcurrentQueueTest::multipleUserThreads() QFETCH(const int, threadCount); QFETCH(const QVector, jobs); + const auto printer = makeMessagePrinter(threadCount); + ConcurrentQueue queue(threadCount); - printStartedMessage(threadCount); + printer.printStartedMessage(); if (!jobs.empty()) { std::vector enqueuerThreads; @@ -257,12 +291,74 @@ void ConcurrentQueueTest::multipleUserThreads() t.join(); } - queue.waitAll(); - printWaitedMessage(threadCount); + waitAndPrint(queue, printer); QCOMPARE(total.load(), expectedTotal(jobs)); } +void ConcurrentQueueTest::cancelPending1UserThread_data() +{ + QTest::addColumn("threadCount"); + QTest::addColumn("jobs"); + QTest::addColumn("cancelDelay"); + QTest::addColumn("expectedTotal"); + + const auto ms = [](int count) -> Clock::duration { return chrono::milliseconds(count); }; + const auto us = [](int count) -> Clock::duration { return chrono::microseconds(count); }; + + QTest::newRow("-") << 0 << JobDataSet {} << ms(0) << 0; + QTest::newRow("01") << 2 << JobDataSet {} << ms(0) << 0; + QTest::newRow("02") << 3 << JobDataSet {} << ms(1) << 0; + QTest::newRow("A") << 1 << JobDataSet { { 5, ms(3) } } << ms(1) << 5; + QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } } << ms(1) << 12; + + JobDataSet dataSet { { 1, ms(3) }, { 5, ms(2) }, { 3, ms(1) } }; + QTest::newRow("C1") << 1 << dataSet << ms(1) << 1; + QTest::newRow("C2") << 1 << dataSet << ms(4) << 6; + QTest::newRow("C3") << 2 << dataSet << ms(1) << 6; + QTest::newRow("C4") << 3 << dataSet << ms(1) << 9; + QTest::newRow("C5") << 1 << dataSet << ms(7) << 9; + + dataSet.push_back({ 10, ms(5) }); + dataSet.push_back({ 20, ms(8) }); + dataSet.push_back({ 40, ms(20) }); + dataSet.push_back({ 80, ms(2) }); + QTest::newRow("D1") << 1 << dataSet << ms(1) << 1; + QTest::newRow("D2") << 1 << dataSet << ms(15) << 39; + QTest::newRow("D3") << 1 << dataSet << ms(50) << 159; + QTest::newRow("D4") << 2 << dataSet << ms(4) << 39; + QTest::newRow("D5") << 3 << dataSet << ms(4) << 79; + QTest::newRow("D6") << 4 << dataSet << ms(4) << 159; + QTest::newRow("D7") << 2 << dataSet << us(300) << 6; + QTest::newRow("D8") << 3 << dataSet << us(500) << 9; + QTest::newRow("D9") << 4 << dataSet << us(700) << 19; + + QTest::newRow("E") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } } << ms(1) << 33; +} + +void ConcurrentQueueTest::cancelPending1UserThread() +{ + QFETCH(const int, threadCount); + QFETCH(const JobDataSet, jobs); + QFETCH(const Clock::duration, cancelDelay); + QFETCH(const int, expectedTotal); + + const auto printer = makeMessagePrinter(threadCount); + + ConcurrentQueue queue(threadCount); + printer.printStartedMessage(); + + Enqueuer(queue, total, jobs, primaryThreadId)(); + + std::this_thread::sleep_for(cancelDelay); + queue.cancelPending(); + printer.printCanceledMessage(); + + waitAndPrint(queue, printer); + + QCOMPARE(total.load(), expectedTotal); +} + QTEST_APPLESS_MAIN(ConcurrentQueueTest) #include "concurrent_queue_test.moc" From a72fdb9ca2a71c57ab3f0b3d7aec0608307eea04 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Fri, 12 Mar 2021 12:57:25 +0200 Subject: [PATCH 05/22] ConcurrentQueue::cancelPending: don't reset jobsLeft to 0 Worker threads may well be executing jobs while this function is being called. If ConcurrentQueue::waitAll() is called soon enough after cancelPending(), the worker threads may still be running, but waitAll() would return immediately as jobsLeft would be nonpositive. Subtracting _queue.size() from jobsLeft sets this variable to the number of worker threads that are executing jobs at the moment. ConcurrentQueueTest::cancelPending1UserThread() passes most of the time now. But it still fails occasionally because it depends on the timing of thread scheduling, which is unreliable. --- common/concurrent_queue.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index c983b277..300f783d 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -47,8 +47,8 @@ public: { std::unique_lock lockQueue(queueMutex); std::unique_lock lockJobsLeft(jobsLeftMutex); - _queue = std::queue>(); - jobsLeft = 0; + jobsLeft -= _queue.size(); + _queue = {}; } void waitAll() From b43e3383d5cd94cb936ddd2d806bbd950d9c4a9f Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 11 Mar 2021 16:19:35 +0200 Subject: [PATCH 06/22] ConcurrentQueue::cancelPending: return the number of canceled jobs The return value can be used to make ConcurrentQueueTest::cancelPending1UserThread() non-flaky. It may also be useful to non-testing code. Improve the performance of cancelPending() by locking the two mutexes separately and minimizing the locking time. --- common/concurrent_queue.h | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 300f783d..d7031dbf 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -43,12 +43,23 @@ public: jobAvailableVar.notify_one(); } - void cancelPending() + //! @brief Cancels all jobs that have not been picked up by worker threads yet. + //! @return The number of jobs that were canceled. + std::size_t cancelPending() { - std::unique_lock lockQueue(queueMutex); - std::unique_lock lockJobsLeft(jobsLeftMutex); - jobsLeft -= _queue.size(); - _queue = {}; + decltype(_queue) oldQueue; + { + const std::lock_guard lock(queueMutex); + // The mutex locking time is lower with swap() compared to assigning a + // temporary (which destroys _queue's elements and deallocates memory). + _queue.swap(oldQueue); + } + const auto size = oldQueue.size(); + { + const std::lock_guard lock(jobsLeftMutex); + jobsLeft -= size; + } + return size; } void waitAll() From 34b0698d0252db9d503a23220e5cc18f8b9d8719 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 11 Mar 2021 16:56:04 +0200 Subject: [PATCH 07/22] Make ConcurrentQueueTest::cancelPending1UserThread() non-flaky ConcurrentQueueTest::cancelPending1UserThread() often fails when ConcurrentQueueTest is launched from Qt Creator immediately after switching between Debug and Release YACReader build configurations. The CPU busyness must be affecting the thread scheduling timing, which breaks the test's timing assumptions in this case. Use the return value of ConcurrentQueue::cancelPending() instead of relying on the timing of thread scheduling to determine the number of canceled jobs. --- .../concurrent_queue_test.cpp | 91 ++++++++++++------- 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp index 6dd2bd69..938956e0 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.cpp +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -47,14 +47,27 @@ struct JobData { }; using JobDataSet = QVector; -int expectedTotal(const JobDataSet &jobs) +int expectedTotal(JobDataSet::const_iterator first, JobDataSet::const_iterator last) { - return std::accumulate(jobs.cbegin(), jobs.cend(), 0, + return std::accumulate(first, last, 0, [](int total, JobData job) { return total + job.summand; }); } +int expectedTotal(const JobDataSet &jobs) +{ + return expectedTotal(jobs.cbegin(), jobs.cend()); +} + +int expectedTotal(const JobDataSet &jobs, std::size_t canceledCount) +{ + const auto count = jobs.size() - static_cast(canceledCount); + if (count < 0) + qFatal("Canceled more than the total number of jobs somehow!"); + return expectedTotal(jobs.cbegin(), jobs.cbegin() + count); +} + int expectedTotal(const QVector &jobs) { return std::accumulate(jobs.cbegin(), jobs.cend(), 0, @@ -134,23 +147,30 @@ public: void printStartedMessage() const { - log() << messageFormatString().arg("started"); + log() << threadMessageFormatString().arg("started"); } - void printCanceledMessage() const + void printCanceledMessage(std::size_t canceledCount) const { - log() << messageFormatString().arg("canceled"); + const char *const jobStr = canceledCount == 1 ? "job" : "jobs"; + const auto format = messageFormatString().arg("%1 %2 %3"); + log() << format.arg("canceled").arg(canceledCount).arg(jobStr); } void printWaitedMessage() const { - log() << messageFormatString().arg("waited for"); + log() << threadMessageFormatString().arg("waited for"); } private: QString messageFormatString() const { - auto format = QStringLiteral("#%1 %5 %2 %3 => %4").arg(threadId); + return QStringLiteral("#%1 %3 => %2").arg(threadId).arg(total.load()); + } + + QString threadMessageFormatString() const + { const char *const threadStr = threadCount == 1 ? "thread" : "threads"; - return format.arg(threadCount).arg(threadStr).arg(total.load()); + const auto format = messageFormatString().arg("%3 %1 %2"); + return format.arg(threadCount).arg(threadStr); } const Total &total; @@ -158,6 +178,13 @@ private: const int threadCount; }; +std::size_t cancelAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) +{ + const auto canceledCount = queue.cancelPending(); + printer.printCanceledMessage(canceledCount); + return canceledCount; +} + void waitAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) { queue.waitAll(); @@ -301,39 +328,38 @@ void ConcurrentQueueTest::cancelPending1UserThread_data() QTest::addColumn("threadCount"); QTest::addColumn("jobs"); QTest::addColumn("cancelDelay"); - QTest::addColumn("expectedTotal"); const auto ms = [](int count) -> Clock::duration { return chrono::milliseconds(count); }; const auto us = [](int count) -> Clock::duration { return chrono::microseconds(count); }; - QTest::newRow("-") << 0 << JobDataSet {} << ms(0) << 0; - QTest::newRow("01") << 2 << JobDataSet {} << ms(0) << 0; - QTest::newRow("02") << 3 << JobDataSet {} << ms(1) << 0; - QTest::newRow("A") << 1 << JobDataSet { { 5, ms(3) } } << ms(1) << 5; - QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } } << ms(1) << 12; + QTest::newRow("-") << 0 << JobDataSet {} << ms(0); + QTest::newRow("01") << 2 << JobDataSet {} << ms(0); + QTest::newRow("02") << 3 << JobDataSet {} << ms(1); + QTest::newRow("A") << 1 << JobDataSet { { 5, ms(3) } } << ms(1); + QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } } << ms(1); JobDataSet dataSet { { 1, ms(3) }, { 5, ms(2) }, { 3, ms(1) } }; - QTest::newRow("C1") << 1 << dataSet << ms(1) << 1; - QTest::newRow("C2") << 1 << dataSet << ms(4) << 6; - QTest::newRow("C3") << 2 << dataSet << ms(1) << 6; - QTest::newRow("C4") << 3 << dataSet << ms(1) << 9; - QTest::newRow("C5") << 1 << dataSet << ms(7) << 9; + QTest::newRow("C1") << 1 << dataSet << ms(1); + QTest::newRow("C2") << 1 << dataSet << ms(4); + QTest::newRow("C3") << 2 << dataSet << ms(1); + QTest::newRow("C4") << 3 << dataSet << ms(1); + QTest::newRow("C5") << 1 << dataSet << ms(7); dataSet.push_back({ 10, ms(5) }); dataSet.push_back({ 20, ms(8) }); dataSet.push_back({ 40, ms(20) }); dataSet.push_back({ 80, ms(2) }); - QTest::newRow("D1") << 1 << dataSet << ms(1) << 1; - QTest::newRow("D2") << 1 << dataSet << ms(15) << 39; - QTest::newRow("D3") << 1 << dataSet << ms(50) << 159; - QTest::newRow("D4") << 2 << dataSet << ms(4) << 39; - QTest::newRow("D5") << 3 << dataSet << ms(4) << 79; - QTest::newRow("D6") << 4 << dataSet << ms(4) << 159; - QTest::newRow("D7") << 2 << dataSet << us(300) << 6; - QTest::newRow("D8") << 3 << dataSet << us(500) << 9; - QTest::newRow("D9") << 4 << dataSet << us(700) << 19; + QTest::newRow("D1") << 1 << dataSet << ms(1); + QTest::newRow("D2") << 1 << dataSet << ms(15); + QTest::newRow("D3") << 1 << dataSet << ms(50); + QTest::newRow("D4") << 2 << dataSet << ms(4); + QTest::newRow("D5") << 3 << dataSet << ms(4); + QTest::newRow("D6") << 4 << dataSet << ms(4); + QTest::newRow("D7") << 2 << dataSet << us(300); + QTest::newRow("D8") << 3 << dataSet << us(500); + QTest::newRow("D9") << 4 << dataSet << us(700); - QTest::newRow("E") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } } << ms(1) << 33; + QTest::newRow("E") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } } << ms(1); } void ConcurrentQueueTest::cancelPending1UserThread() @@ -341,7 +367,6 @@ void ConcurrentQueueTest::cancelPending1UserThread() QFETCH(const int, threadCount); QFETCH(const JobDataSet, jobs); QFETCH(const Clock::duration, cancelDelay); - QFETCH(const int, expectedTotal); const auto printer = makeMessagePrinter(threadCount); @@ -351,12 +376,12 @@ void ConcurrentQueueTest::cancelPending1UserThread() Enqueuer(queue, total, jobs, primaryThreadId)(); std::this_thread::sleep_for(cancelDelay); - queue.cancelPending(); - printer.printCanceledMessage(); + const auto canceledCount = cancelAndPrint(queue, printer); + QVERIFY(canceledCount <= static_cast(jobs.size())); waitAndPrint(queue, printer); - QCOMPARE(total.load(), expectedTotal); + QCOMPARE(total.load(), expectedTotal(jobs, canceledCount)); } QTEST_APPLESS_MAIN(ConcurrentQueueTest) From 2cbcaaa39145a9a6d1fefe817359988465191b92 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Fri, 12 Mar 2021 17:30:46 +0200 Subject: [PATCH 08/22] tests: add ConcurrentQueueTest::waitAllFromMultipleThreads() This new test hangs because ConcurrentQueue::nextJob() unblocks only one of the threads that wait for _waitVar. --- .../concurrent_queue_test.cpp | 52 +++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp index 938956e0..f3fa5cfc 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.cpp +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -155,9 +155,13 @@ public: const auto format = messageFormatString().arg("%1 %2 %3"); log() << format.arg("canceled").arg(canceledCount).arg(jobStr); } - void printWaitedMessage() const + void printBeginWaitingMessage() const { - log() << threadMessageFormatString().arg("waited for"); + log() << threadMessageFormatString().arg("begin waiting for"); + } + void printEndWaitingMessage() const + { + log() << threadMessageFormatString().arg("end waiting for"); } private: @@ -187,8 +191,9 @@ std::size_t cancelAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrin void waitAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) { + printer.printBeginWaitingMessage(); queue.waitAll(); - printer.printWaitedMessage(); + printer.printEndWaitingMessage(); } } @@ -211,6 +216,9 @@ private slots: void cancelPending1UserThread_data(); void cancelPending1UserThread(); + void waitAllFromMultipleThreads_data(); + void waitAllFromMultipleThreads(); + private: static constexpr int primaryThreadId { 0 }; @@ -384,6 +392,44 @@ void ConcurrentQueueTest::cancelPending1UserThread() QCOMPARE(total.load(), expectedTotal(jobs, canceledCount)); } +void ConcurrentQueueTest::waitAllFromMultipleThreads_data() +{ + QTest::addColumn("waitingThreadCount"); + for (int i : { 1, 2, 4, 7, 19 }) + QTest::addRow("%d", i) << i; +} + +void ConcurrentQueueTest::waitAllFromMultipleThreads() +{ + QFETCH(const int, waitingThreadCount); + QVERIFY(waitingThreadCount > 0); + + constexpr auto queueThreadCount = 2; + const auto printer = makeMessagePrinter(queueThreadCount); + + ConcurrentQueue queue(queueThreadCount); + printer.printStartedMessage(); + + using ms = chrono::milliseconds; + const JobDataSet jobs { { 5, ms(1) }, { 7, ms(2) } }; + Enqueuer(queue, total, jobs, primaryThreadId)(); + + std::vector waitingThreads; + waitingThreads.reserve(waitingThreadCount - 1); + for (int id = 1; id < waitingThreadCount; ++id) { + waitingThreads.emplace_back([=, &queue] { + waitAndPrint(queue, QueueControlMessagePrinter(total, id, queueThreadCount)); + }); + } + + waitAndPrint(queue, printer); + + for (auto &t : waitingThreads) + t.join(); + + QCOMPARE(total.load(), expectedTotal(jobs)); +} + QTEST_APPLESS_MAIN(ConcurrentQueueTest) #include "concurrent_queue_test.moc" From e8b5f42e7520c74077db2cc11904a19b7ab14a73 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Fri, 12 Mar 2021 17:33:54 +0200 Subject: [PATCH 09/22] ConcurrentQueue::nextJob: notify all _waitVar threads Replace _waitVar.notify_one() with _waitVar.notify_all(). This was the only hurdle to documenting ConcurrentQueue::waitAll() as thread-safe. ConcurrentQueueTest::waitAllFromMultipleThreads() passes instead of hanging now. --- common/concurrent_queue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index d7031dbf..0d8384fa 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -113,7 +113,7 @@ private: --jobsLeft; } - _waitVar.notify_one(); + _waitVar.notify_all(); } } From b514ba1270bac37f0289f8b21ad043a9e65885c1 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Fri, 12 Mar 2021 18:18:55 +0200 Subject: [PATCH 10/22] tests: add ConcurrentQueueTest::randomCalls() The new test is a randomized stress test. It consistently passes right now. The test's detailed output can be analyzed to reveal anomalies and bugs. The test can catch various bugs that future changes to ConcurrentQueue's code may introduce. --- .../concurrent_queue_test.cpp | 218 ++++++++++++++++++ 1 file changed, 218 insertions(+) diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp index f3fa5cfc..f79b9754 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.cpp +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -9,9 +9,13 @@ #include #include +#include #include #include +#include +#include #include +#include #include #include #include @@ -33,6 +37,15 @@ QString currentThreadInfo() return QString::fromStdString(os.str()); } +//! This test prints thousands of lines of detailed output. The output allows to analyze +//! how ConcurrentQueue is being tested, how it works and why the test fails or crashes +//! (normally it passes). The default maximum number of warnings in Qt Test is 2000, +//! which is too low for this test. Therefore, the following warning is printed before +//! the log output is suppressed: "Maximum amount of warnings exceeded. Use -maxwarnings +//! to override.". Passing `-maxwarnings 100000` command line option to the test lets it +//! print everything. Passing -silent command line option to the test suppresses all its +//! output except for RandomEngineProvider's root seeds, which are necessary to reproduce +//! interesting test results. QDebug log() { return qInfo().noquote() << currentThreadInfo() << '|' @@ -196,6 +209,167 @@ void waitAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &prin printer.printEndWaitingMessage(); } +template +QDebug operator<<(QDebug debug, const std::array &array) +{ + QDebugStateSaver saver(debug); + debug.nospace(); + + debug << '('; + if (size != 0) { + debug << array.front(); + for (std::size_t i = 1; i != size; ++i) + debug << ", " << array[i]; + } + debug << ')'; + + return debug; +} + +using RandomEngine = std::mt19937_64; + +class RandomEngineProvider +{ +public: + RandomEngineProvider() + { + std::random_device rd; + const auto randomValues = generate(rd); + // Qt Test does not suppress output from the constructor of a test class + // even when -silent command line option is passed. This is fortunate + // because the root seeds can be used to reproduce a test failure. + log() << "RandomEngineProvider's root seeds:" << randomValues; + std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); + rootEngine.reset(new std::mt19937(seedSeq)); + } + + void resetEngines(std::size_t engineCount) + { + engines.clear(); + engines.reserve(engineCount); + for (; engineCount != 0; --engineCount) { + const auto randomValues = generate(*rootEngine); + std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); + engines.emplace_back(seedSeq); + } + } + + RandomEngine &engine(std::size_t index) + { + return engines.at(index); + } + +private: + // In this test we don't really care about uniformly choosing an initial state + // from the entire state-space of the engine. It is possible to generate more + // random numbers at the cost of performance and system entropy pool exhaustion. + static constexpr std::size_t rootSeedCount { 8 }; + static constexpr std::size_t seedCount { 32 }; + + template + static std::array generate(Generator &generator) + { + std::array result; + for (auto &value : result) + value = generator(); + return result; + } + + std::unique_ptr rootEngine; + std::vector engines; +}; + +//! Calls random member functions of ConcurrentQueue for a limited time. +//! Ensures that total equals 0 when all jobs are complete/canceled by: +//! * setting each job's summand to 1; +//! * subtracting a job set's size from total before enqueuing jobs in the set; +//! * adding canceled job count to total after cancelation. +class RandomCaller +{ +public: + explicit RandomCaller(ConcurrentQueue &queue, Total &total, int threadId, + int queueThreadCount, RandomEngine &engine) + : queue(queue), total(total), threadId { threadId }, printer(total, threadId, queueThreadCount), engine(engine) + { + } + + void operator()() + { + constexpr auto testDuration = chrono::milliseconds(10); + const auto testStartTime = Clock::now(); + + auto operation = operationDistribution(); + do { + switch (operation(engine)) { + case 0: + enqueue(); + break; + case 1: + cancel(); + break; + case 2: + waitAndPrint(queue, printer); + break; + default: + qFatal("Unsupported operation."); + } + } while (Clock::now() - testStartTime < testDuration); + } + +private: + int randomInt(int a, int b) + { + return uniformInt(engine, decltype(uniformInt)::param_type(a, b)); + } + + std::discrete_distribution operationDistribution() + { + constexpr int sumOfProbabilities { 100 }; + const auto enqueueProbability = randomInt(0, sumOfProbabilities); + const auto cancelProbability = randomInt(0, sumOfProbabilities - enqueueProbability); + const auto waitProbability = sumOfProbabilities - enqueueProbability - cancelProbability; + + log() << QStringLiteral("#%1 operation weights: %2%, %3%, %4%.").arg(threadId).arg(enqueueProbability).arg(cancelProbability).arg(waitProbability); + if (enqueueProbability + cancelProbability + waitProbability != sumOfProbabilities) + qFatal("The sum of probabilities is not 100%%."); + + const auto real = [](int x) { return static_cast(x); }; + return { real(enqueueProbability), real(cancelProbability), real(waitProbability) }; + } + + JobDataSet createJobs() + { + constexpr int minJobCount { 1 }, maxJobCount { 5 }; + JobDataSet jobs(randomInt(minJobCount, maxJobCount)); + for (auto &job : jobs) { + constexpr int minSleepingTime { 0 }, maxSleepingTime { 5 }; + const auto sleepingTime = randomInt(minSleepingTime, maxSleepingTime); + job = { 1, sleepingTime * chrono::microseconds(1) }; + } + return jobs; + } + + void enqueue() + { + const auto jobs = createJobs(); + total -= jobs.size(); + Enqueuer(queue, total, jobs, threadId)(); + } + + void cancel() + { + const auto canceledCount = cancelAndPrint(queue, printer); + total += canceledCount; + } + + ConcurrentQueue &queue; + Total &total; + const int threadId; + const QueueControlMessagePrinter printer; + RandomEngine &engine; + std::uniform_int_distribution uniformInt; +}; + } Q_DECLARE_METATYPE(Clock::duration) @@ -219,6 +393,9 @@ private slots: void waitAllFromMultipleThreads_data(); void waitAllFromMultipleThreads(); + void randomCalls_data(); + void randomCalls(); + private: static constexpr int primaryThreadId { 0 }; @@ -228,6 +405,7 @@ private: } Total total { 0 }; + RandomEngineProvider randomEngineProvider; }; void ConcurrentQueueTest::init() @@ -430,6 +608,46 @@ void ConcurrentQueueTest::waitAllFromMultipleThreads() QCOMPARE(total.load(), expectedTotal(jobs)); } +void ConcurrentQueueTest::randomCalls_data() +{ + QTest::addColumn("queueThreadCount"); + QTest::addColumn("userThreadCount"); + + for (int q : { 1, 2, 3, 4, 8, 12, 16, 20 }) + for (int u : { 1, 2, 3, 4, 7, 11, 18 }) + QTest::addRow("queue{%d}; %d user thread(s)", q, u) << q << u; +} + +void ConcurrentQueueTest::randomCalls() +{ + QFETCH(const int, queueThreadCount); + QFETCH(const int, userThreadCount); + QVERIFY(userThreadCount > 0); + + const auto printer = makeMessagePrinter(queueThreadCount); + + ConcurrentQueue queue(queueThreadCount); + printer.printStartedMessage(); + + randomEngineProvider.resetEngines(userThreadCount); + + std::vector userThreads; + userThreads.reserve(userThreadCount - 1); + for (int id = 1; id < userThreadCount; ++id) { + userThreads.emplace_back(RandomCaller(queue, total, id, queueThreadCount, + randomEngineProvider.engine(id))); + } + RandomCaller(queue, total, primaryThreadId, queueThreadCount, + randomEngineProvider.engine(primaryThreadId))(); + + for (auto &t : userThreads) + t.join(); + + waitAndPrint(queue, printer); + + QCOMPARE(total.load(), 0); +} + QTEST_APPLESS_MAIN(ConcurrentQueueTest) #include "concurrent_queue_test.moc" From 57eb8d0171f3beb235808734d3edeaf11b626550 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Tue, 16 Mar 2021 10:25:48 +0200 Subject: [PATCH 11/22] ConcurrentQueue::cancelPending: notify _waitVar This allows to call cancelPending() and waitAll() concurrently with no additional synchronization. While calling these two functions concurrently may not be useful often, supporting it costs little in terms of performance and code complexity. Furthermore, a completely thread-safe class is easier to document and use correctly. Optimize job finalization by notifying _waitVar only if jobsLeft is reduced to 0. Optimize cancelPending() by not locking jobsLeftMutex when no job is canceled (if the queue is empty when this function is called). Add assertions that verify invariants. The output of ConcurrentQueueTest::randomCalls() reflects the fact that a caller of waitAll() can be blocked indefinitely when another thread cancels all queued jobs while no job is being executed. The test output snippet below omits repetitive information: the "QINFO : ConcurrentQueueTest::randomCalls(queue{1}; 2 user thread(s))" prefix of each line, current thread id and the common hh:mm:ss current time part (leaving only the millisecond part). Note that thread #1 begins waiting at the 1st line of the snippet. "=> -8" means that total equals -8 and thus ConcurrentQueue::jobsLeft equals 8. Thread #0 then cancels all 8 queued jobs, then enqueues and cancels 3 jobs twice, then momentarily waits 3 times. #1 is not waked until #0 enqueues 8 more jobs and starts waiting for them too. Once these new 8 jobs are completed, both #0 and #1 end waiting. 1. [ms] 311 | #1 begin waiting for 1 thread => -8 2. [ms] 311 | #0 enqueuing complete. 3. [ms] 311 | #0 canceled 8 jobs => -8 4. [ms] 311 | #0 enqueuing 3 jobs... 5. [ms] 312 | #0 enqueuing complete. 6. [ms] 312 | #0 canceled 3 jobs => -3 7. [ms] 312 | #0 enqueuing 3 jobs... 8. [ms] 312 | #0 enqueuing complete. 9. [ms] 312 | #0 canceled 3 jobs => -3 10. [ms] 312 | #0 begin waiting for 1 thread => 0 11. [ms] 312 | #0 end waiting for 1 thread => 0 12. [ms] 312 | #0 begin waiting for 1 thread => 0 13. [ms] 312 | #0 end waiting for 1 thread => 0 14. [ms] 312 | #0 begin waiting for 1 thread => 0 15. [ms] 312 | #0 end waiting for 1 thread => 0 16. [ms] 312 | #0 canceled 0 jobs => 0 17. [ms] 312 | #0 canceled 0 jobs => 0 18. [ms] 312 | #0 enqueuing 3 jobs... 19. [ms] 312 | #0 enqueuing complete. 20. [ms] 312 | #0 enqueuing 3 jobs... 21. [ms] 312 | #0 enqueuing complete. 22. [ms] 312 | #0 enqueuing 2 jobs... 23. [ms] 312 | #0 enqueuing complete. 24. [ms] 312 | #0 begin waiting for 1 thread => -8 25. [ms] 312 | [0.1] sleep 0.003 ms... 26. [ms] 312 | [0.1] +1 => -7 27. [ms] 312 | [0.2] sleep 0.003 ms... 28. [ms] 312 | [0.2] +1 => -6 29. [ms] 312 | [0.3] sleep 0.003 ms... 30. [ms] 312 | [0.3] +1 => -5 31. [ms] 312 | [0.1] sleep 0 ms... 32. [ms] 313 | [0.1] +1 => -4 33. [ms] 313 | [0.2] sleep 0.005 ms... 34. [ms] 313 | [0.2] +1 => -3 35. [ms] 313 | [0.3] sleep 0 ms... 36. [ms] 313 | [0.3] +1 => -2 37. [ms] 313 | [0.1] sleep 0.001 ms... 38. [ms] 313 | [0.1] +1 => -1 39. [ms] 313 | [0.2] sleep 0.001 ms... 40. [ms] 313 | [0.2] +1 => 0 41. [ms] 313 | #0 end waiting for 1 thread => 0 42. [ms] 313 | #1 end waiting for 1 thread => 0 --- common/concurrent_queue.h | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 0d8384fa..5be10b0d 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -1,11 +1,14 @@ #ifndef CONCURRENT_QUEUE_H #define CONCURRENT_QUEUE_H +#include #include +#include #include #include #include #include +#include namespace YACReader { class ConcurrentQueue @@ -54,11 +57,11 @@ public: // temporary (which destroys _queue's elements and deallocates memory). _queue.swap(oldQueue); } + const auto size = oldQueue.size(); - { - const std::lock_guard lock(jobsLeftMutex); - jobsLeft -= size; - } + assert(size <= std::numeric_limits::max()); + if (size != 0) + finalizeJobs(static_cast(size)); return size; } @@ -107,16 +110,27 @@ private: } job(); - - { - std::lock_guard lock(jobsLeftMutex); - --jobsLeft; - } - - _waitVar.notify_all(); + finalizeJobs(1); } } + void finalizeJobs(int count) + { + assert(count > 0); + + int remainingJobs; + { + std::lock_guard lock(jobsLeftMutex); + assert(jobsLeft >= count); + jobsLeft -= count; + remainingJobs = jobsLeft; + } + + assert(remainingJobs >= 0); + if (remainingJobs == 0) + _waitVar.notify_all(); + } + void joinAll() { { From d869e1230b6bd951eca48fd247557adc083bd249 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Tue, 16 Mar 2021 12:50:53 +0200 Subject: [PATCH 12/22] ConcurrentQueue::enqueue: increment jobsLeft before adding a job ConcurrentQueueTest::randomCalls() built in Debug mode often crashes when concurrent_queue_test.cpp is modified and ConcurrentQueueTest is launched from Qt Creator so that the test is built and immediately run. The crash is an assertion failure that occurs most of the time under the described circumstances at different test data rows: void YACReader::ConcurrentQueue::finalizeJobs(int): Assertion `jobsLeft >= count' failed. The assertion fails because ConcurrentQueue::enqueue() adds a job into the queue first and then increments jobsLeft. If the job is immediately picked up and executed very fast, ConcurrentQueue::nextJob() can try to finalize it before enqueue() increments jobsLeft. Simply reordering the modifications of jobsLeft and _queue in enqueue() ensures that jobsLeft is always non-negative and eliminates the assertion failures. Note that ConcurrentQueue::finalizeJobs() is the only other function that modifies (decreases) jobsLeft. finalizeJobs() is always called *after* the queue's size is reduced. So the following invariant is now maintained at all times and documented: jobsLeft >= _queue.size(). --- common/concurrent_queue.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 5be10b0d..d862bf3d 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -34,13 +34,13 @@ public: void enqueue(std::function job) { { - std::lock_guard lock(queueMutex); - _queue.emplace(job); + std::lock_guard lock(jobsLeftMutex); + ++jobsLeft; } { - std::lock_guard lock(jobsLeftMutex); - ++jobsLeft; + std::lock_guard lock(queueMutex); + _queue.emplace(job); } jobAvailableVar.notify_one(); @@ -78,7 +78,7 @@ public: private: std::vector threads; std::queue> _queue; - int jobsLeft; + int jobsLeft; //!< @invariant jobsLeft >= _queue.size() bool bailout; std::condition_variable jobAvailableVar; std::condition_variable _waitVar; From d8a6b7f432568cbfa9bfb76ae5f8dce9189e62bc Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Tue, 16 Mar 2021 14:27:59 +0200 Subject: [PATCH 13/22] ConcurrentQueue: std::move jobs Moving a std::function can be faster than copying it. Correcting these normally minor inefficiencies is important here because they occur under a mutex lock. --- common/concurrent_queue.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index d862bf3d..506530ba 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -8,6 +8,7 @@ #include #include #include +#include #include namespace YACReader { @@ -40,7 +41,7 @@ public: { std::lock_guard lock(queueMutex); - _queue.emplace(job); + _queue.emplace(std::move(job)); } jobAvailableVar.notify_one(); @@ -105,7 +106,7 @@ private: return; } - job = _queue.front(); + job = std::move(_queue.front()); _queue.pop(); } From d026050d4907b4eba02833cd6a847a361b5d7705 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Tue, 16 Mar 2021 18:12:13 +0200 Subject: [PATCH 14/22] ConcurrentQueue::jobsLeft: int => std::size_t This data member's type can be unsigned because its value is never negative now. Matching std::queue::size_type allows to improve type safety, get rid of a static_cast and remove two assertions. The only downside is a slight increase of sizeof(ConcurrentQueue). --- common/concurrent_queue.h | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 506530ba..8cc8bf83 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -60,9 +59,8 @@ public: } const auto size = oldQueue.size(); - assert(size <= std::numeric_limits::max()); if (size != 0) - finalizeJobs(static_cast(size)); + finalizeJobs(size); return size; } @@ -79,7 +77,7 @@ public: private: std::vector threads; std::queue> _queue; - int jobsLeft; //!< @invariant jobsLeft >= _queue.size() + std::size_t jobsLeft; //!< @invariant jobsLeft >= _queue.size() bool bailout; std::condition_variable jobAvailableVar; std::condition_variable _waitVar; @@ -115,11 +113,11 @@ private: } } - void finalizeJobs(int count) + void finalizeJobs(std::size_t count) { assert(count > 0); - int remainingJobs; + std::size_t remainingJobs; { std::lock_guard lock(jobsLeftMutex); assert(jobsLeft >= count); @@ -127,7 +125,6 @@ private: remainingJobs = jobsLeft; } - assert(remainingJobs >= 0); if (remainingJobs == 0) _waitVar.notify_all(); } From c333fbc7d03281f9d6e754717109e86d5776ebe4 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Wed, 17 Mar 2021 16:31:41 +0200 Subject: [PATCH 15/22] ConcurrentQueueTest::randomCalls(): test dominant enqueue() Add another data column to randomCalls() - boostEnqueueOperationWeight - that determines whether or not enqueue() operation's weight/probability is much greater than the other operations' weights. The new column allows to retain the existing much-closer-to-equal weights, which test other aspects of ConcurrentQueue and have detected bugs already. Reduce the number of thread count data rows to offset the test duration increase somewhat. --- .../concurrent_queue_test.cpp | 66 ++++++++++++++++--- 1 file changed, 56 insertions(+), 10 deletions(-) diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp index f79b9754..11ce55b6 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.cpp +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -288,8 +288,14 @@ class RandomCaller { public: explicit RandomCaller(ConcurrentQueue &queue, Total &total, int threadId, - int queueThreadCount, RandomEngine &engine) - : queue(queue), total(total), threadId { threadId }, printer(total, threadId, queueThreadCount), engine(engine) + int queueThreadCount, RandomEngine &engine, + bool boostEnqueueOperationWeight) + : queue(queue), + total(total), + threadId { threadId }, + boostEnqueueOperationWeight { boostEnqueueOperationWeight }, + printer(total, threadId, queueThreadCount), + engine(engine) { } @@ -322,14 +328,34 @@ private: return uniformInt(engine, decltype(uniformInt)::param_type(a, b)); } - std::discrete_distribution operationDistribution() + using OperationDistribution = std::discrete_distribution; + + void printProbabilities(const OperationDistribution &distribution) const + { + auto p = distribution.probabilities(); + constexpr std::size_t expectedProbabilityCount { 3 }; + if (p.size() != expectedProbabilityCount) + qFatal("Wrong number of operation probabilities: %zu != %zu", p.size(), expectedProbabilityCount); + + for (auto &x : p) + x *= 100; // Convert to percentages. + log() << QStringLiteral("#%1 operation probabilities: e=%2%, c=%3%, w=%4%.").arg(threadId).arg(p[0]).arg(p[1]).arg(p[2]); + } + + OperationDistribution operationDistribution() + { + auto distribution = boostEnqueueOperationWeight ? boostedEnqueueOperationDistribution() + : almostUniformOperationDistribution(); + printProbabilities(distribution); + return distribution; + } + + OperationDistribution almostUniformOperationDistribution() { constexpr int sumOfProbabilities { 100 }; const auto enqueueProbability = randomInt(0, sumOfProbabilities); const auto cancelProbability = randomInt(0, sumOfProbabilities - enqueueProbability); const auto waitProbability = sumOfProbabilities - enqueueProbability - cancelProbability; - - log() << QStringLiteral("#%1 operation weights: %2%, %3%, %4%.").arg(threadId).arg(enqueueProbability).arg(cancelProbability).arg(waitProbability); if (enqueueProbability + cancelProbability + waitProbability != sumOfProbabilities) qFatal("The sum of probabilities is not 100%%."); @@ -337,6 +363,18 @@ private: return { real(enqueueProbability), real(cancelProbability), real(waitProbability) }; } + OperationDistribution boostedEnqueueOperationDistribution() + { + // Make enqueue the most frequent operation to stress-test executing + // jobs rather than canceling them almost immediately. + const auto enqueueWeight = std::lognormal_distribution(2, 0.5)(engine); + const auto cancelWeight = 1.0; + // Waiting is uninteresting as it doesn't even modify the queue => make it rare. + const auto waitWeight = std::uniform_real_distribution(0, 0.2)(engine); + + return { enqueueWeight, cancelWeight, waitWeight }; + } + JobDataSet createJobs() { constexpr int minJobCount { 1 }, maxJobCount { 5 }; @@ -365,6 +403,7 @@ private: ConcurrentQueue &queue; Total &total; const int threadId; + const bool boostEnqueueOperationWeight; const QueueControlMessagePrinter printer; RandomEngine &engine; std::uniform_int_distribution uniformInt; @@ -612,10 +651,14 @@ void ConcurrentQueueTest::randomCalls_data() { QTest::addColumn("queueThreadCount"); QTest::addColumn("userThreadCount"); + QTest::addColumn("boostEnqueueOperationWeight"); - for (int q : { 1, 2, 3, 4, 8, 12, 16, 20 }) - for (int u : { 1, 2, 3, 4, 7, 11, 18 }) - QTest::addRow("queue{%d}; %d user thread(s)", q, u) << q << u; + const auto suffix = [](bool boost) { return boost ? " +enqueue" : ""; }; + + for (bool boost : { false, true }) + for (int q : { 1, 2, 4, 9, 12, 20 }) + for (int u : { 1, 2, 4, 7, 11, 18 }) + QTest::addRow("queue{%d}; %d user thread(s)%s", q, u, suffix(boost)) << q << u << boost; } void ConcurrentQueueTest::randomCalls() @@ -623,6 +666,7 @@ void ConcurrentQueueTest::randomCalls() QFETCH(const int, queueThreadCount); QFETCH(const int, userThreadCount); QVERIFY(userThreadCount > 0); + QFETCH(const bool, boostEnqueueOperationWeight); const auto printer = makeMessagePrinter(queueThreadCount); @@ -635,10 +679,12 @@ void ConcurrentQueueTest::randomCalls() userThreads.reserve(userThreadCount - 1); for (int id = 1; id < userThreadCount; ++id) { userThreads.emplace_back(RandomCaller(queue, total, id, queueThreadCount, - randomEngineProvider.engine(id))); + randomEngineProvider.engine(id), + boostEnqueueOperationWeight)); } RandomCaller(queue, total, primaryThreadId, queueThreadCount, - randomEngineProvider.engine(primaryThreadId))(); + randomEngineProvider.engine(primaryThreadId), + boostEnqueueOperationWeight)(); for (auto &t : userThreads) t.join(); From 2655613543bb79e93b78f12881a3a4febf6b39f0 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 18 Mar 2021 08:30:05 +0200 Subject: [PATCH 16/22] ConcurrentQueue: simplify the constructor implementation * threadCount argument: int => std::size_t to avoid implicit casting; * eliminate temporary empty std::thread objects; * replace a trivial lambda with a function pointer and its argument; * get rid of the unused dedicated loop counter. --- common/concurrent_queue.h | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 8cc8bf83..c434805e 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -14,16 +14,13 @@ namespace YACReader { class ConcurrentQueue { public: - explicit ConcurrentQueue(int threadCount) + explicit ConcurrentQueue(std::size_t threadCount) : jobsLeft(0), bailout(false) { - threads = std::vector(threadCount); - for (int index = 0; index < threadCount; ++index) { - threads[index] = std::thread([this] { - this->nextJob(); - }); - } + threads.reserve(threadCount); + for (; threadCount != 0; --threadCount) + threads.emplace_back(&ConcurrentQueue::nextJob, this); } ~ConcurrentQueue() From 61cd2450376b585cc92c0915b7f1f5b4861d41c6 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 18 Mar 2021 09:10:51 +0200 Subject: [PATCH 17/22] Document ConcurrentQueue and de-inline its implementation ConcurrentQueue is currently used only by two classes and a test, but modifying concurrent_queue.h requires recompiling 30 source files. None of the member functions is so lightweight as to make it worth inlining. An alternative to `@note ConcurrentQueue is unable to execute jobs if @p threadCount == 0.` is `assert(threadCount != 0);`. But this would force classes that contain a ConcurrentQueue data member to always start a thread, even if they detect at runtime that they are never going to enqueue a job. Add Job type alias to avoid repeating the type. Use default member initializers instead of the member initializer list to make it clear [to the reader of the header] that no data member is left uninitialized. --- YACReaderLibrary/YACReaderLibrary.pro | 1 + common/concurrent_queue.cpp | 127 ++++++++++++++++ common/concurrent_queue.h | 140 +++--------------- .../concurrent_queue_test.pro | 4 +- 4 files changed, 152 insertions(+), 120 deletions(-) create mode 100644 common/concurrent_queue.cpp diff --git a/YACReaderLibrary/YACReaderLibrary.pro b/YACReaderLibrary/YACReaderLibrary.pro index 0ba002d6..ad2dacf8 100644 --- a/YACReaderLibrary/YACReaderLibrary.pro +++ b/YACReaderLibrary/YACReaderLibrary.pro @@ -160,6 +160,7 @@ HEADERS += comic_flow.h \ } SOURCES += comic_flow.cpp \ + ../common/concurrent_queue.cpp \ create_library_dialog.cpp \ db/comic_query_result_processor.cpp \ db/folder_query_result_processor.cpp \ diff --git a/common/concurrent_queue.cpp b/common/concurrent_queue.cpp new file mode 100644 index 00000000..7f73c2d0 --- /dev/null +++ b/common/concurrent_queue.cpp @@ -0,0 +1,127 @@ +#include "concurrent_queue.h" + +#include +#include +#include +#include + +using namespace YACReader; + +ConcurrentQueue::ConcurrentQueue(std::size_t threadCount) +{ + threads.reserve(threadCount); + for (; threadCount != 0; --threadCount) + threads.emplace_back(&ConcurrentQueue::nextJob, this); +} + +ConcurrentQueue::~ConcurrentQueue() +{ + joinAll(); +} + +void ConcurrentQueue::enqueue(Job job) +{ + { + std::lock_guard lock(jobsLeftMutex); + ++jobsLeft; + } + + { + std::lock_guard lock(queueMutex); + _queue.emplace(std::move(job)); + } + + jobAvailableVar.notify_one(); +} + +std::size_t ConcurrentQueue::cancelPending() +{ + decltype(_queue) oldQueue; + { + const std::lock_guard lock(queueMutex); + // The mutex locking time is lower with swap() compared to assigning a + // temporary (which destroys _queue's elements and deallocates memory). + _queue.swap(oldQueue); + } + + const auto size = oldQueue.size(); + if (size != 0) + finalizeJobs(size); + return size; +} + +void ConcurrentQueue::waitAll() +{ + std::unique_lock lock(jobsLeftMutex); + if (jobsLeft > 0) { + _waitVar.wait(lock, [this] { + return jobsLeft == 0; + }); + } +} + +void ConcurrentQueue::nextJob() +{ + while (true) { + Job job; + + { + std::unique_lock lock(queueMutex); + + if (bailout) { + return; + } + + jobAvailableVar.wait(lock, [this] { + return _queue.size() > 0 || bailout; + }); + + if (bailout) { + return; + } + + job = std::move(_queue.front()); + _queue.pop(); + } + + job(); + finalizeJobs(1); + } +} + +void ConcurrentQueue::finalizeJobs(std::size_t count) +{ + assert(count > 0); + + std::size_t remainingJobs; + { + std::lock_guard lock(jobsLeftMutex); + assert(jobsLeft >= count); + jobsLeft -= count; + remainingJobs = jobsLeft; + } + + if (remainingJobs == 0) + _waitVar.notify_all(); +} + +void ConcurrentQueue::joinAll() +{ + { + std::lock_guard lock(queueMutex); + + if (bailout) { + return; + } + + bailout = true; + } + + jobAvailableVar.notify_all(); + + for (auto &x : threads) { + if (x.joinable()) { + x.join(); + } + } +} diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index c434805e..a27c505e 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -1,151 +1,53 @@ #ifndef CONCURRENT_QUEUE_H #define CONCURRENT_QUEUE_H -#include #include #include #include #include #include -#include #include namespace YACReader { +//! All functions in this class are thread-safe in the Qt documentation sense. class ConcurrentQueue { public: - explicit ConcurrentQueue(std::size_t threadCount) - : jobsLeft(0), - bailout(false) - { - threads.reserve(threadCount); - for (; threadCount != 0; --threadCount) - threads.emplace_back(&ConcurrentQueue::nextJob, this); - } + //! @brief Creates and starts executing @p threadCount worker threads. + //! @note ConcurrentQueue is unable to execute jobs if @p threadCount == 0. + explicit ConcurrentQueue(std::size_t threadCount); - ~ConcurrentQueue() - { - joinAll(); - } + //! Cancels all jobs that have not been picked up by worker threads yet, + //! waits for all worker threads to complete their jobs and joins them. + ~ConcurrentQueue(); - void enqueue(std::function job) - { - { - std::lock_guard lock(jobsLeftMutex); - ++jobsLeft; - } + using Job = std::function; - { - std::lock_guard lock(queueMutex); - _queue.emplace(std::move(job)); - } - - jobAvailableVar.notify_one(); - } + //! @brief Adds @p job to the queue. + //! @note A worker thread may start executing @p job immediately if it is idle. + //! Worker threads start executing jobs in the same order as they are enqueued. + void enqueue(Job job); //! @brief Cancels all jobs that have not been picked up by worker threads yet. //! @return The number of jobs that were canceled. - std::size_t cancelPending() - { - decltype(_queue) oldQueue; - { - const std::lock_guard lock(queueMutex); - // The mutex locking time is lower with swap() compared to assigning a - // temporary (which destroys _queue's elements and deallocates memory). - _queue.swap(oldQueue); - } + std::size_t cancelPending(); - const auto size = oldQueue.size(); - if (size != 0) - finalizeJobs(size); - return size; - } - - void waitAll() - { - std::unique_lock lock(jobsLeftMutex); - if (jobsLeft > 0) { - _waitVar.wait(lock, [this] { - return jobsLeft == 0; - }); - } - } + //! @brief Blocks the current thread until all enqueued jobs are completed. + void waitAll(); private: std::vector threads; - std::queue> _queue; - std::size_t jobsLeft; //!< @invariant jobsLeft >= _queue.size() - bool bailout; + std::queue _queue; + std::size_t jobsLeft = 0; //!< @invariant jobsLeft >= _queue.size() + bool bailout = false; std::condition_variable jobAvailableVar; std::condition_variable _waitVar; std::mutex jobsLeftMutex; std::mutex queueMutex; - void nextJob() - { - while (true) { - std::function job; - - { - std::unique_lock lock(queueMutex); - - if (bailout) { - return; - } - - jobAvailableVar.wait(lock, [this] { - return _queue.size() > 0 || bailout; - }); - - if (bailout) { - return; - } - - job = std::move(_queue.front()); - _queue.pop(); - } - - job(); - finalizeJobs(1); - } - } - - void finalizeJobs(std::size_t count) - { - assert(count > 0); - - std::size_t remainingJobs; - { - std::lock_guard lock(jobsLeftMutex); - assert(jobsLeft >= count); - jobsLeft -= count; - remainingJobs = jobsLeft; - } - - if (remainingJobs == 0) - _waitVar.notify_all(); - } - - void joinAll() - { - { - std::lock_guard lock(queueMutex); - - if (bailout) { - return; - } - - bailout = true; - } - - jobAvailableVar.notify_all(); - - for (auto &x : threads) { - if (x.joinable()) { - x.join(); - } - } - } + void nextJob(); + void finalizeJobs(std::size_t count); + void joinAll(); }; } diff --git a/tests/concurrent_queue_test/concurrent_queue_test.pro b/tests/concurrent_queue_test/concurrent_queue_test.pro index d6cb7f5e..71403911 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.pro +++ b/tests/concurrent_queue_test/concurrent_queue_test.pro @@ -4,4 +4,6 @@ PATH_TO_common = ../../common INCLUDEPATH += $$PATH_TO_common HEADERS += $${PATH_TO_common}/concurrent_queue.h -SOURCES += concurrent_queue_test.cpp +SOURCES += \ + $${PATH_TO_common}/concurrent_queue.cpp \ + concurrent_queue_test.cpp From 4cb542c8cca6aea8698491743ed65bb3cc8ed239 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 18 Mar 2021 09:51:59 +0200 Subject: [PATCH 18/22] Remove ConcurrentQueue::joinAll() This function is called only from ~ConcurrentQueue(). joinAll() is not thread-safe and it cannot be called earlier without introducing a null state. Moving the function's implementation into the definition of ~ConcurrentQueue() makes the code clearer. Removing joinAll() also allows to establish and document invariants for two data members. Assert consistency between jobsLeft and _queue in ~ConcurrentQueue(). --- common/concurrent_queue.cpp | 32 ++++++++++---------------------- common/concurrent_queue.h | 4 ++-- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/common/concurrent_queue.cpp b/common/concurrent_queue.cpp index 7f73c2d0..28a0efe1 100644 --- a/common/concurrent_queue.cpp +++ b/common/concurrent_queue.cpp @@ -16,7 +16,16 @@ ConcurrentQueue::ConcurrentQueue(std::size_t threadCount) ConcurrentQueue::~ConcurrentQueue() { - joinAll(); + { + std::lock_guard lock(queueMutex); + assert(!bailout); + bailout = true; + } + jobAvailableVar.notify_all(); + + for (auto &x : threads) + x.join(); + assert(jobsLeft == _queue.size() && "Only not yet started jobs are left."); } void ConcurrentQueue::enqueue(Job job) @@ -104,24 +113,3 @@ void ConcurrentQueue::finalizeJobs(std::size_t count) if (remainingJobs == 0) _waitVar.notify_all(); } - -void ConcurrentQueue::joinAll() -{ - { - std::lock_guard lock(queueMutex); - - if (bailout) { - return; - } - - bailout = true; - } - - jobAvailableVar.notify_all(); - - for (auto &x : threads) { - if (x.joinable()) { - x.join(); - } - } -} diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index a27c505e..4d3c4f23 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -36,10 +36,11 @@ public: void waitAll(); private: + //! @invariant all worker threads are joinable until the destructor is called. std::vector threads; std::queue _queue; std::size_t jobsLeft = 0; //!< @invariant jobsLeft >= _queue.size() - bool bailout = false; + bool bailout = false; //!< @invariant is false until the destructor is called. std::condition_variable jobAvailableVar; std::condition_variable _waitVar; std::mutex jobsLeftMutex; @@ -47,7 +48,6 @@ private: void nextJob(); void finalizeJobs(std::size_t count); - void joinAll(); }; } From 72c78d0164e85b44148349b1bf21220d2fc9fb27 Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 18 Mar 2021 11:02:07 +0200 Subject: [PATCH 19/22] ConcurrentQueue: remove redundant checks The C++ standard specifies `std::condition_variable::wait(lock, pred)` as equivalent to `while (!pred()) wait(lock);`. It is implemented exactly so in libstdc++, libc++ and MSVC. --- common/concurrent_queue.cpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/common/concurrent_queue.cpp b/common/concurrent_queue.cpp index 28a0efe1..fa8eba24 100644 --- a/common/concurrent_queue.cpp +++ b/common/concurrent_queue.cpp @@ -62,11 +62,7 @@ std::size_t ConcurrentQueue::cancelPending() void ConcurrentQueue::waitAll() { std::unique_lock lock(jobsLeftMutex); - if (jobsLeft > 0) { - _waitVar.wait(lock, [this] { - return jobsLeft == 0; - }); - } + _waitVar.wait(lock, [this] { return jobsLeft == 0; }); } void ConcurrentQueue::nextJob() @@ -77,10 +73,6 @@ void ConcurrentQueue::nextJob() { std::unique_lock lock(queueMutex); - if (bailout) { - return; - } - jobAvailableVar.wait(lock, [this] { return _queue.size() > 0 || bailout; }); From 05b384ed6dbfeeb9662c04a00c4fc6f3af06955e Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 18 Mar 2021 11:30:55 +0200 Subject: [PATCH 20/22] Make ConcurrentQueue::waitAll() const This member function does not affect the logical state of the class. Making std::condition_variable and std::mutex data members mutable is idiomatic as these classes are thread-safe synchronization primitives. --- common/concurrent_queue.cpp | 2 +- common/concurrent_queue.h | 6 +++--- tests/concurrent_queue_test/concurrent_queue_test.cpp | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/concurrent_queue.cpp b/common/concurrent_queue.cpp index fa8eba24..a762f4db 100644 --- a/common/concurrent_queue.cpp +++ b/common/concurrent_queue.cpp @@ -59,7 +59,7 @@ std::size_t ConcurrentQueue::cancelPending() return size; } -void ConcurrentQueue::waitAll() +void ConcurrentQueue::waitAll() const { std::unique_lock lock(jobsLeftMutex); _waitVar.wait(lock, [this] { return jobsLeft == 0; }); diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 4d3c4f23..380f6d5d 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -33,7 +33,7 @@ public: std::size_t cancelPending(); //! @brief Blocks the current thread until all enqueued jobs are completed. - void waitAll(); + void waitAll() const; private: //! @invariant all worker threads are joinable until the destructor is called. @@ -42,8 +42,8 @@ private: std::size_t jobsLeft = 0; //!< @invariant jobsLeft >= _queue.size() bool bailout = false; //!< @invariant is false until the destructor is called. std::condition_variable jobAvailableVar; - std::condition_variable _waitVar; - std::mutex jobsLeftMutex; + mutable std::condition_variable _waitVar; + mutable std::mutex jobsLeftMutex; std::mutex queueMutex; void nextJob(); diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp index 11ce55b6..b09d05f6 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.cpp +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -202,7 +202,7 @@ std::size_t cancelAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrin return canceledCount; } -void waitAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) +void waitAndPrint(const ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) { printer.printBeginWaitingMessage(); queue.waitAll(); From d013abedc1769fc5d4ee2aab4e91095396b7df9e Mon Sep 17 00:00:00 2001 From: Igor Kushnir Date: Thu, 18 Mar 2021 16:09:39 +0200 Subject: [PATCH 21/22] Azure: run tests as a step of each platform's job --- azure-pipelines-windows-template.yml | 5 +++++ azure-pipelines.yml | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/azure-pipelines-windows-template.yml b/azure-pipelines-windows-template.yml index b7c4cfea..23d039d9 100644 --- a/azure-pipelines-windows-template.yml +++ b/azure-pipelines-windows-template.yml @@ -33,6 +33,11 @@ jobs: qmake CONFIG+="7zip" %DEFINES_VAR% nmake displayName: 'Build' + - script: | + call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\${{ parameters.vc_vars }}" + set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% + nmake check TESTARGS="-maxwarnings 100000" + displayName: 'Run tests' - script: | set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% cd $(Build.SourcesDirectory)\ci\win diff --git a/azure-pipelines.yml b/azure-pipelines.yml index ae26052a..4dbfa60d 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -72,6 +72,9 @@ jobs: qmake CONFIG+="unarr" $DEFINES_VAR make displayName: 'Build' + - script: | + make check TESTARGS="-maxwarnings 100000" + displayName: 'Run tests' - task: CopyFiles@2 inputs: sourceFolder: $(Build.SourcesDirectory)/tarball @@ -118,6 +121,11 @@ jobs: SKIP_CODESIGN="$(tr [A-Z] [a-z] <<< "$IS_FORK")" ./compileOSX.sh $VERSION $(Build.BuildNumber) $SKIP_CODESIGN displayName: 'Build' + - script: | + cd $(Build.SourcesDirectory)/tests + qmake + make check TESTARGS="-maxwarnings 100000" + displayName: 'Build and run tests' - task: CopyFiles@2 inputs: contents: '*.dmg' From e2cc1064feaa533ac5d30f362369315b20723e37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20=C3=81ngel=20San=20Mart=C3=ADn?= Date: Tue, 28 Dec 2021 20:34:42 +0100 Subject: [PATCH 22/22] Run tests in qt6 too --- azure-pipelines-windows-template-qt6.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/azure-pipelines-windows-template-qt6.yml b/azure-pipelines-windows-template-qt6.yml index ee709824..707b0b37 100644 --- a/azure-pipelines-windows-template-qt6.yml +++ b/azure-pipelines-windows-template-qt6.yml @@ -33,6 +33,11 @@ jobs: qmake CONFIG+="7zip" %DEFINES_VAR% nmake displayName: 'Build' + - script: | + call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\${{ parameters.vc_vars }}" + set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% + nmake check TESTARGS="-maxwarnings 100000" + displayName: 'Run tests' # - script: | # set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% # cd $(Build.SourcesDirectory)\ci\win