Skip to content

Commit 9c651de

Browse files
authored
New async utilities for lib/everest/util (threadpool and queues) (#1830)
* Bugfix threadpool run is now truely fire and forget check threads for joinable before actually joining adding doxygen adding more tests Signed-off-by: Jan Christoph Habig <jan.habig@pionix.de> * Scaling thread pool and bounded thread safe queue Signed-off-by: Jan Christoph Habig <jan.habig@pionix.de> --------- Signed-off-by: Jan Christoph Habig <jan.habig@pionix.de>
1 parent 346ac6b commit 9c651de

File tree

8 files changed

+1084
-26
lines changed

8 files changed

+1084
-26
lines changed
Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
// SPDX-License-Identifier: Apache-2.0
2-
// Copyright 2020 - 2025 Pionix GmbH and Contributors to EVerest
2+
// Copyright 2020 - 2026 Pionix GmbH and Contributors to EVerest
33

44
/**
5-
* @file Thread pool
6-
* @brief info
5+
* @file thread_pool.hpp
6+
* @brief Simple fixed-size thread pool implementation.
77
*/
88

99
#pragma once
@@ -15,32 +15,59 @@
1515

1616
namespace everest::lib::util {
1717

18+
/**
19+
* @brief A thread safe fixed-size pool for task execution.
20+
* @details This pool maintains a constant number of worker threads. It provides two
21+
* interfaces for task submission: operator() for tasks requiring a return value
22+
* (via std::future) and run() for fire-and-forget tasks.
23+
*/
1824
class thread_pool {
1925
public:
26+
/** @brief Type definition for the tasks held in the queue. */
2027
using action = std::function<void()>;
28+
29+
/**
30+
* @brief Constructs the thread pool and spawns worker threads.
31+
* @param thread_count The number of worker threads to maintain.
32+
*/
2133
thread_pool(unsigned int thread_count) {
22-
auto action = [this] {
34+
auto worker_loop = [this] {
2335
while (auto task = m_action_queue.wait_and_pop()) {
2436
// Task successful, execute it while handling exceptions
2537
try {
2638
task.value()();
2739
} catch (...) {
28-
// Log the error if possible, but keep the thread alive.
40+
// Keep the worker alive even if the task fails.
41+
// Exceptions for operator() are handled in the promise wrapper.
2942
}
3043
}
3144
};
32-
for (size_t i = 0; i < thread_count; ++i) {
33-
m_threads.emplace_back(action);
45+
for (std::size_t i = 0; i < thread_count; ++i) {
46+
m_threads.emplace_back(worker_loop);
3447
}
3548
}
3649

50+
/**
51+
* @brief Destructor. Signals all threads to stop and joins them.
52+
* @details Unblocks any threads waiting on the queue before joining.
53+
*/
3754
~thread_pool() {
3855
m_action_queue.stop();
3956
for (auto& elem : m_threads) {
40-
elem.join();
57+
if (elem.joinable()) {
58+
elem.join();
59+
}
4160
}
4261
}
4362

63+
/**
64+
* @brief Submits a task to the pool and returns a future for its result.
65+
* @tparam F The type of the callable.
66+
* @tparam Args The types of the arguments to pass to the callable.
67+
* @param f The callable to execute.
68+
* @param args The arguments to pass to the callable.
69+
* @return A std::future that will eventually contain the result of the callable.
70+
*/
4471
template <typename F, typename... Args>
4572
auto operator()(F&& f, Args&&... args) const -> std::future<std::invoke_result_t<F, Args...>> {
4673
using R = std::invoke_result_t<F, Args...>;
@@ -52,35 +79,57 @@ class thread_pool {
5279
return fut;
5380
}
5481

82+
/**
83+
* @brief Submits a "fire-and-forget" task to the pool.
84+
* @details This method is highly efficient as it avoids the overhead of creating
85+
* std::promise and std::future objects. It returns immediately after the task
86+
* is added to the queue.
87+
* @tparam F The type of the callable.
88+
* @tparam Args The types of the arguments to pass to the callable.
89+
* @param f The callable to execute.
90+
* @param args The arguments to pass to the callable.
91+
*/
5592
template <typename F, typename... Args> void run(F&& f, Args&&... args) const {
56-
this->operator()(std::forward<F>(f), std::forward<Args>(args)...);
93+
m_action_queue.push(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
5794
}
5895

5996
private:
97+
/**
98+
* @brief Helper to set a promise value for non-void return types.
99+
*/
60100
template <typename Fut, typename F> static void promise_set_value(std::promise<Fut>& prom, F& f) {
61101
prom.set_value(f());
62102
}
63103

104+
/**
105+
* @brief Helper to set a promise value for void return types.
106+
*/
64107
template <typename F> static void promise_set_value(std::promise<void>& prom, F& f) {
65108
f();
66109
prom.set_value();
67110
}
68111

112+
/**
113+
* @brief Wraps a task with a promise and enqueues it.
114+
*/
69115
template <typename F, typename... Args, typename R>
70116
void enqueue_task(F&& f, std::shared_ptr<std::promise<R>>& prom, Args&&... args) const {
71-
72117
auto bound_f = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
73118
m_action_queue.push([prom, task_f = std::move(bound_f)]() mutable {
74119
try {
75120
promise_set_value(*prom, task_f);
76-
} catch (std::exception&) {
121+
} catch (...) {
122+
// Ensure promise is settled with an exception if task throws
77123
prom->set_exception(std::current_exception());
78124
}
79125
});
80126
}
127+
128+
/** @brief Thread safe queue for incoming tasks. */
81129
mutable thread_safe_queue<action> m_action_queue;
130+
131+
/** @brief Container for worker thread handles. */
82132
std::vector<std::thread> m_threads;
83133
};
84134

85-
namespace everest::lib::util {}
86135
} // namespace everest::lib::util

0 commit comments

Comments
 (0)