-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtask_queue.cpp
More file actions
84 lines (76 loc) · 2.15 KB
/
task_queue.cpp
File metadata and controls
84 lines (76 loc) · 2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* @Description :
* @Author : chenht2022
* @Date : 2024-07-17 12:25:51
* @Version : 1.0.0
* @LastEditors : chenht2022
* @LastEditTime : 2024-10-09 11:08:10
* @Copyright (c) 2024 by KVCache.AI, All Rights Reserved.
**/
#include "task_queue.h"

#include <pthread.h>
#include <sched.h>

#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
// Builds an empty queue and launches the worker thread.
// The queue always holds one placeholder node that carries no task; both
// `head` and `tail` start out pointing at it.
TaskQueue::TaskQueue() : done(false), pending(0) {
    Node* const sentinel = new Node();
    tail.store(sentinel, std::memory_order_relaxed);
    head.store(sentinel, std::memory_order_relaxed);

    // Start the worker only after head/tail are initialised, since
    // worker() reads `head` as its first action.
    workerThread = std::thread([this] { worker(); });
}
// Stops the worker thread and releases every node still linked in the queue.
// Tasks that have not been executed by the time the worker exits are
// destroyed without being run.
TaskQueue::~TaskQueue() {
    {
        // Set the flag while holding the mutex so the store cannot fall
        // between the worker's predicate check and its wait on `cv`.
        std::lock_guard<std::mutex> guard(mtx);
        done.store(true, std::memory_order_release);
    }
    cv.notify_all();

    if (workerThread.joinable()) {
        workerThread.join();
    }

    // The worker has been joined, so the list can be walked and freed
    // from `head` without further synchronisation.
    for (Node* cursor = head.load(std::memory_order_relaxed); cursor != nullptr;) {
        Node* const following = cursor->next.load(std::memory_order_relaxed);
        delete cursor;
        cursor = following;
    }
}
// Appends `task` to the end of the queue and wakes the worker thread.
//
// @param task  Callable to execute on the worker thread. It is moved into
//              the newly allocated queue node.
void TaskQueue::enqueue(std::function<void()> task) {
    // Allocate first: if the allocation throws, `pending` has not been
    // touched, so sync() cannot end up waiting for a task that was never
    // queued. The task is moved rather than copied into the node.
    Node* node = new Node(std::move(task));

    // Count the task before it becomes reachable, so the worker's matching
    // fetch_sub can never run ahead of this increment.
    pending.fetch_add(1, std::memory_order_acq_rel);

    // Claim the tail slot, then link the previous tail to the new node;
    // the release store publishes the node to the worker.
    Node* prev = tail.exchange(node, std::memory_order_acq_rel);
    prev->next.store(node, std::memory_order_release);

    {
        // Deliberately empty critical section. The worker evaluates its
        // wait predicate while holding `mtx`; passing through the mutex
        // here guarantees the notification below cannot slip in between
        // that predicate check and the worker going to sleep.
        std::lock_guard<std::mutex> lock(mtx);
    }

    // sync() callers wait on the same condition variable as the worker.
    // notify_one() could be consumed by such a caller (whose predicate is
    // still false), leaving the worker asleep with a task queued, so wake
    // every waiter and let each re-check its own predicate.
    cv.notify_all();
}
// Blocks the calling thread until the number of pending tasks is at most
// `allow_n_pending`.
//
// @param allow_n_pending  Largest pending-task count at which the call
//                         is allowed to return.
void TaskQueue::sync(size_t allow_n_pending) {
    std::unique_lock<std::mutex> guard(mtx);
    // Re-check the counter after every wake-up; the worker signals `cv`
    // each time it finishes a task.
    while (pending.load(std::memory_order_acquire) > allow_n_pending) {
        cv.wait(guard);
    }
}
void TaskQueue::worker() {
Node* curr = head.load(std::memory_order_relaxed);
while (!done.load(std::memory_order_acquire)) {
Node* next = curr->next.load(std::memory_order_acquire);
if (next) {
if (next->task) {
next->task();
}
delete curr;
curr = next;
head.store(curr, std::memory_order_release);
{
std::lock_guard<std::mutex> lock(mtx);
pending.fetch_sub(1, std::memory_order_acq_rel);
}
cv.notify_all();
} else {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&] {
return curr->next.load(std::memory_order_acquire) != nullptr
|| done.load(std::memory_order_acquire);
});
}
}
}