diff --git a/tests/concurrent_queue_test/concurrent_queue_test.cpp b/tests/concurrent_queue_test/concurrent_queue_test.cpp index f3fa5cfc..f79b9754 100644 --- a/tests/concurrent_queue_test/concurrent_queue_test.cpp +++ b/tests/concurrent_queue_test/concurrent_queue_test.cpp @@ -9,9 +9,13 @@ #include #include +#include #include #include +#include +#include #include +#include #include #include #include @@ -33,6 +37,15 @@ QString currentThreadInfo() return QString::fromStdString(os.str()); } +//! This test prints thousands of lines of detailed output. The output allows to analyze +//! how ConcurrentQueue is being tested, how it works and why the test fails or crashes +//! (normally it passes). The default maximum number of warnings in Qt Test is 2000, +//! which is too low for this test. Therefore, the following warning is printed before +//! the log output is suppressed: "Maximum amount of warnings exceeded. Use -maxwarnings +//! to override.". Passing `-maxwarnings 100000` command line option to the test lets it +//! print everything. Passing -silent command line option to the test suppresses all its +//! output except for RandomEngineProvider's root seeds, which are necessary to reproduce +//! interesting test results. QDebug log() { return qInfo().noquote() << currentThreadInfo() << '|' @@ -196,6 +209,167 @@ void waitAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &prin printer.printEndWaitingMessage(); } +template +QDebug operator<<(QDebug debug, const std::array &array) +{ + QDebugStateSaver saver(debug); + debug.nospace(); + + debug << '('; + if (size != 0) { + debug << array.front(); + for (std::size_t i = 1; i != size; ++i) + debug << ", " << array[i]; + } + debug << ')'; + + return debug; +} + +using RandomEngine = std::mt19937_64; + +class RandomEngineProvider +{ +public: + RandomEngineProvider() + { + std::random_device rd; + const auto randomValues = generate(rd); + // Qt Test does not suppress output from the constructor of a test class + // even when -silent command line option is passed. This is fortunate + // because the root seeds can be used to reproduce a test failure. + log() << "RandomEngineProvider's root seeds:" << randomValues; + std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); + rootEngine.reset(new std::mt19937(seedSeq)); + } + + void resetEngines(std::size_t engineCount) + { + engines.clear(); + engines.reserve(engineCount); + for (; engineCount != 0; --engineCount) { + const auto randomValues = generate(*rootEngine); + std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); + engines.emplace_back(seedSeq); + } + } + + RandomEngine &engine(std::size_t index) + { + return engines.at(index); + } + +private: + // In this test we don't really care about uniformly choosing an initial state + // from the entire state-space of the engine. It is possible to generate more + // random numbers at the cost of performance and system entropy pool exhaustion. + static constexpr std::size_t rootSeedCount { 8 }; + static constexpr std::size_t seedCount { 32 }; + + template + static std::array generate(Generator &generator) + { + std::array result; + for (auto &value : result) + value = generator(); + return result; + } + + std::unique_ptr rootEngine; + std::vector engines; +}; + +//! Calls random member functions of ConcurrentQueue for a limited time. +//! Ensures that total equals 0 when all jobs are complete/canceled by: +//! * setting each job's summand to 1; +//! * subtracting a job set's size from total before enqueuing jobs in the set; +//! * adding canceled job count to total after cancelation. +class RandomCaller +{ +public: + explicit RandomCaller(ConcurrentQueue &queue, Total &total, int threadId, + int queueThreadCount, RandomEngine &engine) + : queue(queue), total(total), threadId { threadId }, printer(total, threadId, queueThreadCount), engine(engine) + { + } + + void operator()() + { + constexpr auto testDuration = chrono::milliseconds(10); + const auto testStartTime = Clock::now(); + + auto operation = operationDistribution(); + do { + switch (operation(engine)) { + case 0: + enqueue(); + break; + case 1: + cancel(); + break; + case 2: + waitAndPrint(queue, printer); + break; + default: + qFatal("Unsupported operation."); + } + } while (Clock::now() - testStartTime < testDuration); + } + +private: + int randomInt(int a, int b) + { + return uniformInt(engine, decltype(uniformInt)::param_type(a, b)); + } + + std::discrete_distribution operationDistribution() + { + constexpr int sumOfProbabilities { 100 }; + const auto enqueueProbability = randomInt(0, sumOfProbabilities); + const auto cancelProbability = randomInt(0, sumOfProbabilities - enqueueProbability); + const auto waitProbability = sumOfProbabilities - enqueueProbability - cancelProbability; + + log() << QStringLiteral("#%1 operation weights: %2%, %3%, %4%.").arg(threadId).arg(enqueueProbability).arg(cancelProbability).arg(waitProbability); + if (enqueueProbability + cancelProbability + waitProbability != sumOfProbabilities) + qFatal("The sum of probabilities is not 100%%."); + + const auto real = [](int x) { return static_cast(x); }; + return { real(enqueueProbability), real(cancelProbability), real(waitProbability) }; + } + + JobDataSet createJobs() + { + constexpr int minJobCount { 1 }, maxJobCount { 5 }; + JobDataSet jobs(randomInt(minJobCount, maxJobCount)); + for (auto &job : jobs) { + constexpr int minSleepingTime { 0 }, maxSleepingTime { 5 }; + const auto sleepingTime = randomInt(minSleepingTime, maxSleepingTime); + job = { 1, sleepingTime * chrono::microseconds(1) }; + } + return jobs; + } + + void enqueue() + { + const auto jobs = createJobs(); + total -= jobs.size(); + Enqueuer(queue, total, jobs, threadId)(); + } + + void cancel() + { + const auto canceledCount = cancelAndPrint(queue, printer); + total += canceledCount; + } + + ConcurrentQueue &queue; + Total &total; + const int threadId; + const QueueControlMessagePrinter printer; + RandomEngine &engine; + std::uniform_int_distribution uniformInt; +}; + } Q_DECLARE_METATYPE(Clock::duration) @@ -219,6 +393,9 @@ private slots: void waitAllFromMultipleThreads_data(); void waitAllFromMultipleThreads(); + void randomCalls_data(); + void randomCalls(); + private: static constexpr int primaryThreadId { 0 }; @@ -228,6 +405,7 @@ private: } Total total { 0 }; + RandomEngineProvider randomEngineProvider; }; void ConcurrentQueueTest::init() @@ -430,6 +608,46 @@ void ConcurrentQueueTest::waitAllFromMultipleThreads() QCOMPARE(total.load(), expectedTotal(jobs)); } +void ConcurrentQueueTest::randomCalls_data() +{ + QTest::addColumn("queueThreadCount"); + QTest::addColumn("userThreadCount"); + + for (int q : { 1, 2, 3, 4, 8, 12, 16, 20 }) + for (int u : { 1, 2, 3, 4, 7, 11, 18 }) + QTest::addRow("queue{%d}; %d user thread(s)", q, u) << q << u; +} + +void ConcurrentQueueTest::randomCalls() +{ + QFETCH(const int, queueThreadCount); + QFETCH(const int, userThreadCount); + QVERIFY(userThreadCount > 0); + + const auto printer = makeMessagePrinter(queueThreadCount); + + ConcurrentQueue queue(queueThreadCount); + printer.printStartedMessage(); + + randomEngineProvider.resetEngines(userThreadCount); + + std::vector userThreads; + userThreads.reserve(userThreadCount - 1); + for (int id = 1; id < userThreadCount; ++id) { + userThreads.emplace_back(RandomCaller(queue, total, id, queueThreadCount, + randomEngineProvider.engine(id))); + } + RandomCaller(queue, total, primaryThreadId, queueThreadCount, + randomEngineProvider.engine(primaryThreadId))(); + + for (auto &t : userThreads) + t.join(); + + waitAndPrint(queue, printer); + + QCOMPARE(total.load(), 0); +} + QTEST_APPLESS_MAIN(ConcurrentQueueTest) #include "concurrent_queue_test.moc"