1111#include < atomic>
1212#include < condition_variable>
1313#include < cstdlib>
14+ #include < forward_list>
1415#include < functional>
1516#include < future>
16- #include < iostream >
17+ #include < iterator >
1718#include < mutex>
1819#include < numeric>
1920#include < queue>
@@ -30,18 +31,14 @@ namespace detail {
3031class worker_thread {
3132public:
3233 // Initializes state, but does not start the worker thread
33- worker_thread () noexcept : m_isRunning(false ), m_numTasks(0 ) {}
34-
35- // Creates and launches the worker thread
36- inline void start (size_t threadId) {
34+ worker_thread (size_t threadId) noexcept
35+ : m_threadId(threadId), m_isRunning(false ), m_numTasks(0 ) {
3736 std::lock_guard<std::mutex> lock (m_workMutex);
3837 if (this ->is_running ()) {
3938 return ;
4039 }
41- m_threadId = threadId;
4240 m_worker = std::thread ([this ]() {
4341 while (true ) {
44- // pin the thread to the cpu
4542 std::unique_lock<std::mutex> lock (m_workMutex);
4643 // Wait until there's work available
4744 m_startWorkCondition.wait (
@@ -51,7 +48,7 @@ class worker_thread {
5148 break ;
5249 }
5350 // Retrieve a task from the queue
54- auto task = m_tasks.front ();
51+ worker_task_t task = std::move ( m_tasks.front () );
5552 m_tasks.pop ();
5653
5754 // Not modifying internal state anymore, can release the mutex
@@ -63,7 +60,7 @@ class worker_thread {
6360 }
6461 });
6562
66- m_isRunning = true ;
63+ m_isRunning. store ( true , std::memory_order_release) ;
6764 }
6865
6966 inline void schedule (const worker_task_t &task) {
@@ -79,16 +76,12 @@ class worker_thread {
7976 size_t num_pending_tasks () const noexcept {
8077 // m_numTasks is an atomic counter because we don't want to lock the mutex
8178 // here, num_pending_tasks is only used for heuristics
82- return m_numTasks.load ();
79+ return m_numTasks.load (std::memory_order_acquire );
8380 }
8481
8582 // Waits for all tasks to finish and destroys the worker thread
8683 inline void stop () {
87- {
88- // Notify the worker thread to stop executing
89- std::lock_guard<std::mutex> lock (m_workMutex);
90- m_isRunning = false ;
91- }
84+ m_isRunning.store (false , std::memory_order_release);
9285 m_startWorkCondition.notify_all ();
9386 if (m_worker.joinable ()) {
9487 // Wait for the worker thread to finish handling the task queue
@@ -97,18 +90,21 @@ class worker_thread {
9790 }
9891
9992 // Checks whether the thread pool is currently running threads
100- inline bool is_running () const noexcept { return m_isRunning; }
93+ inline bool is_running () const noexcept {
94+ return m_isRunning.load (std::memory_order_acquire);
95+ }
10196
10297private:
10398 // Unique ID identifying the thread in the threadpool
104- size_t m_threadId;
99+ const size_t m_threadId;
100+
105101 std::thread m_worker;
106102
107103 std::mutex m_workMutex;
108104
109105 std::condition_variable m_startWorkCondition;
110106
111- bool m_isRunning;
107+ std::atomic< bool > m_isRunning;
112108
113109 std::queue<worker_task_t > m_tasks;
114110
@@ -121,47 +117,21 @@ class worker_thread {
121117// parameters and futures.
122118class simple_thread_pool {
123119public:
124- simple_thread_pool (size_t numThreads = 0 ) noexcept : m_isRunning(false ) {
125- this ->resize (numThreads);
126- this ->start ();
127- }
128-
129- ~simple_thread_pool () { this ->stop (); }
130-
131- // Creates and launches the worker threads
132- inline void start () {
133- if (this ->is_running ()) {
134- return ;
135- }
136- size_t threadId = 0 ;
137- for (auto &t : m_workers) {
138- t.start (threadId);
139- threadId++;
120+ simple_thread_pool () noexcept
121+ : m_isRunning(false ), m_numThreads(get_num_threads()) {
122+ for (size_t i = 0 ; i < m_numThreads; i++) {
123+ m_workers.emplace_front (i);
140124 }
141125 m_isRunning.store (true , std::memory_order_release);
142126 }
143127
144- // Waits for all tasks to finish and destroys the worker threads
145- inline void stop () {
128+ ~simple_thread_pool () {
146129 for (auto &t : m_workers) {
147130 t.stop ();
148131 }
149132 m_isRunning.store (false , std::memory_order_release);
150133 }
151134
152- inline void resize (size_t numThreads) {
153- char *envVar = std::getenv (" SYCL_NATIVE_CPU_HOST_THREADS" );
154- if (envVar) {
155- numThreads = std::stoul (envVar);
156- }
157- if (numThreads == 0 ) {
158- numThreads = std::thread::hardware_concurrency ();
159- }
160- if (!this ->is_running () && (numThreads != this ->num_threads ())) {
161- m_workers = decltype (m_workers)(numThreads);
162- }
163- }
164-
165135 inline void schedule (const worker_task_t &task) {
166136 // Schedule the task on the best available worker thread
167137 this ->best_worker ().schedule (task);
@@ -171,7 +141,7 @@ class simple_thread_pool {
171141 return m_isRunning.load (std::memory_order_acquire);
172142 }
173143
174- inline size_t num_threads () const noexcept { return m_workers. size () ; }
144+ inline size_t num_threads () const noexcept { return m_numThreads ; }
175145
176146 inline size_t num_pending_tasks () const noexcept {
177147 return std::accumulate (std::begin (m_workers), std::end (m_workers),
@@ -201,24 +171,32 @@ class simple_thread_pool {
201171 }
202172
203173private:
204- std::vector<worker_thread> m_workers;
174+ static size_t get_num_threads () {
175+ size_t numThreads;
176+ char *envVar = std::getenv (" SYCL_NATIVE_CPU_HOST_THREADS" );
177+ if (envVar) {
178+ numThreads = std::stoul (envVar);
179+ } else {
180+ numThreads = std::thread::hardware_concurrency ();
181+ }
182+ return numThreads;
183+ }
184+
185+ std::forward_list<worker_thread> m_workers;
205186
206187 std::atomic<bool > m_isRunning;
188+
189+ const size_t m_numThreads;
207190};
208191} // namespace detail
209192
210193template <typename ThreadPoolT> class threadpool_interface {
211194 ThreadPoolT threadpool;
212195
213196public:
214- void start () { threadpool.start (); }
215-
216- void stop () { threadpool.stop (); }
217-
218197 size_t num_threads () const noexcept { return threadpool.num_threads (); }
219198
220- threadpool_interface (size_t numThreads) : threadpool(numThreads) {}
221- threadpool_interface () : threadpool(0 ) {}
199+ threadpool_interface () : threadpool() {}
222200
223201 auto schedule_task (worker_task_t &&task) {
224202 auto workerTask = std::make_shared<std::packaged_task<void (size_t )>>(
0 commit comments