#include "concurrent_queue.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace chrono = std::chrono; using Clock = chrono::steady_clock; using YACReader::ConcurrentQueue; namespace { double toMilliseconds(Clock::duration duration) { return chrono::duration_cast(duration).count() / 1000.0; } QString currentThreadInfo() { std::ostringstream os; os << std::this_thread::get_id(); return QString::fromStdString(os.str()); } //! This test prints thousands of lines of detailed output. The output allows to analyze //! how ConcurrentQueue is being tested, how it works and why the test fails or crashes //! (normally it passes). The default maximum number of warnings in Qt Test is 2000, //! which is too low for this test. Therefore, the following warning is printed before //! the log output is suppressed: "Maximum amount of warnings exceeded. Use -maxwarnings //! to override.". Passing `-maxwarnings 100000` command line option to the test lets it //! print everything. Passing -silent command line option to the test suppresses all its //! output except for RandomEngineProvider's root seeds, which are necessary to reproduce //! interesting test results. QDebug log() { return qInfo().noquote() << currentThreadInfo() << '|' << QTime::currentTime().toString(Qt::ISODateWithMs) << '|'; } using Total = std::atomic; struct JobData { int summand; Clock::duration sleepingTime; }; using JobDataSet = QVector; int expectedTotal(JobDataSet::const_iterator first, JobDataSet::const_iterator last) { return std::accumulate(first, last, 0, [](int total, JobData job) { return total + job.summand; }); } int expectedTotal(const JobDataSet &jobs) { return expectedTotal(jobs.cbegin(), jobs.cend()); } int expectedTotal(const JobDataSet &jobs, std::size_t canceledCount) { const auto count = jobs.size() - static_cast(canceledCount); if (count < 0) qFatal("Canceled more than the total number of jobs somehow!"); return expectedTotal(jobs.cbegin(), jobs.cbegin() + count); } int expectedTotal(const QVector &jobs) { return std::accumulate(jobs.cbegin(), jobs.cend(), 0, [](int total, const JobDataSet &dataSet) { return total + expectedTotal(dataSet); }); } class Id { public: explicit Id(int threadId, int jobId) : threadId { threadId }, jobId { jobId } { } QString toString() const { return QStringLiteral("[%1.%2]").arg(threadId).arg(jobId); } private: const int threadId; const int jobId; }; QDebug operator<<(QDebug debug, Id id) { QDebugStateSaver saver(debug); debug.noquote() << id.toString(); return debug; } class Job { public: explicit Job(Total &total, JobData data, Id id) : total { total }, data { data }, id { id } { } void operator()() { log().nospace() << id << " sleep " << toMilliseconds(data.sleepingTime) << " ms..."; std::this_thread::sleep_for(data.sleepingTime); const auto updatedTotal = (total += data.summand); log().nospace() << id << " +" << data.summand << " => " << updatedTotal; } private: Total &total; const JobData data; const Id id; }; class Enqueuer { public: explicit Enqueuer(ConcurrentQueue &queue, Total &total, const JobDataSet &jobs, int threadId) : queue { queue }, total { total }, jobs { jobs }, threadId { threadId } { } void operator()() { const char *const jobStr = jobs.size() == 1 ? "job" : "jobs"; log() << QStringLiteral("#%1 enqueuing %2 %3...").arg(threadId).arg(jobs.size()).arg(jobStr); for (int i = 0; i < jobs.size(); ++i) queue.enqueue(Job(total, jobs.at(i), Id(threadId, i + 1))); log() << QStringLiteral("#%1 enqueuing complete.").arg(threadId); } private: ConcurrentQueue &queue; Total &total; const JobDataSet jobs; const int threadId; }; class QueueControlMessagePrinter { public: explicit QueueControlMessagePrinter(const Total &total, int threadId, int threadCount) : total { total }, threadId { threadId }, threadCount { threadCount } { } void printStartedMessage() const { log() << threadMessageFormatString().arg("started"); } void printCanceledMessage(std::size_t canceledCount) const { const char *const jobStr = canceledCount == 1 ? "job" : "jobs"; const auto format = messageFormatString().arg("%1 %2 %3"); log() << format.arg("canceled").arg(canceledCount).arg(jobStr); } void printBeginWaitingMessage() const { log() << threadMessageFormatString().arg("begin waiting for"); } void printEndWaitingMessage() const { log() << threadMessageFormatString().arg("end waiting for"); } private: QString messageFormatString() const { return QStringLiteral("#%1 %3 => %2").arg(threadId).arg(total.load()); } QString threadMessageFormatString() const { const char *const threadStr = threadCount == 1 ? "thread" : "threads"; const auto format = messageFormatString().arg("%3 %1 %2"); return format.arg(threadCount).arg(threadStr); } const Total &total; const int threadId; const int threadCount; }; std::size_t cancelAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) { const auto canceledCount = queue.cancelPending(); printer.printCanceledMessage(canceledCount); return canceledCount; } void waitAndPrint(const ConcurrentQueue &queue, const QueueControlMessagePrinter &printer) { printer.printBeginWaitingMessage(); queue.waitAll(); printer.printEndWaitingMessage(); } template QDebug operator<<(QDebug debug, const std::array &array) { QDebugStateSaver saver(debug); debug.nospace(); debug << '('; if (size != 0) { debug << array.front(); for (std::size_t i = 1; i != size; ++i) debug << ", " << array[i]; } debug << ')'; return debug; } using RandomEngine = std::mt19937_64; class RandomEngineProvider { public: RandomEngineProvider() { std::random_device rd; const auto randomValues = generate(rd); // Qt Test does not suppress output from the constructor of a test class // even when -silent command line option is passed. This is fortunate // because the root seeds can be used to reproduce a test failure. log() << "RandomEngineProvider's root seeds:" << randomValues; std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); rootEngine.reset(new std::mt19937(seedSeq)); } void resetEngines(std::size_t engineCount) { engines.clear(); engines.reserve(engineCount); for (; engineCount != 0; --engineCount) { const auto randomValues = generate(*rootEngine); std::seed_seq seedSeq(randomValues.begin(), randomValues.end()); engines.emplace_back(seedSeq); } } RandomEngine &engine(std::size_t index) { return engines.at(index); } private: // In this test we don't really care about uniformly choosing an initial state // from the entire state-space of the engine. It is possible to generate more // random numbers at the cost of performance and system entropy pool exhaustion. static constexpr std::size_t rootSeedCount { 8 }; static constexpr std::size_t seedCount { 32 }; template static std::array generate(Generator &generator) { std::array result; for (auto &value : result) value = generator(); return result; } std::unique_ptr rootEngine; std::vector engines; }; //! Calls random member functions of ConcurrentQueue for a limited time. //! Ensures that total equals 0 when all jobs are complete/canceled by: //! * setting each job's summand to 1; //! * subtracting a job set's size from total before enqueuing jobs in the set; //! * adding canceled job count to total after cancelation. class RandomCaller { public: explicit RandomCaller(ConcurrentQueue &queue, Total &total, int threadId, int queueThreadCount, RandomEngine &engine, bool boostEnqueueOperationWeight) : queue(queue), total(total), threadId { threadId }, boostEnqueueOperationWeight { boostEnqueueOperationWeight }, printer(total, threadId, queueThreadCount), engine(engine) { } void operator()() { constexpr auto testDuration = chrono::milliseconds(10); const auto testStartTime = Clock::now(); auto operation = operationDistribution(); do { switch (operation(engine)) { case 0: enqueue(); break; case 1: cancel(); break; case 2: waitAndPrint(queue, printer); break; default: qFatal("Unsupported operation."); } } while (Clock::now() - testStartTime < testDuration); } private: int randomInt(int a, int b) { return uniformInt(engine, decltype(uniformInt)::param_type(a, b)); } using OperationDistribution = std::discrete_distribution; void printProbabilities(const OperationDistribution &distribution) const { auto p = distribution.probabilities(); constexpr std::size_t expectedProbabilityCount { 3 }; if (p.size() != expectedProbabilityCount) qFatal("Wrong number of operation probabilities: %zu != %zu", p.size(), expectedProbabilityCount); for (auto &x : p) x *= 100; // Convert to percentages. log() << QStringLiteral("#%1 operation probabilities: e=%2%, c=%3%, w=%4%.").arg(threadId).arg(p[0]).arg(p[1]).arg(p[2]); } OperationDistribution operationDistribution() { auto distribution = boostEnqueueOperationWeight ? boostedEnqueueOperationDistribution() : almostUniformOperationDistribution(); printProbabilities(distribution); return distribution; } OperationDistribution almostUniformOperationDistribution() { constexpr int sumOfProbabilities { 100 }; const auto enqueueProbability = randomInt(0, sumOfProbabilities); const auto cancelProbability = randomInt(0, sumOfProbabilities - enqueueProbability); const auto waitProbability = sumOfProbabilities - enqueueProbability - cancelProbability; if (enqueueProbability + cancelProbability + waitProbability != sumOfProbabilities) qFatal("The sum of probabilities is not 100%%."); const auto real = [](int x) { return static_cast(x); }; return { real(enqueueProbability), real(cancelProbability), real(waitProbability) }; } OperationDistribution boostedEnqueueOperationDistribution() { // Make enqueue the most frequent operation to stress-test executing // jobs rather than canceling them almost immediately. const auto enqueueWeight = std::lognormal_distribution(2, 0.5)(engine); const auto cancelWeight = 1.0; // Waiting is uninteresting as it doesn't even modify the queue => make it rare. const auto waitWeight = std::uniform_real_distribution(0, 0.2)(engine); return { enqueueWeight, cancelWeight, waitWeight }; } JobDataSet createJobs() { constexpr int minJobCount { 1 }, maxJobCount { 5 }; JobDataSet jobs(randomInt(minJobCount, maxJobCount)); for (auto &job : jobs) { constexpr int minSleepingTime { 0 }, maxSleepingTime { 5 }; const auto sleepingTime = randomInt(minSleepingTime, maxSleepingTime); job = { 1, sleepingTime * chrono::microseconds(1) }; } return jobs; } void enqueue() { const auto jobs = createJobs(); total -= jobs.size(); Enqueuer(queue, total, jobs, threadId)(); } void cancel() { const auto canceledCount = cancelAndPrint(queue, printer); total += canceledCount; } ConcurrentQueue &queue; Total &total; const int threadId; const bool boostEnqueueOperationWeight; const QueueControlMessagePrinter printer; RandomEngine &engine; std::uniform_int_distribution uniformInt; }; } Q_DECLARE_METATYPE(Clock::duration) Q_DECLARE_METATYPE(JobData) class ConcurrentQueueTest : public QObject { Q_OBJECT private slots: void init(); void singleUserThread_data(); void singleUserThread(); void multipleUserThreads_data(); void multipleUserThreads(); void cancelPending1UserThread_data(); void cancelPending1UserThread(); void waitAllFromMultipleThreads_data(); void waitAllFromMultipleThreads(); void randomCalls_data(); void randomCalls(); private: static constexpr int primaryThreadId { 0 }; QueueControlMessagePrinter makeMessagePrinter(int threadCount) const { return QueueControlMessagePrinter(total, primaryThreadId, threadCount); } Total total { 0 }; RandomEngineProvider randomEngineProvider; }; void ConcurrentQueueTest::init() { total = 0; } void ConcurrentQueueTest::singleUserThread_data() { QTest::addColumn("threadCount"); QTest::addColumn("jobs"); using ms = chrono::milliseconds; QTest::newRow("-") << 0 << JobDataSet {}; QTest::newRow("0") << 7 << JobDataSet {}; QTest::newRow("A") << 1 << JobDataSet { { 5, ms(0) } }; QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } }; QTest::newRow("C") << 1 << JobDataSet { { 1, ms(0) }, { 5, ms(2) }, { 3, ms(1) } }; QTest::newRow("D") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } }; QTest::newRow("E") << 2 << JobDataSet { { 1, ms(2) }, { 2, ms(1) } }; QTest::newRow("F") << 3 << JobDataSet { { 8, ms(3) }, { 5, ms(4) }, { 2, ms(1) }, { 11, ms(1) }, { 100, ms(3) } }; } void ConcurrentQueueTest::singleUserThread() { QFETCH(const int, threadCount); QFETCH(const JobDataSet, jobs); const auto printer = makeMessagePrinter(threadCount); ConcurrentQueue queue(threadCount); printer.printStartedMessage(); Enqueuer(queue, total, jobs, primaryThreadId)(); waitAndPrint(queue, printer); QCOMPARE(total.load(), expectedTotal(jobs)); } void ConcurrentQueueTest::multipleUserThreads_data() { QTest::addColumn("threadCount"); QTest::addColumn>("jobs"); using ms = chrono::milliseconds; JobDataSet jobs1 { { 1, ms(1) } }; JobDataSet jobs2 { { 2, ms(4) } }; QVector allJobs { jobs1, jobs2 }; QTest::newRow("A1") << 1 << allJobs; QTest::newRow("A2") << 2 << allJobs; jobs1.push_back({ 5, ms(3) }); jobs2.push_back({ 10, ms(1) }); allJobs = { jobs1, jobs2 }; QTest::newRow("B1") << 2 << allJobs; QTest::newRow("B2") << 3 << allJobs; QTest::newRow("B3") << 8 << allJobs; jobs1.push_back({ 20, ms(0) }); jobs2.push_back({ 40, ms(2) }); allJobs = { jobs1, jobs2 }; QTest::newRow("C") << 4 << allJobs; JobDataSet jobs3 { { 80, ms(0) }, { 160, ms(2) }, { 320, ms(1) }, { 640, ms(0) }, { 2000, ms(3) } }; allJobs.push_back(jobs3); QTest::newRow("D1") << 3 << allJobs; QTest::newRow("D2") << 5 << allJobs; JobDataSet jobs4 { { 4000, ms(1) }, { 8000, ms(3) } }; allJobs.push_back(jobs4); QTest::newRow("E1") << 4 << allJobs; QTest::newRow("E2") << 6 << allJobs; } void ConcurrentQueueTest::multipleUserThreads() { QFETCH(const int, threadCount); QFETCH(const QVector, jobs); const auto printer = makeMessagePrinter(threadCount); ConcurrentQueue queue(threadCount); printer.printStartedMessage(); if (!jobs.empty()) { std::vector enqueuerThreads; enqueuerThreads.reserve(jobs.size() - 1); for (int i = 1; i < jobs.size(); ++i) enqueuerThreads.emplace_back(Enqueuer(queue, total, jobs.at(i), i)); Enqueuer(queue, total, jobs.constFirst(), primaryThreadId)(); for (auto &t : enqueuerThreads) t.join(); } waitAndPrint(queue, printer); QCOMPARE(total.load(), expectedTotal(jobs)); } void ConcurrentQueueTest::cancelPending1UserThread_data() { QTest::addColumn("threadCount"); QTest::addColumn("jobs"); QTest::addColumn("cancelDelay"); const auto ms = [](int count) -> Clock::duration { return chrono::milliseconds(count); }; const auto us = [](int count) -> Clock::duration { return chrono::microseconds(count); }; QTest::newRow("-") << 0 << JobDataSet {} << ms(0); QTest::newRow("01") << 2 << JobDataSet {} << ms(0); QTest::newRow("02") << 3 << JobDataSet {} << ms(1); QTest::newRow("A") << 1 << JobDataSet { { 5, ms(3) } } << ms(1); QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } } << ms(1); JobDataSet dataSet { { 1, ms(3) }, { 5, ms(2) }, { 3, ms(1) } }; QTest::newRow("C1") << 1 << dataSet << ms(1); QTest::newRow("C2") << 1 << dataSet << ms(4); QTest::newRow("C3") << 2 << dataSet << ms(1); QTest::newRow("C4") << 3 << dataSet << ms(1); QTest::newRow("C5") << 1 << dataSet << ms(7); dataSet.push_back({ 10, ms(5) }); dataSet.push_back({ 20, ms(8) }); dataSet.push_back({ 40, ms(20) }); dataSet.push_back({ 80, ms(2) }); QTest::newRow("D1") << 1 << dataSet << ms(1); QTest::newRow("D2") << 1 << dataSet << ms(15); QTest::newRow("D3") << 1 << dataSet << ms(50); QTest::newRow("D4") << 2 << dataSet << ms(4); QTest::newRow("D5") << 3 << dataSet << ms(4); QTest::newRow("D6") << 4 << dataSet << ms(4); QTest::newRow("D7") << 2 << dataSet << us(300); QTest::newRow("D8") << 3 << dataSet << us(500); QTest::newRow("D9") << 4 << dataSet << us(700); QTest::newRow("E") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } } << ms(1); } void ConcurrentQueueTest::cancelPending1UserThread() { QFETCH(const int, threadCount); QFETCH(const JobDataSet, jobs); QFETCH(const Clock::duration, cancelDelay); const auto printer = makeMessagePrinter(threadCount); ConcurrentQueue queue(threadCount); printer.printStartedMessage(); Enqueuer(queue, total, jobs, primaryThreadId)(); std::this_thread::sleep_for(cancelDelay); const auto canceledCount = cancelAndPrint(queue, printer); QVERIFY(canceledCount <= static_cast(jobs.size())); waitAndPrint(queue, printer); QCOMPARE(total.load(), expectedTotal(jobs, canceledCount)); } void ConcurrentQueueTest::waitAllFromMultipleThreads_data() { QTest::addColumn("waitingThreadCount"); for (int i : { 1, 2, 4, 7, 19 }) QTest::addRow("%d", i) << i; } void ConcurrentQueueTest::waitAllFromMultipleThreads() { QFETCH(const int, waitingThreadCount); QVERIFY(waitingThreadCount > 0); constexpr auto queueThreadCount = 2; const auto printer = makeMessagePrinter(queueThreadCount); ConcurrentQueue queue(queueThreadCount); printer.printStartedMessage(); using ms = chrono::milliseconds; const JobDataSet jobs { { 5, ms(1) }, { 7, ms(2) } }; Enqueuer(queue, total, jobs, primaryThreadId)(); std::vector waitingThreads; waitingThreads.reserve(waitingThreadCount - 1); for (int id = 1; id < waitingThreadCount; ++id) { waitingThreads.emplace_back([=, &queue] { waitAndPrint(queue, QueueControlMessagePrinter(total, id, queueThreadCount)); }); } waitAndPrint(queue, printer); for (auto &t : waitingThreads) t.join(); QCOMPARE(total.load(), expectedTotal(jobs)); } void ConcurrentQueueTest::randomCalls_data() { QTest::addColumn("queueThreadCount"); QTest::addColumn("userThreadCount"); QTest::addColumn("boostEnqueueOperationWeight"); const auto suffix = [](bool boost) { return boost ? " +enqueue" : ""; }; for (bool boost : { false, true }) for (int q : { 1, 2, 4, 9, 12, 20 }) for (int u : { 1, 2, 4, 7, 11, 18 }) QTest::addRow("queue{%d}; %d user thread(s)%s", q, u, suffix(boost)) << q << u << boost; } void ConcurrentQueueTest::randomCalls() { QFETCH(const int, queueThreadCount); QFETCH(const int, userThreadCount); QVERIFY(userThreadCount > 0); QFETCH(const bool, boostEnqueueOperationWeight); const auto printer = makeMessagePrinter(queueThreadCount); ConcurrentQueue queue(queueThreadCount); printer.printStartedMessage(); randomEngineProvider.resetEngines(userThreadCount); std::vector userThreads; userThreads.reserve(userThreadCount - 1); for (int id = 1; id < userThreadCount; ++id) { userThreads.emplace_back(RandomCaller(queue, total, id, queueThreadCount, randomEngineProvider.engine(id), boostEnqueueOperationWeight)); } RandomCaller(queue, total, primaryThreadId, queueThreadCount, randomEngineProvider.engine(primaryThreadId), boostEnqueueOperationWeight)(); for (auto &t : userThreads) t.join(); waitAndPrint(queue, printer); QCOMPARE(total.load(), 0); } QTEST_APPLESS_MAIN(ConcurrentQueueTest) #include "concurrent_queue_test.moc"