-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadPool.h
More file actions
92 lines (82 loc) · 2.81 KB
/
ThreadPool.h
File metadata and controls
92 lines (82 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#pragma once
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
// Fixed-size pool of worker threads that execute queued callables.
//
// Life cycle: construct -> start() -> submit()/submitFuture() -> shutdown().
// The pool can be started again after shutdown(). Tasks may be submitted in
// any state; they are only executed while the pool is running.
//
// All public member functions lock status_and_queue_mutex before touching
// `status` or `queue`.
class ThreadPool {
private:
    enum { running, stopping, stopped } status;
    std::queue<std::function<void()>> queue;
    std::vector<std::thread> worker_threads;
    std::mutex status_and_queue_mutex; // to lock both status and queue
    std::condition_variable status_and_queue_cv;

    // Body of every worker thread: pop one task at a time and run it outside
    // the lock, until the pool enters the `stopping` state.
    void worker_loop() {
        while (true) {
            // Scoped per iteration so the finished task (and whatever it
            // captured) is released before the worker blocks again.
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(status_and_queue_mutex);
                status_and_queue_cv.wait(lock, [this]() { return !queue.empty() || status == stopping; });
                // Stop request wins over pending work: tasks still in the
                // queue are left there, not executed by this worker.
                if (status == stopping) break;
                task = std::move(queue.front());
                queue.pop();
            }
            task();
        }
    }

public:
    // Creates a pool with `n_threads` worker slots. No thread is spawned
    // until start() is called.
    // Initializers are listed in member declaration order.
    ThreadPool(const size_t n_threads) : status(stopped), worker_threads(n_threads) {}
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool & operator=(const ThreadPool &) = delete;
    ThreadPool & operator=(ThreadPool &&) = delete;
    ~ThreadPool(){ shutdown(); }

    // Create threads in the pool and let them run.
    // Does nothing unless the pool is currently stopped: assigning a new
    // std::thread onto a still-joinable one would call std::terminate.
    void start() {
        // The lock is held while spawning so that `status` and
        // `worker_threads` change together; new workers simply block on the
        // mutex until this function returns.
        std::unique_lock<std::mutex> lock(status_and_queue_mutex);
        if (status != stopped) return;
        status = running;
        for (auto &t : worker_threads) {
            t = std::thread([this]() { worker_loop(); });
        }
    }

    // Waits until threads finish their current task and shuts the pool down.
    // Tasks that are still queued are not executed; they stay in the queue.
    // Does nothing unless the pool is currently running.
    void shutdown() {
        {
            std::unique_lock<std::mutex> lock(status_and_queue_mutex);
            if (status != running) return;
            status = stopping;
        }
        status_and_queue_cv.notify_all();
        for (auto &t : worker_threads) {
            // A slot can be non-joinable if start() threw part-way through
            // spawning; join() on such a thread would throw.
            if (t.joinable()) t.join();
        }
        {
            std::unique_lock<std::mutex> lock(status_and_queue_mutex);
            status = stopped;
        }
    }

    // Submit a void (*)(...) function to be executed asynchronously by the pool.
    // Arguments are bound by value (std::bind); use std::ref to pass a reference.
    // An exception escaping the task propagates out of the worker thread.
    template<typename F, typename...Args>
    void submit(F&& f, Args&&... args) {
        {
            std::unique_lock<std::mutex> lock(status_and_queue_mutex);
            queue.emplace(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        }
        status_and_queue_cv.notify_one();
    }

    // Submit any function and get a std::future for its result.
    // This is a more general form than submit(): the callable is wrapped in a
    // shared std::packaged_task, so its return value or exception is
    // delivered through the returned future.
    // Arguments are bound by value (std::bind); use std::ref to pass a reference.
    template<typename F, typename...Args>
    auto submitFuture(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using result_t = decltype(f(args...));
        auto task_ptr = std::make_shared<std::packaged_task<result_t()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        // Take the future before the task becomes visible to the workers so
        // this thread never touches the packaged_task while it may be running.
        std::future<result_t> result = task_ptr->get_future();
        {
            std::unique_lock<std::mutex> lock(status_and_queue_mutex);
            queue.emplace( [task_ptr]() { (*task_ptr)(); } );
        }
        status_and_queue_cv.notify_one();
        return result; // note: calling thread will not wait at future destruction
    }
};