diff --git a/YACReader.pro b/YACReader.pro index f788b405..f3106c4f 100644 --- a/YACReader.pro +++ b/YACReader.pro @@ -1,3 +1,4 @@ TEMPLATE = subdirs SUBDIRS = YACReader YACReaderLibrary YACReaderLibraryServer YACReaderLibrary.depends = YACReader +!CONFIG(no_tests): SUBDIRS += tests diff --git a/YACReader/YACReader.pro b/YACReader/YACReader.pro index 771ef583..d1996afd 100644 --- a/YACReader/YACReader.pro +++ b/YACReader/YACReader.pro @@ -6,7 +6,7 @@ QMAKE_TARGET_BUNDLE_PREFIX = "com.yacreader" DEPENDPATH += . \ release -DEFINES += NOMINMAX YACREADER +DEFINES += YACREADER #load default build flags include (../config.pri) diff --git a/YACReaderLibrary/YACReaderLibrary.pro b/YACReaderLibrary/YACReaderLibrary.pro index 3160ac8e..ad2dacf8 100644 --- a/YACReaderLibrary/YACReaderLibrary.pro +++ b/YACReaderLibrary/YACReaderLibrary.pro @@ -12,7 +12,7 @@ INCLUDEPATH += . \ ./comic_vine \ ./comic_vine/model -DEFINES += SERVER_RELEASE NOMINMAX YACREADER_LIBRARY +DEFINES += SERVER_RELEASE YACREADER_LIBRARY # load default build flags include (../config.pri) @@ -160,6 +160,7 @@ HEADERS += comic_flow.h \ } SOURCES += comic_flow.cpp \ + ../common/concurrent_queue.cpp \ create_library_dialog.cpp \ db/comic_query_result_processor.cpp \ db/folder_query_result_processor.cpp \ diff --git a/YACReaderLibrary/db/comic_query_result_processor.cpp b/YACReaderLibrary/db/comic_query_result_processor.cpp index 873ab3b5..b05acb5c 100644 --- a/YACReaderLibrary/db/comic_query_result_processor.cpp +++ b/YACReaderLibrary/db/comic_query_result_processor.cpp @@ -16,7 +16,7 @@ YACReader::ComicQueryResultProcessor::ComicQueryResultProcessor() void YACReader::ComicQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, const QString &databasePath) { - querySearchQueue.cancellPending(); + querySearchQueue.cancelPending(); querySearchQueue.enqueue([=] { QString connectionName = ""; diff --git a/YACReaderLibrary/db/folder_query_result_processor.cpp b/YACReaderLibrary/db/folder_query_result_processor.cpp index eafeacf6..2ecc83b4 100644 --- a/YACReaderLibrary/db/folder_query_result_processor.cpp +++ b/YACReaderLibrary/db/folder_query_result_processor.cpp @@ -22,7 +22,7 @@ YACReader::FolderQueryResultProcessor::FolderQueryResultProcessor(FolderModel *m void YACReader::FolderQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, bool includeComics) { - querySearchQueue.cancellPending(); + querySearchQueue.cancelPending(); querySearchQueue.enqueue([=] { QString connectionName = ""; diff --git a/YACReaderLibraryServer/YACReaderLibraryServer.pro b/YACReaderLibraryServer/YACReaderLibraryServer.pro index fd892f1d..67aedd06 100644 --- a/YACReaderLibraryServer/YACReaderLibraryServer.pro +++ b/YACReaderLibraryServer/YACReaderLibraryServer.pro @@ -10,7 +10,7 @@ INCLUDEPATH += ../YACReaderLibrary \ ../YACReaderLibrary/server \ ../YACReaderLibrary/db -DEFINES += SERVER_RELEASE NOMINMAX YACREADER_LIBRARY +DEFINES += SERVER_RELEASE YACREADER_LIBRARY # load default build flags # do a basic dependency check include(headless_config.pri) diff --git a/azure-pipelines-windows-template-qt6.yml b/azure-pipelines-windows-template-qt6.yml index ee709824..707b0b37 100644 --- a/azure-pipelines-windows-template-qt6.yml +++ b/azure-pipelines-windows-template-qt6.yml @@ -33,6 +33,11 @@ jobs: qmake CONFIG+="7zip" %DEFINES_VAR% nmake displayName: 'Build' + - script: | + call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\${{ parameters.vc_vars }}" + set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% + nmake check TESTARGS="-maxwarnings 100000" + displayName: 'Run tests' # - script: | # set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% # cd $(Build.SourcesDirectory)\ci\win diff --git a/azure-pipelines-windows-template.yml b/azure-pipelines-windows-template.yml index b7c4cfea..23d039d9 100644 --- a/azure-pipelines-windows-template.yml +++ b/azure-pipelines-windows-template.yml @@ -33,6 +33,11 @@ jobs: qmake CONFIG+="7zip" %DEFINES_VAR% nmake displayName: 'Build' + - script: | + call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\${{ parameters.vc_vars }}" + set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% + nmake check TESTARGS="-maxwarnings 100000" + displayName: 'Run tests' - script: | set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH% cd $(Build.SourcesDirectory)\ci\win diff --git a/azure-pipelines.yml b/azure-pipelines.yml index ae26052a..4dbfa60d 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -72,6 +72,9 @@ jobs: qmake CONFIG+="unarr" $DEFINES_VAR make displayName: 'Build' + - script: | + make check TESTARGS="-maxwarnings 100000" + displayName: 'Run tests' - task: CopyFiles@2 inputs: sourceFolder: $(Build.SourcesDirectory)/tarball @@ -118,6 +121,11 @@ jobs: SKIP_CODESIGN="$(tr [A-Z] [a-z] <<< "$IS_FORK")" ./compileOSX.sh $VERSION $(Build.BuildNumber) $SKIP_CODESIGN displayName: 'Build' + - script: | + cd $(Build.SourcesDirectory)/tests + qmake + make check TESTARGS="-maxwarnings 100000" + displayName: 'Build and run tests' - task: CopyFiles@2 inputs: contents: '*.dmg' diff --git a/common/concurrent_queue.cpp b/common/concurrent_queue.cpp new file mode 100644 index 00000000..a762f4db --- /dev/null +++ b/common/concurrent_queue.cpp @@ -0,0 +1,107 @@ +#include "concurrent_queue.h" + +#include +#include +#include +#include + +using namespace YACReader; + +ConcurrentQueue::ConcurrentQueue(std::size_t threadCount) +{ + threads.reserve(threadCount); + for (; threadCount != 0; --threadCount) + threads.emplace_back(&ConcurrentQueue::nextJob, this); +} + +ConcurrentQueue::~ConcurrentQueue() +{ + { + std::lock_guard lock(queueMutex); + assert(!bailout); + bailout = true; + } + jobAvailableVar.notify_all(); + + for (auto &x : threads) + x.join(); + assert(jobsLeft == _queue.size() && "Only not yet started jobs are left."); +} + +void ConcurrentQueue::enqueue(Job job) +{ + { + std::lock_guard lock(jobsLeftMutex); + ++jobsLeft; + } + + { + std::lock_guard lock(queueMutex); + _queue.emplace(std::move(job)); + } + + jobAvailableVar.notify_one(); +} + +std::size_t ConcurrentQueue::cancelPending() +{ + decltype(_queue) oldQueue; + { + const std::lock_guard lock(queueMutex); + // The mutex locking time is lower with swap() compared to assigning a + // temporary (which destroys _queue's elements and deallocates memory). + _queue.swap(oldQueue); + } + + const auto size = oldQueue.size(); + if (size != 0) + finalizeJobs(size); + return size; +} + +void ConcurrentQueue::waitAll() const +{ + std::unique_lock lock(jobsLeftMutex); + _waitVar.wait(lock, [this] { return jobsLeft == 0; }); +} + +void ConcurrentQueue::nextJob() +{ + while (true) { + Job job; + + { + std::unique_lock lock(queueMutex); + + jobAvailableVar.wait(lock, [this] { + return _queue.size() > 0 || bailout; + }); + + if (bailout) { + return; + } + + job = std::move(_queue.front()); + _queue.pop(); + } + + job(); + finalizeJobs(1); + } +} + +void ConcurrentQueue::finalizeJobs(std::size_t count) +{ + assert(count > 0); + + std::size_t remainingJobs; + { + std::lock_guard lock(jobsLeftMutex); + assert(jobsLeft >= count); + jobsLeft -= count; + remainingJobs = jobsLeft; + } + + if (remainingJobs == 0) + _waitVar.notify_all(); +} diff --git a/common/concurrent_queue.h b/common/concurrent_queue.h index 99505df7..380f6d5d 100644 --- a/common/concurrent_queue.h +++ b/common/concurrent_queue.h @@ -6,126 +6,48 @@ #include #include #include +#include namespace YACReader { +//! All functions in this class are thread-safe in the Qt documentation sense. class ConcurrentQueue { public: - explicit ConcurrentQueue(int threadCount) - : jobsLeft(0), - bailout(false) - { - threads = std::vector(threadCount); - for (int index = 0; index < threadCount; ++index) { - threads[index] = std::thread([this] { - this->nextJob(); - }); - } - } + //! @brief Creates and starts executing @p threadCount worker threads. + //! @note ConcurrentQueue is unable to execute jobs if @p threadCount == 0. + explicit ConcurrentQueue(std::size_t threadCount); - ~ConcurrentQueue() - { - joinAll(); - } + //! Cancels all jobs that have not been picked up by worker threads yet, + //! waits for all worker threads to complete their jobs and joins them. + ~ConcurrentQueue(); - void enqueue(std::function job) - { - { - std::lock_guard lock(queueMutex); - _queue.emplace(job); - } + using Job = std::function; - { - std::lock_guard lock(jobsLeftMutex); - ++jobsLeft; - } + //! @brief Adds @p job to the queue. + //! @note A worker thread may start executing @p job immediately if it is idle. + //! Worker threads start executing jobs in the same order as they are enqueued. + void enqueue(Job job); - jobAvailableVar.notify_one(); - } + //! @brief Cancels all jobs that have not been picked up by worker threads yet. + //! @return The number of jobs that were canceled. + std::size_t cancelPending(); - void cancellPending() - { - std::unique_lock lockQueue(queueMutex); - std::unique_lock lockJobsLeft(jobsLeftMutex); - _queue = std::queue>(); - jobsLeft = 0; - } - - void waitAll() - { - std::unique_lock lock(jobsLeftMutex); - if (jobsLeft > 0) { - _waitVar.wait(lock, [this] { - return jobsLeft == 0; - }); - } - } + //! @brief Blocks the current thread until all enqueued jobs are completed. + void waitAll() const; private: + //! @invariant all worker threads are joinable until the destructor is called. std::vector threads; - std::queue> _queue; - int jobsLeft; - bool bailout; + std::queue _queue; + std::size_t jobsLeft = 0; //!< @invariant jobsLeft >= _queue.size() + bool bailout = false; //!< @invariant is false until the destructor is called. std::condition_variable jobAvailableVar; - std::condition_variable _waitVar; - std::mutex jobsLeftMutex; + mutable std::condition_variable _waitVar; + mutable std::mutex jobsLeftMutex; std::mutex queueMutex; - void nextJob() - { - while (true) { - std::function job; - - { - std::unique_lock lock(queueMutex); - - if (bailout) { - return; - } - - jobAvailableVar.wait(lock, [this] { - return _queue.size() > 0 || bailout; - }); - - if (bailout) { - return; - } - - job = _queue.front(); - _queue.pop(); - } - - job(); - - { - std::lock_guard lock(jobsLeftMutex); - --jobsLeft; - } - - _waitVar.notify_one(); - } - } - - void joinAll() - { - { - std::lock_guard lock(queueMutex); - - if (bailout) { - return; - } - - bailout = true; - } - - jobAvailableVar.notify_all(); - - for (auto &x : threads) { - if (x.joinable()) { - x.join(); - } - } - } + void nextJob(); + void finalizeJobs(std::size_t count); }; } diff --git a/config.pri b/config.pri index 14af1b2e..660f80c0 100644 --- a/config.pri +++ b/config.pri @@ -5,6 +5,8 @@ CONFIG += c++17 win32:QMAKE_CXXFLAGS += /std:c++17 #enable c++17 explicitly in msvc +DEFINES += NOMINMAX + if(unix|mingw):QMAKE_CXXFLAGS_RELEASE += -DNDEBUG win32:msvc:QMAKE_CXXFLAGS_RELEASE += /DNDEBUG diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp new file mode 100644 index 00000000..b09d05f6 --- /dev/null +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -0,0 +1,699 @@ +#include "concurrent_queue.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace chrono = std::chrono; +using Clock = chrono::steady_clock; +using YACReader::ConcurrentQueue; + +namespace { +double toMilliseconds(Clock::duration duration) +{ + return chrono::duration_cast(duration).count() / 1000.0; +} + +QString currentThreadInfo() +{ + std::ostringstream os; + os << std::this_thread::get_id(); + return QString::fromStdString(os.str()); +} + +//! This test prints thousands of lines of detailed output. The output allows to analyze +//! how ConcurrentQueue is being tested, how it works and why the test fails or crashes +//! (normally it passes). The default maximum number of warnings in Qt Test is 2000, +//! which is too low for this test. Therefore, the following warning is printed before +//! the log output is suppressed: "Maximum amount of warnings exceeded. Use -maxwarnings +//! to override.". Passing `-maxwarnings 100000` command line option to the test lets it +//! print everything. Passing -silent command line option to the test suppresses all its +//! output except for RandomEngineProvider's root seeds, which are necessary to reproduce +//! interesting test results. +QDebug log() +{ + return qInfo().noquote() << currentThreadInfo() << '|' + << QTime::currentTime().toString(Qt::ISODateWithMs) << '|'; +} + +using Total = std::atomic; + +struct JobData { + int summand; + Clock::duration sleepingTime; +}; +using JobDataSet = QVector; + +int expectedTotal(JobDataSet::const_iterator first, JobDataSet::const_iterator last) +{ + return std::accumulate(first, last, 0, + [](int total, JobData job) { + return total + job.summand; + }); +} + +int expectedTotal(const JobDataSet &jobs) +{ + return expectedTotal(jobs.cbegin(), jobs.cend()); +} + +int expectedTotal(const JobDataSet &jobs, std::size_t canceledCount) +{ + const auto count = jobs.size() - static_cast(canceledCount); + if (count < 0) + qFatal("Canceled more than the total number of jobs somehow!"); + return expectedTotal(jobs.cbegin(), jobs.cbegin() + count); +} + +int expectedTotal(const QVector &jobs) +{ + return std::accumulate(jobs.cbegin(), jobs.cend(), 0, + [](int total, const JobDataSet &dataSet) { + return total + expectedTotal(dataSet); + }); +} + +class Id +{ +public: + explicit Id(int threadId, int jobId) + : threadId { threadId }, jobId { jobId } { } + + QString toString() const { return QStringLiteral("[%1.%2]").arg(threadId).arg(jobId); } + +private: + const int threadId; + const int jobId; +}; + +QDebug operator<<(QDebug debug, Id id) +{ + QDebugStateSaver saver(debug); + debug.noquote() << id.toString(); + return debug; +} + +class Job +{ +public: + explicit Job(Total &total, JobData data, Id id) + : total { total }, data { data }, id { id } { } + + void operator()() + { + log().nospace() << id << " sleep " << toMilliseconds(data.sleepingTime) << " ms..."; + std::this_thread::sleep_for(data.sleepingTime); + + const auto updatedTotal = (total += data.summand); + log().nospace() << id << " +" << data.summand << " => " << updatedTotal; + } + +private: + Total &total; + const JobData data; + const Id id; +}; + +class Enqueuer +{ +public: + explicit Enqueuer(ConcurrentQueue &queue, Total &total, const JobDataSet &jobs, int threadId) + : queue { queue }, total { total }, jobs { jobs }, threadId { threadId } { } + + void operator()() + { + const char *const jobStr = jobs.size() == 1 ? "job" : "jobs"; + log() << QStringLiteral("#%1 enqueuing %2 %3...").arg(threadId).arg(jobs.size()).arg(jobStr); + for (int i = 0; i < jobs.size(); ++i) + queue.enqueue(Job(total, jobs.at(i), Id(threadId, i + 1))); + log() << QStringLiteral("#%1 enqueuing complete.").arg(threadId); + } + +private: + ConcurrentQueue &queue; + Total &total; + const JobDataSet jobs; + const int threadId; +}; + +class QueueControlMessagePrinter +{ +public: + explicit QueueControlMessagePrinter(const Total &total, int threadId, int threadCount) + : total { total }, threadId { threadId }, threadCount { threadCount } { } + + void printStartedMessage() const + { + log() << threadMessageFormatString().arg("started"); + } + void printCanceledMessage(std::size_t canceledCount) const + { + const char *const jobStr = canceledCount == 1 ? "job" : "jobs"; + const auto format = messageFormatString().arg("%1 %2 %3"); + log() << format.arg("canceled").arg(canceledCount).arg(jobStr); + } + void printBeginWaitingMessage() const + { + log() << threadMessageFormatString().arg("begin waiting for"); + } + void printEndWaitingMessage() const + { + log() << threadMessageFormatString().arg("end waiting for"); + } + +private: + QString messageFormatString() const + { + return QStringLiteral("#%1 %3 => %2").arg(threadId).arg(total.load()); + } + + QString threadMessageFormatString() const + { + const char *const threadStr = threadCount == 1 ? "thread" : "threads"; + const auto format = messageFormatString().arg("%3 %1 %2"); + return format.arg(threadCount).arg(threadStr); + } + + const Total &total; + const int threadId; + const int threadCount; +}; + +std::size_t cancelAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) +{ + const auto canceledCount = queue.cancelPending(); + printer.printCanceledMessage(canceledCount); + return canceledCount; +} + +void waitAndPrint(const ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) +{ + printer.printBeginWaitingMessage(); + queue.waitAll(); + printer.printEndWaitingMessage(); +} + +template +QDebug operator<<(QDebug debug, const std::array &array) +{ + QDebugStateSaver saver(debug); + debug.nospace(); + + debug << '('; + if (size != 0) { + debug << array.front(); + for (std::size_t i = 1; i != size; ++i) + debug << ", " << array[i]; + } + debug << ')'; + + return debug; +} + +using RandomEngine = std::mt19937_64; + +class RandomEngineProvider +{ +public: + RandomEngineProvider() + { + std::random_device rd; + const auto randomValues = generate(rd); + // Qt Test does not suppress output from the constructor of a test class + // even when -silent command line option is passed. This is fortunate + // because the root seeds can be used to reproduce a test failure. + log() << "RandomEngineProvider's root seeds:" << randomValues; + std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); + rootEngine.reset(new std::mt19937(seedSeq)); + } + + void resetEngines(std::size_t engineCount) + { + engines.clear(); + engines.reserve(engineCount); + for (; engineCount != 0; --engineCount) { + const auto randomValues = generate(*rootEngine); + std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); + engines.emplace_back(seedSeq); + } + } + + RandomEngine &engine(std::size_t index) + { + return engines.at(index); + } + +private: + // In this test we don't really care about uniformly choosing an initial state + // from the entire state-space of the engine. It is possible to generate more + // random numbers at the cost of performance and system entropy pool exhaustion. + static constexpr std::size_t rootSeedCount { 8 }; + static constexpr std::size_t seedCount { 32 }; + + template + static std::array generate(Generator &generator) + { + std::array result; + for (auto &value : result) + value = generator(); + return result; + } + + std::unique_ptr rootEngine; + std::vector engines; +}; + +//! Calls random member functions of ConcurrentQueue for a limited time. +//! Ensures that total equals 0 when all jobs are complete/canceled by: +//! * setting each job's summand to 1; +//! * subtracting a job set's size from total before enqueuing jobs in the set; +//! * adding canceled job count to total after cancelation. +class RandomCaller +{ +public: + explicit RandomCaller(ConcurrentQueue &queue, Total &total, int threadId, + int queueThreadCount, RandomEngine &engine, + bool boostEnqueueOperationWeight) + : queue(queue), + total(total), + threadId { threadId }, + boostEnqueueOperationWeight { boostEnqueueOperationWeight }, + printer(total, threadId, queueThreadCount), + engine(engine) + { + } + + void operator()() + { + constexpr auto testDuration = chrono::milliseconds(10); + const auto testStartTime = Clock::now(); + + auto operation = operationDistribution(); + do { + switch (operation(engine)) { + case 0: + enqueue(); + break; + case 1: + cancel(); + break; + case 2: + waitAndPrint(queue, printer); + break; + default: + qFatal("Unsupported operation."); + } + } while (Clock::now() - testStartTime < testDuration); + } + +private: + int randomInt(int a, int b) + { + return uniformInt(engine, decltype(uniformInt)::param_type(a, b)); + } + + using OperationDistribution = std::discrete_distribution; + + void printProbabilities(const OperationDistribution &distribution) const + { + auto p = distribution.probabilities(); + constexpr std::size_t expectedProbabilityCount { 3 }; + if (p.size() != expectedProbabilityCount) + qFatal("Wrong number of operation probabilities: %zu != %zu", p.size(), expectedProbabilityCount); + + for (auto &x : p) + x *= 100; // Convert to percentages. + log() << QStringLiteral("#%1 operation probabilities: e=%2%, c=%3%, w=%4%.").arg(threadId).arg(p[0]).arg(p[1]).arg(p[2]); + } + + OperationDistribution operationDistribution() + { + auto distribution = boostEnqueueOperationWeight ? boostedEnqueueOperationDistribution() + : almostUniformOperationDistribution(); + printProbabilities(distribution); + return distribution; + } + + OperationDistribution almostUniformOperationDistribution() + { + constexpr int sumOfProbabilities { 100 }; + const auto enqueueProbability = randomInt(0, sumOfProbabilities); + const auto cancelProbability = randomInt(0, sumOfProbabilities - enqueueProbability); + const auto waitProbability = sumOfProbabilities - enqueueProbability - cancelProbability; + if (enqueueProbability + cancelProbability + waitProbability != sumOfProbabilities) + qFatal("The sum of probabilities is not 100%%."); + + const auto real = [](int x) { return static_cast(x); }; + return { real(enqueueProbability), real(cancelProbability), real(waitProbability) }; + } + + OperationDistribution boostedEnqueueOperationDistribution() + { + // Make enqueue the most frequent operation to stress-test executing + // jobs rather than canceling them almost immediately. + const auto enqueueWeight = std::lognormal_distribution(2, 0.5)(engine); + const auto cancelWeight = 1.0; + // Waiting is uninteresting as it doesn't even modify the queue => make it rare. + const auto waitWeight = std::uniform_real_distribution(0, 0.2)(engine); + + return { enqueueWeight, cancelWeight, waitWeight }; + } + + JobDataSet createJobs() + { + constexpr int minJobCount { 1 }, maxJobCount { 5 }; + JobDataSet jobs(randomInt(minJobCount, maxJobCount)); + for (auto &job : jobs) { + constexpr int minSleepingTime { 0 }, maxSleepingTime { 5 }; + const auto sleepingTime = randomInt(minSleepingTime, maxSleepingTime); + job = { 1, sleepingTime * chrono::microseconds(1) }; + } + return jobs; + } + + void enqueue() + { + const auto jobs = createJobs(); + total -= jobs.size(); + Enqueuer(queue, total, jobs, threadId)(); + } + + void cancel() + { + const auto canceledCount = cancelAndPrint(queue, printer); + total += canceledCount; + } + + ConcurrentQueue &queue; + Total &total; + const int threadId; + const bool boostEnqueueOperationWeight; + const QueueControlMessagePrinter printer; + RandomEngine &engine; + std::uniform_int_distribution uniformInt; +}; + +} + +Q_DECLARE_METATYPE(Clock::duration) +Q_DECLARE_METATYPE(JobData) + +class ConcurrentQueueTest : public QObject +{ + Q_OBJECT +private slots: + void init(); + + void singleUserThread_data(); + void singleUserThread(); + + void multipleUserThreads_data(); + void multipleUserThreads(); + + void cancelPending1UserThread_data(); + void cancelPending1UserThread(); + + void waitAllFromMultipleThreads_data(); + void waitAllFromMultipleThreads(); + + void randomCalls_data(); + void randomCalls(); + +private: + static constexpr int primaryThreadId { 0 }; + + QueueControlMessagePrinter makeMessagePrinter(int threadCount) const + { + return QueueControlMessagePrinter(total, primaryThreadId, threadCount); + } + + Total total { 0 }; + RandomEngineProvider randomEngineProvider; +}; + +void ConcurrentQueueTest::init() +{ + total = 0; +} + +void ConcurrentQueueTest::singleUserThread_data() +{ + QTest::addColumn("threadCount"); + QTest::addColumn("jobs"); + + using ms = chrono::milliseconds; + + QTest::newRow("-") << 0 << JobDataSet {}; + QTest::newRow("0") << 7 << JobDataSet {}; + QTest::newRow("A") << 1 << JobDataSet { { 5, ms(0) } }; + QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } }; + QTest::newRow("C") << 1 << JobDataSet { { 1, ms(0) }, { 5, ms(2) }, { 3, ms(1) } }; + QTest::newRow("D") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } }; + QTest::newRow("E") << 2 << JobDataSet { { 1, ms(2) }, { 2, ms(1) } }; + QTest::newRow("F") << 3 << JobDataSet { { 8, ms(3) }, { 5, ms(4) }, { 2, ms(1) }, { 11, ms(1) }, { 100, ms(3) } }; +} + +void ConcurrentQueueTest::singleUserThread() +{ + QFETCH(const int, threadCount); + QFETCH(const JobDataSet, jobs); + + const auto printer = makeMessagePrinter(threadCount); + + ConcurrentQueue queue(threadCount); + printer.printStartedMessage(); + + Enqueuer(queue, total, jobs, primaryThreadId)(); + + waitAndPrint(queue, printer); + + QCOMPARE(total.load(), expectedTotal(jobs)); +} + +void ConcurrentQueueTest::multipleUserThreads_data() +{ + QTest::addColumn("threadCount"); + QTest::addColumn>("jobs"); + + using ms = chrono::milliseconds; + + JobDataSet jobs1 { { 1, ms(1) } }; + JobDataSet jobs2 { { 2, ms(4) } }; + QVector allJobs { jobs1, jobs2 }; + QTest::newRow("A1") << 1 << allJobs; + QTest::newRow("A2") << 2 << allJobs; + + jobs1.push_back({ 5, ms(3) }); + jobs2.push_back({ 10, ms(1) }); + allJobs = { jobs1, jobs2 }; + QTest::newRow("B1") << 2 << allJobs; + QTest::newRow("B2") << 3 << allJobs; + QTest::newRow("B3") << 8 << allJobs; + + jobs1.push_back({ 20, ms(0) }); + jobs2.push_back({ 40, ms(2) }); + allJobs = { jobs1, jobs2 }; + QTest::newRow("C") << 4 << allJobs; + + JobDataSet jobs3 { { 80, ms(0) }, { 160, ms(2) }, { 320, ms(1) }, { 640, ms(0) }, { 2000, ms(3) } }; + allJobs.push_back(jobs3); + QTest::newRow("D1") << 3 << allJobs; + QTest::newRow("D2") << 5 << allJobs; + + JobDataSet jobs4 { { 4000, ms(1) }, { 8000, ms(3) } }; + allJobs.push_back(jobs4); + QTest::newRow("E1") << 4 << allJobs; + QTest::newRow("E2") << 6 << allJobs; +} + +void ConcurrentQueueTest::multipleUserThreads() +{ + QFETCH(const int, threadCount); + QFETCH(const QVector, jobs); + + const auto printer = makeMessagePrinter(threadCount); + + ConcurrentQueue queue(threadCount); + printer.printStartedMessage(); + + if (!jobs.empty()) { + std::vector enqueuerThreads; + enqueuerThreads.reserve(jobs.size() - 1); + for (int i = 1; i < jobs.size(); ++i) + enqueuerThreads.emplace_back(Enqueuer(queue, total, jobs.at(i), i)); + + Enqueuer(queue, total, jobs.constFirst(), primaryThreadId)(); + for (auto &t : enqueuerThreads) + t.join(); + } + + waitAndPrint(queue, printer); + + QCOMPARE(total.load(), expectedTotal(jobs)); +} + +void ConcurrentQueueTest::cancelPending1UserThread_data() +{ + QTest::addColumn("threadCount"); + QTest::addColumn("jobs"); + QTest::addColumn("cancelDelay"); + + const auto ms = [](int count) -> Clock::duration { return chrono::milliseconds(count); }; + const auto us = [](int count) -> Clock::duration { return chrono::microseconds(count); }; + + QTest::newRow("-") << 0 << JobDataSet {} << ms(0); + QTest::newRow("01") << 2 << JobDataSet {} << ms(0); + QTest::newRow("02") << 3 << JobDataSet {} << ms(1); + QTest::newRow("A") << 1 << JobDataSet { { 5, ms(3) } } << ms(1); + QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } } << ms(1); + + JobDataSet dataSet { { 1, ms(3) }, { 5, ms(2) }, { 3, ms(1) } }; + QTest::newRow("C1") << 1 << dataSet << ms(1); + QTest::newRow("C2") << 1 << dataSet << ms(4); + QTest::newRow("C3") << 2 << dataSet << ms(1); + QTest::newRow("C4") << 3 << dataSet << ms(1); + QTest::newRow("C5") << 1 << dataSet << ms(7); + + dataSet.push_back({ 10, ms(5) }); + dataSet.push_back({ 20, ms(8) }); + dataSet.push_back({ 40, ms(20) }); + dataSet.push_back({ 80, ms(2) }); + QTest::newRow("D1") << 1 << dataSet << ms(1); + QTest::newRow("D2") << 1 << dataSet << ms(15); + QTest::newRow("D3") << 1 << dataSet << ms(50); + QTest::newRow("D4") << 2 << dataSet << ms(4); + QTest::newRow("D5") << 3 << dataSet << ms(4); + QTest::newRow("D6") << 4 << dataSet << ms(4); + QTest::newRow("D7") << 2 << dataSet << us(300); + QTest::newRow("D8") << 3 << dataSet << us(500); + QTest::newRow("D9") << 4 << dataSet << us(700); + + QTest::newRow("E") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } } << ms(1); +} + +void ConcurrentQueueTest::cancelPending1UserThread() +{ + QFETCH(const int, threadCount); + QFETCH(const JobDataSet, jobs); + QFETCH(const Clock::duration, cancelDelay); + + const auto printer = makeMessagePrinter(threadCount); + + ConcurrentQueue queue(threadCount); + printer.printStartedMessage(); + + Enqueuer(queue, total, jobs, primaryThreadId)(); + + std::this_thread::sleep_for(cancelDelay); + const auto canceledCount = cancelAndPrint(queue, printer); + QVERIFY(canceledCount <= static_cast(jobs.size())); + + waitAndPrint(queue, printer); + + QCOMPARE(total.load(), expectedTotal(jobs, canceledCount)); +} + +void ConcurrentQueueTest::waitAllFromMultipleThreads_data() +{ + QTest::addColumn("waitingThreadCount"); + for (int i : { 1, 2, 4, 7, 19 }) + QTest::addRow("%d", i) << i; +} + +void ConcurrentQueueTest::waitAllFromMultipleThreads() +{ + QFETCH(const int, waitingThreadCount); + QVERIFY(waitingThreadCount > 0); + + constexpr auto queueThreadCount = 2; + const auto printer = makeMessagePrinter(queueThreadCount); + + ConcurrentQueue queue(queueThreadCount); + printer.printStartedMessage(); + + using ms = chrono::milliseconds; + const JobDataSet jobs { { 5, ms(1) }, { 7, ms(2) } }; + Enqueuer(queue, total, jobs, primaryThreadId)(); + + std::vector waitingThreads; + waitingThreads.reserve(waitingThreadCount - 1); + for (int id = 1; id < waitingThreadCount; ++id) { + waitingThreads.emplace_back([=, &queue] { + waitAndPrint(queue, QueueControlMessagePrinter(total, id, queueThreadCount)); + }); + } + + waitAndPrint(queue, printer); + + for (auto &t : waitingThreads) + t.join(); + + QCOMPARE(total.load(), expectedTotal(jobs)); +} + +void ConcurrentQueueTest::randomCalls_data() +{ + QTest::addColumn("queueThreadCount"); + QTest::addColumn("userThreadCount"); + QTest::addColumn("boostEnqueueOperationWeight"); + + const auto suffix = [](bool boost) { return boost ? " +enqueue" : ""; }; + + for (bool boost : { false, true }) + for (int q : { 1, 2, 4, 9, 12, 20 }) + for (int u : { 1, 2, 4, 7, 11, 18 }) + QTest::addRow("queue{%d}; %d user thread(s)%s", q, u, suffix(boost)) << q << u << boost; +} + +void ConcurrentQueueTest::randomCalls() +{ + QFETCH(const int, queueThreadCount); + QFETCH(const int, userThreadCount); + QVERIFY(userThreadCount > 0); + QFETCH(const bool, boostEnqueueOperationWeight); + + const auto printer = makeMessagePrinter(queueThreadCount); + + ConcurrentQueue queue(queueThreadCount); + printer.printStartedMessage(); + + randomEngineProvider.resetEngines(userThreadCount); + + std::vector userThreads; + userThreads.reserve(userThreadCount - 1); + for (int id = 1; id < userThreadCount; ++id) { + userThreads.emplace_back(RandomCaller(queue, total, id, queueThreadCount, + randomEngineProvider.engine(id), + boostEnqueueOperationWeight)); + } + RandomCaller(queue, total, primaryThreadId, queueThreadCount, + randomEngineProvider.engine(primaryThreadId), + boostEnqueueOperationWeight)(); + + for (auto &t : userThreads) + t.join(); + + waitAndPrint(queue, printer); + + QCOMPARE(total.load(), 0); +} + +QTEST_APPLESS_MAIN(ConcurrentQueueTest) + +#include "concurrent_queue_test.moc" diff --git a/tests/concurrent_queue_test/concurrent_queue_test.pro b/tests/concurrent_queue_test/concurrent_queue_test.pro new file mode 100644 index 00000000..71403911 --- /dev/null +++ b/tests/concurrent_queue_test/concurrent_queue_test.pro @@ -0,0 +1,9 @@ +include(../qt_test.pri) + +PATH_TO_common = ../../common + +INCLUDEPATH += $$PATH_TO_common +HEADERS += $${PATH_TO_common}/concurrent_queue.h +SOURCES += \ + $${PATH_TO_common}/concurrent_queue.cpp \ + concurrent_queue_test.cpp diff --git a/tests/qt_test.pri b/tests/qt_test.pri new file mode 100644 index 00000000..1005fa03 --- /dev/null +++ b/tests/qt_test.pri @@ -0,0 +1,9 @@ +QT += testlib +QT -= gui + +CONFIG += qt console warn_on testcase no_testcase_installs +CONFIG -= app_bundle + +TEMPLATE = app + +include(../config.pri) diff --git a/tests/tests.pro b/tests/tests.pro new file mode 100644 index 00000000..15317f23 --- /dev/null +++ b/tests/tests.pro @@ -0,0 +1,2 @@ +TEMPLATE = subdirs +SUBDIRS += concurrent_queue_test