mirror of
https://github.com/YACReader/yacreader
synced 2025-05-28 03:10:27 -04:00
Merge pull request #239 from vedgy/concurrent-queue-fixes
Test, fix, improve, optimize ConcurrentQueue
This commit is contained in:
commit
3a60d89f06
@ -1,3 +1,4 @@
|
|||||||
TEMPLATE = subdirs
|
TEMPLATE = subdirs
|
||||||
SUBDIRS = YACReader YACReaderLibrary YACReaderLibraryServer
|
SUBDIRS = YACReader YACReaderLibrary YACReaderLibraryServer
|
||||||
YACReaderLibrary.depends = YACReader
|
YACReaderLibrary.depends = YACReader
|
||||||
|
!CONFIG(no_tests): SUBDIRS += tests
|
||||||
|
@ -6,7 +6,7 @@ QMAKE_TARGET_BUNDLE_PREFIX = "com.yacreader"
|
|||||||
DEPENDPATH += . \
|
DEPENDPATH += . \
|
||||||
release
|
release
|
||||||
|
|
||||||
DEFINES += NOMINMAX YACREADER
|
DEFINES += YACREADER
|
||||||
|
|
||||||
#load default build flags
|
#load default build flags
|
||||||
include (../config.pri)
|
include (../config.pri)
|
||||||
|
@ -12,7 +12,7 @@ INCLUDEPATH += . \
|
|||||||
./comic_vine \
|
./comic_vine \
|
||||||
./comic_vine/model
|
./comic_vine/model
|
||||||
|
|
||||||
DEFINES += SERVER_RELEASE NOMINMAX YACREADER_LIBRARY
|
DEFINES += SERVER_RELEASE YACREADER_LIBRARY
|
||||||
|
|
||||||
# load default build flags
|
# load default build flags
|
||||||
include (../config.pri)
|
include (../config.pri)
|
||||||
@ -160,6 +160,7 @@ HEADERS += comic_flow.h \
|
|||||||
}
|
}
|
||||||
|
|
||||||
SOURCES += comic_flow.cpp \
|
SOURCES += comic_flow.cpp \
|
||||||
|
../common/concurrent_queue.cpp \
|
||||||
create_library_dialog.cpp \
|
create_library_dialog.cpp \
|
||||||
db/comic_query_result_processor.cpp \
|
db/comic_query_result_processor.cpp \
|
||||||
db/folder_query_result_processor.cpp \
|
db/folder_query_result_processor.cpp \
|
||||||
|
@ -16,7 +16,7 @@ YACReader::ComicQueryResultProcessor::ComicQueryResultProcessor()
|
|||||||
|
|
||||||
void YACReader::ComicQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, const QString &databasePath)
|
void YACReader::ComicQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, const QString &databasePath)
|
||||||
{
|
{
|
||||||
querySearchQueue.cancellPending();
|
querySearchQueue.cancelPending();
|
||||||
|
|
||||||
querySearchQueue.enqueue([=] {
|
querySearchQueue.enqueue([=] {
|
||||||
QString connectionName = "";
|
QString connectionName = "";
|
||||||
|
@ -22,7 +22,7 @@ YACReader::FolderQueryResultProcessor::FolderQueryResultProcessor(FolderModel *m
|
|||||||
|
|
||||||
void YACReader::FolderQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, bool includeComics)
|
void YACReader::FolderQueryResultProcessor::createModelData(const YACReader::SearchModifiers modifier, const QString &filter, bool includeComics)
|
||||||
{
|
{
|
||||||
querySearchQueue.cancellPending();
|
querySearchQueue.cancelPending();
|
||||||
|
|
||||||
querySearchQueue.enqueue([=] {
|
querySearchQueue.enqueue([=] {
|
||||||
QString connectionName = "";
|
QString connectionName = "";
|
||||||
|
@ -10,7 +10,7 @@ INCLUDEPATH += ../YACReaderLibrary \
|
|||||||
../YACReaderLibrary/server \
|
../YACReaderLibrary/server \
|
||||||
../YACReaderLibrary/db
|
../YACReaderLibrary/db
|
||||||
|
|
||||||
DEFINES += SERVER_RELEASE NOMINMAX YACREADER_LIBRARY
|
DEFINES += SERVER_RELEASE YACREADER_LIBRARY
|
||||||
# load default build flags
|
# load default build flags
|
||||||
# do a basic dependency check
|
# do a basic dependency check
|
||||||
include(headless_config.pri)
|
include(headless_config.pri)
|
||||||
|
@ -33,6 +33,11 @@ jobs:
|
|||||||
qmake CONFIG+="7zip" %DEFINES_VAR%
|
qmake CONFIG+="7zip" %DEFINES_VAR%
|
||||||
nmake
|
nmake
|
||||||
displayName: 'Build'
|
displayName: 'Build'
|
||||||
|
- script: |
|
||||||
|
call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\${{ parameters.vc_vars }}"
|
||||||
|
set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH%
|
||||||
|
nmake check TESTARGS="-maxwarnings 100000"
|
||||||
|
displayName: 'Run tests'
|
||||||
# - script: |
|
# - script: |
|
||||||
# set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH%
|
# set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH%
|
||||||
# cd $(Build.SourcesDirectory)\ci\win
|
# cd $(Build.SourcesDirectory)\ci\win
|
||||||
|
@ -33,6 +33,11 @@ jobs:
|
|||||||
qmake CONFIG+="7zip" %DEFINES_VAR%
|
qmake CONFIG+="7zip" %DEFINES_VAR%
|
||||||
nmake
|
nmake
|
||||||
displayName: 'Build'
|
displayName: 'Build'
|
||||||
|
- script: |
|
||||||
|
call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\${{ parameters.vc_vars }}"
|
||||||
|
set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH%
|
||||||
|
nmake check TESTARGS="-maxwarnings 100000"
|
||||||
|
displayName: 'Run tests'
|
||||||
- script: |
|
- script: |
|
||||||
set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH%
|
set PATH=C:\Qt\${{ parameters.qt_version }}\${{ parameters.qt_spec }}\bin;%PATH%
|
||||||
cd $(Build.SourcesDirectory)\ci\win
|
cd $(Build.SourcesDirectory)\ci\win
|
||||||
|
@ -72,6 +72,9 @@ jobs:
|
|||||||
qmake CONFIG+="unarr" $DEFINES_VAR
|
qmake CONFIG+="unarr" $DEFINES_VAR
|
||||||
make
|
make
|
||||||
displayName: 'Build'
|
displayName: 'Build'
|
||||||
|
- script: |
|
||||||
|
make check TESTARGS="-maxwarnings 100000"
|
||||||
|
displayName: 'Run tests'
|
||||||
- task: CopyFiles@2
|
- task: CopyFiles@2
|
||||||
inputs:
|
inputs:
|
||||||
sourceFolder: $(Build.SourcesDirectory)/tarball
|
sourceFolder: $(Build.SourcesDirectory)/tarball
|
||||||
@ -118,6 +121,11 @@ jobs:
|
|||||||
SKIP_CODESIGN="$(tr [A-Z] [a-z] <<< "$IS_FORK")"
|
SKIP_CODESIGN="$(tr [A-Z] [a-z] <<< "$IS_FORK")"
|
||||||
./compileOSX.sh $VERSION $(Build.BuildNumber) $SKIP_CODESIGN
|
./compileOSX.sh $VERSION $(Build.BuildNumber) $SKIP_CODESIGN
|
||||||
displayName: 'Build'
|
displayName: 'Build'
|
||||||
|
- script: |
|
||||||
|
cd $(Build.SourcesDirectory)/tests
|
||||||
|
qmake
|
||||||
|
make check TESTARGS="-maxwarnings 100000"
|
||||||
|
displayName: 'Build and run tests'
|
||||||
- task: CopyFiles@2
|
- task: CopyFiles@2
|
||||||
inputs:
|
inputs:
|
||||||
contents: '*.dmg'
|
contents: '*.dmg'
|
||||||
|
107
common/concurrent_queue.cpp
Normal file
107
common/concurrent_queue.cpp
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
#include "concurrent_queue.h"
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <mutex>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
using namespace YACReader;
|
||||||
|
|
||||||
|
ConcurrentQueue::ConcurrentQueue(std::size_t threadCount)
|
||||||
|
{
|
||||||
|
threads.reserve(threadCount);
|
||||||
|
for (; threadCount != 0; --threadCount)
|
||||||
|
threads.emplace_back(&ConcurrentQueue::nextJob, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
ConcurrentQueue::~ConcurrentQueue()
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(queueMutex);
|
||||||
|
assert(!bailout);
|
||||||
|
bailout = true;
|
||||||
|
}
|
||||||
|
jobAvailableVar.notify_all();
|
||||||
|
|
||||||
|
for (auto &x : threads)
|
||||||
|
x.join();
|
||||||
|
assert(jobsLeft == _queue.size() && "Only not yet started jobs are left.");
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueue::enqueue(Job job)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(jobsLeftMutex);
|
||||||
|
++jobsLeft;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(queueMutex);
|
||||||
|
_queue.emplace(std::move(job));
|
||||||
|
}
|
||||||
|
|
||||||
|
jobAvailableVar.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t ConcurrentQueue::cancelPending()
|
||||||
|
{
|
||||||
|
decltype(_queue) oldQueue;
|
||||||
|
{
|
||||||
|
const std::lock_guard<std::mutex> lock(queueMutex);
|
||||||
|
// The mutex locking time is lower with swap() compared to assigning a
|
||||||
|
// temporary (which destroys _queue's elements and deallocates memory).
|
||||||
|
_queue.swap(oldQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto size = oldQueue.size();
|
||||||
|
if (size != 0)
|
||||||
|
finalizeJobs(size);
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueue::waitAll() const
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(jobsLeftMutex);
|
||||||
|
_waitVar.wait(lock, [this] { return jobsLeft == 0; });
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueue::nextJob()
|
||||||
|
{
|
||||||
|
while (true) {
|
||||||
|
Job job;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(queueMutex);
|
||||||
|
|
||||||
|
jobAvailableVar.wait(lock, [this] {
|
||||||
|
return _queue.size() > 0 || bailout;
|
||||||
|
});
|
||||||
|
|
||||||
|
if (bailout) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
job = std::move(_queue.front());
|
||||||
|
_queue.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
job();
|
||||||
|
finalizeJobs(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueue::finalizeJobs(std::size_t count)
|
||||||
|
{
|
||||||
|
assert(count > 0);
|
||||||
|
|
||||||
|
std::size_t remainingJobs;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(jobsLeftMutex);
|
||||||
|
assert(jobsLeft >= count);
|
||||||
|
jobsLeft -= count;
|
||||||
|
remainingJobs = jobsLeft;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (remainingJobs == 0)
|
||||||
|
_waitVar.notify_all();
|
||||||
|
}
|
@ -6,126 +6,48 @@
|
|||||||
#include <functional>
|
#include <functional>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace YACReader {
|
namespace YACReader {
|
||||||
|
//! All functions in this class are thread-safe in the Qt documentation sense.
|
||||||
class ConcurrentQueue
|
class ConcurrentQueue
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit ConcurrentQueue(int threadCount)
|
//! @brief Creates and starts executing @p threadCount worker threads.
|
||||||
: jobsLeft(0),
|
//! @note ConcurrentQueue is unable to execute jobs if @p threadCount == 0.
|
||||||
bailout(false)
|
explicit ConcurrentQueue(std::size_t threadCount);
|
||||||
{
|
|
||||||
threads = std::vector<std::thread>(threadCount);
|
|
||||||
for (int index = 0; index < threadCount; ++index) {
|
|
||||||
threads[index] = std::thread([this] {
|
|
||||||
this->nextJob();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
~ConcurrentQueue()
|
//! Cancels all jobs that have not been picked up by worker threads yet,
|
||||||
{
|
//! waits for all worker threads to complete their jobs and joins them.
|
||||||
joinAll();
|
~ConcurrentQueue();
|
||||||
}
|
|
||||||
|
|
||||||
void enqueue(std::function<void(void)> job)
|
using Job = std::function<void()>;
|
||||||
{
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(queueMutex);
|
|
||||||
_queue.emplace(job);
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
//! @brief Adds @p job to the queue.
|
||||||
std::lock_guard<std::mutex> lock(jobsLeftMutex);
|
//! @note A worker thread may start executing @p job immediately if it is idle.
|
||||||
++jobsLeft;
|
//! Worker threads start executing jobs in the same order as they are enqueued.
|
||||||
}
|
void enqueue(Job job);
|
||||||
|
|
||||||
jobAvailableVar.notify_one();
|
//! @brief Cancels all jobs that have not been picked up by worker threads yet.
|
||||||
}
|
//! @return The number of jobs that were canceled.
|
||||||
|
std::size_t cancelPending();
|
||||||
|
|
||||||
void cancellPending()
|
//! @brief Blocks the current thread until all enqueued jobs are completed.
|
||||||
{
|
void waitAll() const;
|
||||||
std::unique_lock<std::mutex> lockQueue(queueMutex);
|
|
||||||
std::unique_lock<std::mutex> lockJobsLeft(jobsLeftMutex);
|
|
||||||
_queue = std::queue<std::function<void(void)>>();
|
|
||||||
jobsLeft = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void waitAll()
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(jobsLeftMutex);
|
|
||||||
if (jobsLeft > 0) {
|
|
||||||
_waitVar.wait(lock, [this] {
|
|
||||||
return jobsLeft == 0;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
//! @invariant all worker threads are joinable until the destructor is called.
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
std::queue<std::function<void(void)>> _queue;
|
std::queue<Job> _queue;
|
||||||
int jobsLeft;
|
std::size_t jobsLeft = 0; //!< @invariant jobsLeft >= _queue.size()
|
||||||
bool bailout;
|
bool bailout = false; //!< @invariant is false until the destructor is called.
|
||||||
std::condition_variable jobAvailableVar;
|
std::condition_variable jobAvailableVar;
|
||||||
std::condition_variable _waitVar;
|
mutable std::condition_variable _waitVar;
|
||||||
std::mutex jobsLeftMutex;
|
mutable std::mutex jobsLeftMutex;
|
||||||
std::mutex queueMutex;
|
std::mutex queueMutex;
|
||||||
|
|
||||||
void nextJob()
|
void nextJob();
|
||||||
{
|
void finalizeJobs(std::size_t count);
|
||||||
while (true) {
|
|
||||||
std::function<void(void)> job;
|
|
||||||
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(queueMutex);
|
|
||||||
|
|
||||||
if (bailout) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
jobAvailableVar.wait(lock, [this] {
|
|
||||||
return _queue.size() > 0 || bailout;
|
|
||||||
});
|
|
||||||
|
|
||||||
if (bailout) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
job = _queue.front();
|
|
||||||
_queue.pop();
|
|
||||||
}
|
|
||||||
|
|
||||||
job();
|
|
||||||
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(jobsLeftMutex);
|
|
||||||
--jobsLeft;
|
|
||||||
}
|
|
||||||
|
|
||||||
_waitVar.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void joinAll()
|
|
||||||
{
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(queueMutex);
|
|
||||||
|
|
||||||
if (bailout) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
bailout = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
jobAvailableVar.notify_all();
|
|
||||||
|
|
||||||
for (auto &x : threads) {
|
|
||||||
if (x.joinable()) {
|
|
||||||
x.join();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,8 @@
|
|||||||
CONFIG += c++17
|
CONFIG += c++17
|
||||||
win32:QMAKE_CXXFLAGS += /std:c++17 #enable c++17 explicitly in msvc
|
win32:QMAKE_CXXFLAGS += /std:c++17 #enable c++17 explicitly in msvc
|
||||||
|
|
||||||
|
DEFINES += NOMINMAX
|
||||||
|
|
||||||
if(unix|mingw):QMAKE_CXXFLAGS_RELEASE += -DNDEBUG
|
if(unix|mingw):QMAKE_CXXFLAGS_RELEASE += -DNDEBUG
|
||||||
win32:msvc:QMAKE_CXXFLAGS_RELEASE += /DNDEBUG
|
win32:msvc:QMAKE_CXXFLAGS_RELEASE += /DNDEBUG
|
||||||
|
|
||||||
|
699
tests/concurrent_queue_test/concurrent_queue_test.cpp
Normal file
699
tests/concurrent_queue_test/concurrent_queue_test.cpp
Normal file
@ -0,0 +1,699 @@
|
|||||||
|
#include "concurrent_queue.h"
|
||||||
|
|
||||||
|
#include <QDebug>
|
||||||
|
#include <QDebugStateSaver>
|
||||||
|
#include <QMetaType>
|
||||||
|
#include <QObject>
|
||||||
|
#include <QString>
|
||||||
|
#include <QTest>
|
||||||
|
#include <QTime>
|
||||||
|
#include <QVector>
|
||||||
|
|
||||||
|
#include <array>
|
||||||
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <memory>
|
||||||
|
#include <numeric>
|
||||||
|
#include <random>
|
||||||
|
#include <sstream>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace chrono = std::chrono;
|
||||||
|
using Clock = chrono::steady_clock;
|
||||||
|
using YACReader::ConcurrentQueue;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
double toMilliseconds(Clock::duration duration)
|
||||||
|
{
|
||||||
|
return chrono::duration_cast<chrono::microseconds>(duration).count() / 1000.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
QString currentThreadInfo()
|
||||||
|
{
|
||||||
|
std::ostringstream os;
|
||||||
|
os << std::this_thread::get_id();
|
||||||
|
return QString::fromStdString(os.str());
|
||||||
|
}
|
||||||
|
|
||||||
|
//! This test prints thousands of lines of detailed output. The output allows to analyze
|
||||||
|
//! how ConcurrentQueue is being tested, how it works and why the test fails or crashes
|
||||||
|
//! (normally it passes). The default maximum number of warnings in Qt Test is 2000,
|
||||||
|
//! which is too low for this test. Therefore, the following warning is printed before
|
||||||
|
//! the log output is suppressed: "Maximum amount of warnings exceeded. Use -maxwarnings
|
||||||
|
//! to override.". Passing `-maxwarnings 100000` command line option to the test lets it
|
||||||
|
//! print everything. Passing -silent command line option to the test suppresses all its
|
||||||
|
//! output except for RandomEngineProvider's root seeds, which are necessary to reproduce
|
||||||
|
//! interesting test results.
|
||||||
|
QDebug log()
|
||||||
|
{
|
||||||
|
return qInfo().noquote() << currentThreadInfo() << '|'
|
||||||
|
<< QTime::currentTime().toString(Qt::ISODateWithMs) << '|';
|
||||||
|
}
|
||||||
|
|
||||||
|
using Total = std::atomic<int>;
|
||||||
|
|
||||||
|
struct JobData {
|
||||||
|
int summand;
|
||||||
|
Clock::duration sleepingTime;
|
||||||
|
};
|
||||||
|
using JobDataSet = QVector<JobData>;
|
||||||
|
|
||||||
|
int expectedTotal(JobDataSet::const_iterator first, JobDataSet::const_iterator last)
|
||||||
|
{
|
||||||
|
return std::accumulate(first, last, 0,
|
||||||
|
[](int total, JobData job) {
|
||||||
|
return total + job.summand;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
int expectedTotal(const JobDataSet &jobs)
|
||||||
|
{
|
||||||
|
return expectedTotal(jobs.cbegin(), jobs.cend());
|
||||||
|
}
|
||||||
|
|
||||||
|
int expectedTotal(const JobDataSet &jobs, std::size_t canceledCount)
|
||||||
|
{
|
||||||
|
const auto count = jobs.size() - static_cast<int>(canceledCount);
|
||||||
|
if (count < 0)
|
||||||
|
qFatal("Canceled more than the total number of jobs somehow!");
|
||||||
|
return expectedTotal(jobs.cbegin(), jobs.cbegin() + count);
|
||||||
|
}
|
||||||
|
|
||||||
|
int expectedTotal(const QVector<JobDataSet> &jobs)
|
||||||
|
{
|
||||||
|
return std::accumulate(jobs.cbegin(), jobs.cend(), 0,
|
||||||
|
[](int total, const JobDataSet &dataSet) {
|
||||||
|
return total + expectedTotal(dataSet);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
class Id
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit Id(int threadId, int jobId)
|
||||||
|
: threadId { threadId }, jobId { jobId } { }
|
||||||
|
|
||||||
|
QString toString() const { return QStringLiteral("[%1.%2]").arg(threadId).arg(jobId); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
const int threadId;
|
||||||
|
const int jobId;
|
||||||
|
};
|
||||||
|
|
||||||
|
QDebug operator<<(QDebug debug, Id id)
|
||||||
|
{
|
||||||
|
QDebugStateSaver saver(debug);
|
||||||
|
debug.noquote() << id.toString();
|
||||||
|
return debug;
|
||||||
|
}
|
||||||
|
|
||||||
|
class Job
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit Job(Total &total, JobData data, Id id)
|
||||||
|
: total { total }, data { data }, id { id } { }
|
||||||
|
|
||||||
|
void operator()()
|
||||||
|
{
|
||||||
|
log().nospace() << id << " sleep " << toMilliseconds(data.sleepingTime) << " ms...";
|
||||||
|
std::this_thread::sleep_for(data.sleepingTime);
|
||||||
|
|
||||||
|
const auto updatedTotal = (total += data.summand);
|
||||||
|
log().nospace() << id << " +" << data.summand << " => " << updatedTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Total &total;
|
||||||
|
const JobData data;
|
||||||
|
const Id id;
|
||||||
|
};
|
||||||
|
|
||||||
|
class Enqueuer
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit Enqueuer(ConcurrentQueue &queue, Total &total, const JobDataSet &jobs, int threadId)
|
||||||
|
: queue { queue }, total { total }, jobs { jobs }, threadId { threadId } { }
|
||||||
|
|
||||||
|
void operator()()
|
||||||
|
{
|
||||||
|
const char *const jobStr = jobs.size() == 1 ? "job" : "jobs";
|
||||||
|
log() << QStringLiteral("#%1 enqueuing %2 %3...").arg(threadId).arg(jobs.size()).arg(jobStr);
|
||||||
|
for (int i = 0; i < jobs.size(); ++i)
|
||||||
|
queue.enqueue(Job(total, jobs.at(i), Id(threadId, i + 1)));
|
||||||
|
log() << QStringLiteral("#%1 enqueuing complete.").arg(threadId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
ConcurrentQueue &queue;
|
||||||
|
Total &total;
|
||||||
|
const JobDataSet jobs;
|
||||||
|
const int threadId;
|
||||||
|
};
|
||||||
|
|
||||||
|
class QueueControlMessagePrinter
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit QueueControlMessagePrinter(const Total &total, int threadId, int threadCount)
|
||||||
|
: total { total }, threadId { threadId }, threadCount { threadCount } { }
|
||||||
|
|
||||||
|
void printStartedMessage() const
|
||||||
|
{
|
||||||
|
log() << threadMessageFormatString().arg("started");
|
||||||
|
}
|
||||||
|
void printCanceledMessage(std::size_t canceledCount) const
|
||||||
|
{
|
||||||
|
const char *const jobStr = canceledCount == 1 ? "job" : "jobs";
|
||||||
|
const auto format = messageFormatString().arg("%1 %2 %3");
|
||||||
|
log() << format.arg("canceled").arg(canceledCount).arg(jobStr);
|
||||||
|
}
|
||||||
|
void printBeginWaitingMessage() const
|
||||||
|
{
|
||||||
|
log() << threadMessageFormatString().arg("begin waiting for");
|
||||||
|
}
|
||||||
|
void printEndWaitingMessage() const
|
||||||
|
{
|
||||||
|
log() << threadMessageFormatString().arg("end waiting for");
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
QString messageFormatString() const
|
||||||
|
{
|
||||||
|
return QStringLiteral("#%1 %3 => %2").arg(threadId).arg(total.load());
|
||||||
|
}
|
||||||
|
|
||||||
|
QString threadMessageFormatString() const
|
||||||
|
{
|
||||||
|
const char *const threadStr = threadCount == 1 ? "thread" : "threads";
|
||||||
|
const auto format = messageFormatString().arg("%3 %1 %2");
|
||||||
|
return format.arg(threadCount).arg(threadStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
const Total &total;
|
||||||
|
const int threadId;
|
||||||
|
const int threadCount;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::size_t cancelAndPrint(ConcurrentQueue &queue, const QueueControlMessagePrinter &printer)
|
||||||
|
{
|
||||||
|
const auto canceledCount = queue.cancelPending();
|
||||||
|
printer.printCanceledMessage(canceledCount);
|
||||||
|
return canceledCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
void waitAndPrint(const ConcurrentQueue &queue, const QueueControlMessagePrinter &printer)
|
||||||
|
{
|
||||||
|
printer.printBeginWaitingMessage();
|
||||||
|
queue.waitAll();
|
||||||
|
printer.printEndWaitingMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T, std::size_t size>
|
||||||
|
QDebug operator<<(QDebug debug, const std::array<T, size> &array)
|
||||||
|
{
|
||||||
|
QDebugStateSaver saver(debug);
|
||||||
|
debug.nospace();
|
||||||
|
|
||||||
|
debug << '(';
|
||||||
|
if (size != 0) {
|
||||||
|
debug << array.front();
|
||||||
|
for (std::size_t i = 1; i != size; ++i)
|
||||||
|
debug << ", " << array[i];
|
||||||
|
}
|
||||||
|
debug << ')';
|
||||||
|
|
||||||
|
return debug;
|
||||||
|
}
|
||||||
|
|
||||||
|
using RandomEngine = std::mt19937_64;
|
||||||
|
|
||||||
|
class RandomEngineProvider
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RandomEngineProvider()
|
||||||
|
{
|
||||||
|
std::random_device rd;
|
||||||
|
const auto randomValues = generate<rootSeedCount>(rd);
|
||||||
|
// Qt Test does not suppress output from the constructor of a test class
|
||||||
|
// even when -silent command line option is passed. This is fortunate
|
||||||
|
// because the root seeds can be used to reproduce a test failure.
|
||||||
|
log() << "RandomEngineProvider's root seeds:" << randomValues;
|
||||||
|
std::seed_seq seedSeq(randomValues.begin(), randomValues.end());
|
||||||
|
rootEngine.reset(new std::mt19937(seedSeq));
|
||||||
|
}
|
||||||
|
|
||||||
|
void resetEngines(std::size_t engineCount)
|
||||||
|
{
|
||||||
|
engines.clear();
|
||||||
|
engines.reserve(engineCount);
|
||||||
|
for (; engineCount != 0; --engineCount) {
|
||||||
|
const auto randomValues = generate<seedCount>(*rootEngine);
|
||||||
|
std::seed_seq seedSeq(randomValues.begin(), randomValues.end());
|
||||||
|
engines.emplace_back(seedSeq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RandomEngine &engine(std::size_t index)
|
||||||
|
{
|
||||||
|
return engines.at(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// In this test we don't really care about uniformly choosing an initial state
|
||||||
|
// from the entire state-space of the engine. It is possible to generate more
|
||||||
|
// random numbers at the cost of performance and system entropy pool exhaustion.
|
||||||
|
static constexpr std::size_t rootSeedCount { 8 };
|
||||||
|
static constexpr std::size_t seedCount { 32 };
|
||||||
|
|
||||||
|
template<std::size_t size, typename Generator>
|
||||||
|
static std::array<std::uint32_t, size> generate(Generator &generator)
|
||||||
|
{
|
||||||
|
std::array<std::uint32_t, size> result;
|
||||||
|
for (auto &value : result)
|
||||||
|
value = generator();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<std::mt19937> rootEngine;
|
||||||
|
std::vector<RandomEngine> engines;
|
||||||
|
};
|
||||||
|
|
||||||
|
//! Calls random member functions of ConcurrentQueue for a limited time.
|
||||||
|
//! Ensures that total equals 0 when all jobs are complete/canceled by:
|
||||||
|
//! * setting each job's summand to 1;
|
||||||
|
//! * subtracting a job set's size from total before enqueuing jobs in the set;
|
||||||
|
//! * adding canceled job count to total after cancelation.
|
||||||
|
class RandomCaller
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit RandomCaller(ConcurrentQueue &queue, Total &total, int threadId,
|
||||||
|
int queueThreadCount, RandomEngine &engine,
|
||||||
|
bool boostEnqueueOperationWeight)
|
||||||
|
: queue(queue),
|
||||||
|
total(total),
|
||||||
|
threadId { threadId },
|
||||||
|
boostEnqueueOperationWeight { boostEnqueueOperationWeight },
|
||||||
|
printer(total, threadId, queueThreadCount),
|
||||||
|
engine(engine)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void operator()()
|
||||||
|
{
|
||||||
|
constexpr auto testDuration = chrono::milliseconds(10);
|
||||||
|
const auto testStartTime = Clock::now();
|
||||||
|
|
||||||
|
auto operation = operationDistribution();
|
||||||
|
do {
|
||||||
|
switch (operation(engine)) {
|
||||||
|
case 0:
|
||||||
|
enqueue();
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
cancel();
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
waitAndPrint(queue, printer);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
qFatal("Unsupported operation.");
|
||||||
|
}
|
||||||
|
} while (Clock::now() - testStartTime < testDuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
int randomInt(int a, int b)
|
||||||
|
{
|
||||||
|
return uniformInt(engine, decltype(uniformInt)::param_type(a, b));
|
||||||
|
}
|
||||||
|
|
||||||
|
using OperationDistribution = std::discrete_distribution<int>;
|
||||||
|
|
||||||
|
void printProbabilities(const OperationDistribution &distribution) const
|
||||||
|
{
|
||||||
|
auto p = distribution.probabilities();
|
||||||
|
constexpr std::size_t expectedProbabilityCount { 3 };
|
||||||
|
if (p.size() != expectedProbabilityCount)
|
||||||
|
qFatal("Wrong number of operation probabilities: %zu != %zu", p.size(), expectedProbabilityCount);
|
||||||
|
|
||||||
|
for (auto &x : p)
|
||||||
|
x *= 100; // Convert to percentages.
|
||||||
|
log() << QStringLiteral("#%1 operation probabilities: e=%2%, c=%3%, w=%4%.").arg(threadId).arg(p[0]).arg(p[1]).arg(p[2]);
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationDistribution operationDistribution()
|
||||||
|
{
|
||||||
|
auto distribution = boostEnqueueOperationWeight ? boostedEnqueueOperationDistribution()
|
||||||
|
: almostUniformOperationDistribution();
|
||||||
|
printProbabilities(distribution);
|
||||||
|
return distribution;
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationDistribution almostUniformOperationDistribution()
|
||||||
|
{
|
||||||
|
constexpr int sumOfProbabilities { 100 };
|
||||||
|
const auto enqueueProbability = randomInt(0, sumOfProbabilities);
|
||||||
|
const auto cancelProbability = randomInt(0, sumOfProbabilities - enqueueProbability);
|
||||||
|
const auto waitProbability = sumOfProbabilities - enqueueProbability - cancelProbability;
|
||||||
|
if (enqueueProbability + cancelProbability + waitProbability != sumOfProbabilities)
|
||||||
|
qFatal("The sum of probabilities is not 100%%.");
|
||||||
|
|
||||||
|
const auto real = [](int x) { return static_cast<double>(x); };
|
||||||
|
return { real(enqueueProbability), real(cancelProbability), real(waitProbability) };
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationDistribution boostedEnqueueOperationDistribution()
|
||||||
|
{
|
||||||
|
// Make enqueue the most frequent operation to stress-test executing
|
||||||
|
// jobs rather than canceling them almost immediately.
|
||||||
|
const auto enqueueWeight = std::lognormal_distribution<double>(2, 0.5)(engine);
|
||||||
|
const auto cancelWeight = 1.0;
|
||||||
|
// Waiting is uninteresting as it doesn't even modify the queue => make it rare.
|
||||||
|
const auto waitWeight = std::uniform_real_distribution<double>(0, 0.2)(engine);
|
||||||
|
|
||||||
|
return { enqueueWeight, cancelWeight, waitWeight };
|
||||||
|
}
|
||||||
|
|
||||||
|
JobDataSet createJobs()
|
||||||
|
{
|
||||||
|
constexpr int minJobCount { 1 }, maxJobCount { 5 };
|
||||||
|
JobDataSet jobs(randomInt(minJobCount, maxJobCount));
|
||||||
|
for (auto &job : jobs) {
|
||||||
|
constexpr int minSleepingTime { 0 }, maxSleepingTime { 5 };
|
||||||
|
const auto sleepingTime = randomInt(minSleepingTime, maxSleepingTime);
|
||||||
|
job = { 1, sleepingTime * chrono::microseconds(1) };
|
||||||
|
}
|
||||||
|
return jobs;
|
||||||
|
}
|
||||||
|
|
||||||
|
void enqueue()
|
||||||
|
{
|
||||||
|
const auto jobs = createJobs();
|
||||||
|
total -= jobs.size();
|
||||||
|
Enqueuer(queue, total, jobs, threadId)();
|
||||||
|
}
|
||||||
|
|
||||||
|
void cancel()
|
||||||
|
{
|
||||||
|
const auto canceledCount = cancelAndPrint(queue, printer);
|
||||||
|
total += canceledCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConcurrentQueue &queue;
|
||||||
|
Total &total;
|
||||||
|
const int threadId;
|
||||||
|
const bool boostEnqueueOperationWeight;
|
||||||
|
const QueueControlMessagePrinter printer;
|
||||||
|
RandomEngine &engine;
|
||||||
|
std::uniform_int_distribution<int> uniformInt;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
Q_DECLARE_METATYPE(Clock::duration)
|
||||||
|
Q_DECLARE_METATYPE(JobData)
|
||||||
|
|
||||||
|
class ConcurrentQueueTest : public QObject
|
||||||
|
{
|
||||||
|
Q_OBJECT
|
||||||
|
private slots:
|
||||||
|
void init();
|
||||||
|
|
||||||
|
void singleUserThread_data();
|
||||||
|
void singleUserThread();
|
||||||
|
|
||||||
|
void multipleUserThreads_data();
|
||||||
|
void multipleUserThreads();
|
||||||
|
|
||||||
|
void cancelPending1UserThread_data();
|
||||||
|
void cancelPending1UserThread();
|
||||||
|
|
||||||
|
void waitAllFromMultipleThreads_data();
|
||||||
|
void waitAllFromMultipleThreads();
|
||||||
|
|
||||||
|
void randomCalls_data();
|
||||||
|
void randomCalls();
|
||||||
|
|
||||||
|
private:
|
||||||
|
static constexpr int primaryThreadId { 0 };
|
||||||
|
|
||||||
|
QueueControlMessagePrinter makeMessagePrinter(int threadCount) const
|
||||||
|
{
|
||||||
|
return QueueControlMessagePrinter(total, primaryThreadId, threadCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
Total total { 0 };
|
||||||
|
RandomEngineProvider randomEngineProvider;
|
||||||
|
};
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::init()
|
||||||
|
{
|
||||||
|
total = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::singleUserThread_data()
|
||||||
|
{
|
||||||
|
QTest::addColumn<int>("threadCount");
|
||||||
|
QTest::addColumn<JobDataSet>("jobs");
|
||||||
|
|
||||||
|
using ms = chrono::milliseconds;
|
||||||
|
|
||||||
|
QTest::newRow("-") << 0 << JobDataSet {};
|
||||||
|
QTest::newRow("0") << 7 << JobDataSet {};
|
||||||
|
QTest::newRow("A") << 1 << JobDataSet { { 5, ms(0) } };
|
||||||
|
QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } };
|
||||||
|
QTest::newRow("C") << 1 << JobDataSet { { 1, ms(0) }, { 5, ms(2) }, { 3, ms(1) } };
|
||||||
|
QTest::newRow("D") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } };
|
||||||
|
QTest::newRow("E") << 2 << JobDataSet { { 1, ms(2) }, { 2, ms(1) } };
|
||||||
|
QTest::newRow("F") << 3 << JobDataSet { { 8, ms(3) }, { 5, ms(4) }, { 2, ms(1) }, { 11, ms(1) }, { 100, ms(3) } };
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::singleUserThread()
|
||||||
|
{
|
||||||
|
QFETCH(const int, threadCount);
|
||||||
|
QFETCH(const JobDataSet, jobs);
|
||||||
|
|
||||||
|
const auto printer = makeMessagePrinter(threadCount);
|
||||||
|
|
||||||
|
ConcurrentQueue queue(threadCount);
|
||||||
|
printer.printStartedMessage();
|
||||||
|
|
||||||
|
Enqueuer(queue, total, jobs, primaryThreadId)();
|
||||||
|
|
||||||
|
waitAndPrint(queue, printer);
|
||||||
|
|
||||||
|
QCOMPARE(total.load(), expectedTotal(jobs));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::multipleUserThreads_data()
|
||||||
|
{
|
||||||
|
QTest::addColumn<int>("threadCount");
|
||||||
|
QTest::addColumn<QVector<JobDataSet>>("jobs");
|
||||||
|
|
||||||
|
using ms = chrono::milliseconds;
|
||||||
|
|
||||||
|
JobDataSet jobs1 { { 1, ms(1) } };
|
||||||
|
JobDataSet jobs2 { { 2, ms(4) } };
|
||||||
|
QVector<JobDataSet> allJobs { jobs1, jobs2 };
|
||||||
|
QTest::newRow("A1") << 1 << allJobs;
|
||||||
|
QTest::newRow("A2") << 2 << allJobs;
|
||||||
|
|
||||||
|
jobs1.push_back({ 5, ms(3) });
|
||||||
|
jobs2.push_back({ 10, ms(1) });
|
||||||
|
allJobs = { jobs1, jobs2 };
|
||||||
|
QTest::newRow("B1") << 2 << allJobs;
|
||||||
|
QTest::newRow("B2") << 3 << allJobs;
|
||||||
|
QTest::newRow("B3") << 8 << allJobs;
|
||||||
|
|
||||||
|
jobs1.push_back({ 20, ms(0) });
|
||||||
|
jobs2.push_back({ 40, ms(2) });
|
||||||
|
allJobs = { jobs1, jobs2 };
|
||||||
|
QTest::newRow("C") << 4 << allJobs;
|
||||||
|
|
||||||
|
JobDataSet jobs3 { { 80, ms(0) }, { 160, ms(2) }, { 320, ms(1) }, { 640, ms(0) }, { 2000, ms(3) } };
|
||||||
|
allJobs.push_back(jobs3);
|
||||||
|
QTest::newRow("D1") << 3 << allJobs;
|
||||||
|
QTest::newRow("D2") << 5 << allJobs;
|
||||||
|
|
||||||
|
JobDataSet jobs4 { { 4000, ms(1) }, { 8000, ms(3) } };
|
||||||
|
allJobs.push_back(jobs4);
|
||||||
|
QTest::newRow("E1") << 4 << allJobs;
|
||||||
|
QTest::newRow("E2") << 6 << allJobs;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::multipleUserThreads()
|
||||||
|
{
|
||||||
|
QFETCH(const int, threadCount);
|
||||||
|
QFETCH(const QVector<JobDataSet>, jobs);
|
||||||
|
|
||||||
|
const auto printer = makeMessagePrinter(threadCount);
|
||||||
|
|
||||||
|
ConcurrentQueue queue(threadCount);
|
||||||
|
printer.printStartedMessage();
|
||||||
|
|
||||||
|
if (!jobs.empty()) {
|
||||||
|
std::vector<std::thread> enqueuerThreads;
|
||||||
|
enqueuerThreads.reserve(jobs.size() - 1);
|
||||||
|
for (int i = 1; i < jobs.size(); ++i)
|
||||||
|
enqueuerThreads.emplace_back(Enqueuer(queue, total, jobs.at(i), i));
|
||||||
|
|
||||||
|
Enqueuer(queue, total, jobs.constFirst(), primaryThreadId)();
|
||||||
|
for (auto &t : enqueuerThreads)
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
waitAndPrint(queue, printer);
|
||||||
|
|
||||||
|
QCOMPARE(total.load(), expectedTotal(jobs));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::cancelPending1UserThread_data()
|
||||||
|
{
|
||||||
|
QTest::addColumn<int>("threadCount");
|
||||||
|
QTest::addColumn<JobDataSet>("jobs");
|
||||||
|
QTest::addColumn<Clock::duration>("cancelDelay");
|
||||||
|
|
||||||
|
const auto ms = [](int count) -> Clock::duration { return chrono::milliseconds(count); };
|
||||||
|
const auto us = [](int count) -> Clock::duration { return chrono::microseconds(count); };
|
||||||
|
|
||||||
|
QTest::newRow("-") << 0 << JobDataSet {} << ms(0);
|
||||||
|
QTest::newRow("01") << 2 << JobDataSet {} << ms(0);
|
||||||
|
QTest::newRow("02") << 3 << JobDataSet {} << ms(1);
|
||||||
|
QTest::newRow("A") << 1 << JobDataSet { { 5, ms(3) } } << ms(1);
|
||||||
|
QTest::newRow("B") << 5 << JobDataSet { { 12, ms(1) } } << ms(1);
|
||||||
|
|
||||||
|
JobDataSet dataSet { { 1, ms(3) }, { 5, ms(2) }, { 3, ms(1) } };
|
||||||
|
QTest::newRow("C1") << 1 << dataSet << ms(1);
|
||||||
|
QTest::newRow("C2") << 1 << dataSet << ms(4);
|
||||||
|
QTest::newRow("C3") << 2 << dataSet << ms(1);
|
||||||
|
QTest::newRow("C4") << 3 << dataSet << ms(1);
|
||||||
|
QTest::newRow("C5") << 1 << dataSet << ms(7);
|
||||||
|
|
||||||
|
dataSet.push_back({ 10, ms(5) });
|
||||||
|
dataSet.push_back({ 20, ms(8) });
|
||||||
|
dataSet.push_back({ 40, ms(20) });
|
||||||
|
dataSet.push_back({ 80, ms(2) });
|
||||||
|
QTest::newRow("D1") << 1 << dataSet << ms(1);
|
||||||
|
QTest::newRow("D2") << 1 << dataSet << ms(15);
|
||||||
|
QTest::newRow("D3") << 1 << dataSet << ms(50);
|
||||||
|
QTest::newRow("D4") << 2 << dataSet << ms(4);
|
||||||
|
QTest::newRow("D5") << 3 << dataSet << ms(4);
|
||||||
|
QTest::newRow("D6") << 4 << dataSet << ms(4);
|
||||||
|
QTest::newRow("D7") << 2 << dataSet << us(300);
|
||||||
|
QTest::newRow("D8") << 3 << dataSet << us(500);
|
||||||
|
QTest::newRow("D9") << 4 << dataSet << us(700);
|
||||||
|
|
||||||
|
QTest::newRow("E") << 4 << JobDataSet { { 20, ms(1) }, { 8, ms(5) }, { 5, ms(2) } } << ms(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::cancelPending1UserThread()
|
||||||
|
{
|
||||||
|
QFETCH(const int, threadCount);
|
||||||
|
QFETCH(const JobDataSet, jobs);
|
||||||
|
QFETCH(const Clock::duration, cancelDelay);
|
||||||
|
|
||||||
|
const auto printer = makeMessagePrinter(threadCount);
|
||||||
|
|
||||||
|
ConcurrentQueue queue(threadCount);
|
||||||
|
printer.printStartedMessage();
|
||||||
|
|
||||||
|
Enqueuer(queue, total, jobs, primaryThreadId)();
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(cancelDelay);
|
||||||
|
const auto canceledCount = cancelAndPrint(queue, printer);
|
||||||
|
QVERIFY(canceledCount <= static_cast<std::size_t>(jobs.size()));
|
||||||
|
|
||||||
|
waitAndPrint(queue, printer);
|
||||||
|
|
||||||
|
QCOMPARE(total.load(), expectedTotal(jobs, canceledCount));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::waitAllFromMultipleThreads_data()
|
||||||
|
{
|
||||||
|
QTest::addColumn<int>("waitingThreadCount");
|
||||||
|
for (int i : { 1, 2, 4, 7, 19 })
|
||||||
|
QTest::addRow("%d", i) << i;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::waitAllFromMultipleThreads()
|
||||||
|
{
|
||||||
|
QFETCH(const int, waitingThreadCount);
|
||||||
|
QVERIFY(waitingThreadCount > 0);
|
||||||
|
|
||||||
|
constexpr auto queueThreadCount = 2;
|
||||||
|
const auto printer = makeMessagePrinter(queueThreadCount);
|
||||||
|
|
||||||
|
ConcurrentQueue queue(queueThreadCount);
|
||||||
|
printer.printStartedMessage();
|
||||||
|
|
||||||
|
using ms = chrono::milliseconds;
|
||||||
|
const JobDataSet jobs { { 5, ms(1) }, { 7, ms(2) } };
|
||||||
|
Enqueuer(queue, total, jobs, primaryThreadId)();
|
||||||
|
|
||||||
|
std::vector<std::thread> waitingThreads;
|
||||||
|
waitingThreads.reserve(waitingThreadCount - 1);
|
||||||
|
for (int id = 1; id < waitingThreadCount; ++id) {
|
||||||
|
waitingThreads.emplace_back([=, &queue] {
|
||||||
|
waitAndPrint(queue, QueueControlMessagePrinter(total, id, queueThreadCount));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
waitAndPrint(queue, printer);
|
||||||
|
|
||||||
|
for (auto &t : waitingThreads)
|
||||||
|
t.join();
|
||||||
|
|
||||||
|
QCOMPARE(total.load(), expectedTotal(jobs));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::randomCalls_data()
|
||||||
|
{
|
||||||
|
QTest::addColumn<int>("queueThreadCount");
|
||||||
|
QTest::addColumn<int>("userThreadCount");
|
||||||
|
QTest::addColumn<bool>("boostEnqueueOperationWeight");
|
||||||
|
|
||||||
|
const auto suffix = [](bool boost) { return boost ? " +enqueue" : ""; };
|
||||||
|
|
||||||
|
for (bool boost : { false, true })
|
||||||
|
for (int q : { 1, 2, 4, 9, 12, 20 })
|
||||||
|
for (int u : { 1, 2, 4, 7, 11, 18 })
|
||||||
|
QTest::addRow("queue{%d}; %d user thread(s)%s", q, u, suffix(boost)) << q << u << boost;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConcurrentQueueTest::randomCalls()
|
||||||
|
{
|
||||||
|
QFETCH(const int, queueThreadCount);
|
||||||
|
QFETCH(const int, userThreadCount);
|
||||||
|
QVERIFY(userThreadCount > 0);
|
||||||
|
QFETCH(const bool, boostEnqueueOperationWeight);
|
||||||
|
|
||||||
|
const auto printer = makeMessagePrinter(queueThreadCount);
|
||||||
|
|
||||||
|
ConcurrentQueue queue(queueThreadCount);
|
||||||
|
printer.printStartedMessage();
|
||||||
|
|
||||||
|
randomEngineProvider.resetEngines(userThreadCount);
|
||||||
|
|
||||||
|
std::vector<std::thread> userThreads;
|
||||||
|
userThreads.reserve(userThreadCount - 1);
|
||||||
|
for (int id = 1; id < userThreadCount; ++id) {
|
||||||
|
userThreads.emplace_back(RandomCaller(queue, total, id, queueThreadCount,
|
||||||
|
randomEngineProvider.engine(id),
|
||||||
|
boostEnqueueOperationWeight));
|
||||||
|
}
|
||||||
|
RandomCaller(queue, total, primaryThreadId, queueThreadCount,
|
||||||
|
randomEngineProvider.engine(primaryThreadId),
|
||||||
|
boostEnqueueOperationWeight)();
|
||||||
|
|
||||||
|
for (auto &t : userThreads)
|
||||||
|
t.join();
|
||||||
|
|
||||||
|
waitAndPrint(queue, printer);
|
||||||
|
|
||||||
|
QCOMPARE(total.load(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
QTEST_APPLESS_MAIN(ConcurrentQueueTest)
|
||||||
|
|
||||||
|
#include "concurrent_queue_test.moc"
|
9
tests/concurrent_queue_test/concurrent_queue_test.pro
Normal file
9
tests/concurrent_queue_test/concurrent_queue_test.pro
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
include(../qt_test.pri)
|
||||||
|
|
||||||
|
PATH_TO_common = ../../common
|
||||||
|
|
||||||
|
INCLUDEPATH += $$PATH_TO_common
|
||||||
|
HEADERS += $${PATH_TO_common}/concurrent_queue.h
|
||||||
|
SOURCES += \
|
||||||
|
$${PATH_TO_common}/concurrent_queue.cpp \
|
||||||
|
concurrent_queue_test.cpp
|
9
tests/qt_test.pri
Normal file
9
tests/qt_test.pri
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
QT += testlib
|
||||||
|
QT -= gui
|
||||||
|
|
||||||
|
CONFIG += qt console warn_on testcase no_testcase_installs
|
||||||
|
CONFIG -= app_bundle
|
||||||
|
|
||||||
|
TEMPLATE = app
|
||||||
|
|
||||||
|
include(../config.pri)
|
2
tests/tests.pro
Normal file
2
tests/tests.pro
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
TEMPLATE = subdirs
|
||||||
|
SUBDIRS += concurrent_queue_test
|
Loading…
Reference in New Issue
Block a user