1313#include < thread>
1414
1515namespace cpr {
16+ /* *
17+ * cpr thread pool implementation used by async requests.
18+ *
19+ * Example:
20+ * // Create a new thread pool object
21+ * cpr::ThreadPool tp;
22+ * // Start the thread pool and spawn initial set of worker threads.
23+ * tp.Start()
24+ * // Add work
25+ * tp.Submit(..)
26+ * ...
27+ * // Stop/join workers and flush the task queue
28+ * tp.Stop()
29+ * // Start the thread pool again spawning the initial set of worker threads.
30+ * tp.Start()
31+ * ...
32+ **/
1633class ThreadPool {
1734 public:
35+ /* *
36+ * The default minimum thread count for the thread pool.
37+ * Even if there is no work this number of threads should be in standby for once new work arrives.
38+ **/
1839 static constexpr size_t DEFAULT_MIN_THREAD_COUNT = 0 ;
40+ /* *
41+ * The default maximum thread count for the thread pool.
42+ * Even if there is a lot of work, the thread pool is not allowed to create more threads than this number.
43+ **/
1944 static size_t DEFAULT_MAX_THREAD_COUNT;
2045
2146 private:
47+ /* *
48+ * The thread pool or worker thread state.
49+ **/
2250 enum class State : uint8_t { STOP, RUNNING };
51+ /* *
52+ * Collection of properties identifying a worker thread for the thread pool.
53+ **/
2354 struct WorkerThread {
2455 std::unique_ptr<std::thread> thread{nullptr };
56+ /* *
57+ * RUNNING: The thread is still active and working on or awaiting new work.
58+ * STOP: The thread is shutting down or has already been shut down and is ready to be joined.
59+ **/
2560 State state{State::RUNNING};
2661 };
2762
63+ /* *
64+ * Mutex for synchronizing access to the worker thread list.
65+ **/
2866 std::mutex workerMutex;
67+ /* *
68+ * A list of all worker threads
69+ **/
2970 std::list<WorkerThread> workers;
71+ /* *
72+ * Number of threads ready to be joined where their state is 'STOP'.
73+ **/
3074 std::atomic_size_t workerJoinReadyCount{0 };
3175
76+ /* *
77+ * Mutex for synchronizing access to the task queue.
78+ **/
3279 std::mutex taskQueueMutex;
80+ /* *
81+ * Conditional variable to let threads wait for new work to arrive.
82+ **/
3383 std::condition_variable taskQueueCondVar;
84+ /* *
85+ * A queue of tasks synchronized by 'taskQueueMutex'.
86+ **/
3487 std::queue<std::function<void ()>> tasks;
3588
36- std::atomic<State> state = State::STOP;
89+ /* *
90+ * The current state for the thread pool.
91+ **/
92+ std::atomic<State> state = State::RUNNING;
93+ /* *
94+ * The number of threads that should always be in standby or working.
95+ **/
3796 std::atomic_size_t minThreadCount;
97+ /* *
98+ * The current number of threads available to the thread pool (working or idle).
99+ **/
38100 std::atomic_size_t curThreadCount{0 };
101+ /* *
102+ * The maximum number of threads allowed to be used by this thread pool.
103+ **/
39104 std::atomic_size_t maxThreadCount;
105+ /* *
106+ * The number of idle threads without any work awaiting new tasks.
107+ **/
40108 std::atomic_size_t idleThreadCount{0 };
41109
110+ /* *
111+ * General control mutex synchronizing access to internal thread pool resources.
112+ **/
42113 std::recursive_mutex controlMutex;
43114
44115 public:
116+ /* *
117+ * Creates a new thread pool object with a minimum and maximum thread count.
118+ * minThreadCount: Number of threads that should always be in standby or working.
119+ * maxThreadCount: The maximum number of threads allowed to be used by this thread pool.
120+ **/
45121 explicit ThreadPool (size_t minThreadCount = DEFAULT_MIN_THREAD_COUNT, size_t maxThreadCount = DEFAULT_MAX_THREAD_COUNT);
46122 ThreadPool (const ThreadPool& other) = delete ;
47123 ThreadPool (ThreadPool&& old) = delete ;
@@ -50,24 +126,60 @@ class ThreadPool {
50126 ThreadPool& operator =(const ThreadPool& other) = delete ;
51127 ThreadPool& operator =(ThreadPool&& old) = delete ;
52128
129+ /* *
130+ * Returns the current thread pool state.
131+ * The thread pool is in STOP state when initially created and will move over to RUNNING once Start() is invoked for the first time.
132+ **/
53133 [[nodiscard]] State GetState () const ;
134+ /* *
135+ * Returns the maximum number of threads allowed to be used by this thread pool.
136+ **/
54137 [[nodiscard]] size_t GetMaxThreadCount () const ;
138+ /* *
139+ * Returns the current number of threads available to the thread pool (working or idle).
140+ **/
55141 [[nodiscard]] size_t GetCurThreadCount () const ;
142+ /* *
143+ * Returns the number of idle threads without any work awaiting new tasks.
144+ **/
56145 [[nodiscard]] size_t GetIdleThreadCount () const ;
146+ /* *
147+ * Returns the number of threads that should always be in standby or working.
148+ **/
57149 [[nodiscard]] size_t GetMinThreadCount () const ;
58150
151+ /* *
152+ * Sets the number of threads that should always be in standby or working.
153+ **/
59154 void SetMinThreadCount (size_t minThreadCount);
155+ /* *
156+ * Sets the current number of threads available to the thread pool (working or idle).
157+ **/
60158 void SetMaxThreadCount (size_t maxThreadCount);
61159
160+ /* *
161+ * Starts the thread pool by spawning GetMinThreadCount() threads.
162+ * Does nothing in case the thread pool state is already RUNNING.
163+ **/
62164 void Start ();
165+ /* *
166+ * Sets the state to STOP, clears the task queue and joins all running threads.
167+ * This means waiting for all threads that currently work on something letting them finish their task.
168+ **/
63169 void Stop ();
170+ /* *
171+ * Returns as soon as the task queue is empty and all threads are either stopped/joined or in idel state.
172+ **/
64173 void Wait ();
65174
66175 /* *
176+ * Enqueues a new task to the thread pool.
67177 * Return a future, calling future.get() will wait task done and return RetType.
68178 * Submit(fn, args...)
69179 * Submit(std::bind(&Class::mem_fn, &obj))
70180 * Submit(std::mem_fn(&Class::mem_fn, &obj))
181+ *
182+ * Will start a new thread in case all other threads are currently working and GetCurThreadCount() < GetMaxThreadCount().
71183 **/
72184 template <class Fn , class ... Args>
73185 auto Submit (Fn&& fn, Args&&... args) {
@@ -96,10 +208,23 @@ class ThreadPool {
96208 }
97209
98210 private:
211+ /* *
212+ * Sets the new thread pool state.
213+ * Returns true in case the current state was different to the newState.
214+ **/
99215 bool setState (State newState);
216+ /* *
217+ * Adds a new worker thread.
218+ **/
100219 void addThread ();
220+ /* *
221+ * Goes through the worker threads list and joins all threads where their state is STOP.
222+ **/
101223 void joinStoppedThreads ();
102224
225+ /* *
226+ * The thread entry point where the heavy lifting happens.
227+ **/
103228 void threadFunc (WorkerThread& workerThread);
104229};
105230} // namespace cpr
0 commit comments