mirror of
https://github.com/YACReader/yacreader
synced 2025-06-04 01:28:55 -04:00
Add concurrent queue based on lambdas
This commit is contained in:
parent
7b36100458
commit
fa5ce25425
@ -76,6 +76,7 @@ QT += sql network widgets script
|
||||
|
||||
# Input
|
||||
HEADERS += comic_flow.h \
|
||||
../common/concurrent_queue.h \
|
||||
create_library_dialog.h \
|
||||
db/query_lexer.h \
|
||||
library_creator.h \
|
||||
|
132
common/concurrent_queue.h
Normal file
132
common/concurrent_queue.h
Normal file
@ -0,0 +1,132 @@
|
||||
#ifndef CONCURRENT_QUEUE_H
|
||||
#define CONCURRENT_QUEUE_H
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <functional>
|
||||
#include <condition_variable>
|
||||
#include <queue>
|
||||
|
||||
namespace YACReader {
|
||||
class ConcurrentQueue
|
||||
{
|
||||
public:
|
||||
explicit ConcurrentQueue(int threadCount)
|
||||
: jobsLeft(0),
|
||||
bailout(false)
|
||||
{
|
||||
threads = std::vector<std::thread>(threadCount);
|
||||
for (int index = 0; index < threadCount; ++index) {
|
||||
threads[index] = std::thread([this] {
|
||||
this->nextJob();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
~ConcurrentQueue()
|
||||
{
|
||||
joinAll();
|
||||
}
|
||||
|
||||
void enqueue(std::function<void(void)> job)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(queueMutex);
|
||||
_queue.emplace(job);
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(jobsLeftMutex);
|
||||
++jobsLeft;
|
||||
}
|
||||
|
||||
jobAvailableVar.notify_one();
|
||||
}
|
||||
|
||||
void cancellPending()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(jobsLeftMutex);
|
||||
_queue = std::queue<std::function<void(void)>>();
|
||||
jobsLeft = 0;
|
||||
}
|
||||
|
||||
void waitAll()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(jobsLeftMutex);
|
||||
if (jobsLeft > 0) {
|
||||
_waitVar.wait(lock, [this] {
|
||||
return jobsLeft == 0;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<std::thread> threads;
|
||||
std::queue<std::function<void(void)>> _queue;
|
||||
int jobsLeft;
|
||||
bool bailout;
|
||||
std::condition_variable jobAvailableVar;
|
||||
std::condition_variable _waitVar;
|
||||
std::mutex jobsLeftMutex;
|
||||
std::mutex queueMutex;
|
||||
|
||||
void nextJob()
|
||||
{
|
||||
while (true) {
|
||||
std::function<void(void)> job;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queueMutex);
|
||||
|
||||
if (bailout) {
|
||||
return;
|
||||
}
|
||||
|
||||
jobAvailableVar.wait(lock, [this] {
|
||||
return _queue.size() > 0 || bailout;
|
||||
});
|
||||
|
||||
if (bailout) {
|
||||
return;
|
||||
}
|
||||
|
||||
job = _queue.front();
|
||||
_queue.pop();
|
||||
}
|
||||
|
||||
job();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(jobsLeftMutex);
|
||||
--jobsLeft;
|
||||
}
|
||||
|
||||
_waitVar.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void joinAll()
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(queueMutex);
|
||||
|
||||
if (bailout) {
|
||||
return;
|
||||
}
|
||||
|
||||
bailout = true;
|
||||
}
|
||||
|
||||
jobAvailableVar.notify_all();
|
||||
|
||||
for (auto &x : threads) {
|
||||
if (x.joinable()) {
|
||||
x.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif // CONCURRENT_QUEUE_H
|
Loading…
x
Reference in New Issue
Block a user