Skip to content

Commit 3edeb90

Browse files
author
Janosch Machowinski
committed
feat(MultiThreadedExecutor): Added ability to handle exceptions from threads
This commit adds external exception handling for the worker threads, allowing application code to implement custom exception handling. Signed-off-by: Janosch Machowinski <[email protected]> # Conflicts: # rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
1 parent f510db1 commit 3edeb90

File tree

3 files changed

+102
-31
lines changed

3 files changed

+102
-31
lines changed

rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,26 @@ class MultiThreadedExecutor : public rclcpp::Executor
6969
void
7070
spin() override;
7171

72+
/**
73+
* \sa rclcpp::Executor:spin() for more details
74+
* \throws std::runtime_error when spin() called while already spinning
75+
* @param exception_handler will be called for every exception in the processing threads
76+
*
77+
* The exception_handler can be called from multiple threads at the same time.
78+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
79+
*/
80+
RCLCPP_PUBLIC
81+
void
82+
spin(std::function<void(const std::exception & e)> exception_handler);
83+
7284
RCLCPP_PUBLIC
7385
size_t
7486
get_number_of_threads();
7587

7688
protected:
7789
RCLCPP_PUBLIC
7890
void
79-
run(size_t this_thread_number);
91+
run(size_t this_thread_number, std::function<void(const std::exception & e)> exception_handler);
8092

8193
private:
8294
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
5151

5252
void
5353
MultiThreadedExecutor::spin()
54+
{
55+
spin([](const std::exception & e) {throw e;});
56+
}
57+
58+
void
59+
60+
MultiThreadedExecutor::spin(std::function<void(const std::exception & e)> exception_handler)
5461
{
5562
if (spinning.exchange(true)) {
5663
throw std::runtime_error("spin() called while already spinning");
@@ -61,12 +68,12 @@ MultiThreadedExecutor::spin()
6168
{
6269
std::lock_guard wait_lock{wait_mutex_};
6370
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
64-
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
71+
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler);
6572
threads.emplace_back(func);
6673
}
6774
}
6875

69-
run(thread_id);
76+
run(thread_id, exception_handler);
7077
for (auto & thread : threads) {
7178
thread.join();
7279
}
@@ -79,40 +86,50 @@ MultiThreadedExecutor::get_number_of_threads()
7986
}
8087

8188
void
82-
MultiThreadedExecutor::run(size_t this_thread_number)
89+
MultiThreadedExecutor::run(
90+
size_t this_thread_number,
91+
std::function<void(const std::exception & e)> exception_handler)
8392
{
84-
(void)this_thread_number;
85-
while (rclcpp::ok(this->context_) && spinning.load()) {
86-
rclcpp::AnyExecutable any_exec;
87-
{
88-
std::lock_guard wait_lock{wait_mutex_};
89-
if (!rclcpp::ok(this->context_) || !spinning.load()) {
90-
return;
93+
try {
94+
(void)this_thread_number;
95+
while (rclcpp::ok(this->context_) && spinning.load()) {
96+
rclcpp::AnyExecutable any_exec;
97+
{
98+
std::lock_guard wait_lock{wait_mutex_};
99+
if (!rclcpp::ok(this->context_) || !spinning.load()) {
100+
return;
101+
}
102+
if (!get_next_executable(any_exec, next_exec_timeout_)) {
103+
continue;
104+
}
91105
}
92-
if (!get_next_executable(any_exec, next_exec_timeout_)) {
93-
continue;
106+
if (yield_before_execute_) {
107+
std::this_thread::yield();
94108
}
95-
}
96-
if (yield_before_execute_) {
97-
std::this_thread::yield();
98-
}
99109

100-
execute_any_executable(any_exec);
101-
102-
if (any_exec.callback_group &&
103-
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
104-
{
105-
try {
106-
interrupt_guard_condition_->trigger();
107-
} catch (const rclcpp::exceptions::RCLError & ex) {
108-
throw std::runtime_error(
109-
std::string(
110-
"Failed to trigger guard condition on callback group change: ") + ex.what());
110+
execute_any_executable(any_exec);
111+
112+
if (any_exec.callback_group &&
113+
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
114+
{
115+
try {
116+
interrupt_guard_condition_->trigger();
117+
} catch (const rclcpp::exceptions::RCLError & ex) {
118+
throw std::runtime_error(
119+
std::string(
120+
"Failed to trigger guard condition on callback group change: ") + ex.what());
121+
}
111122
}
123+
124+
// Clear the callback_group to prevent the AnyExecutable destructor from
125+
// resetting the callback group `can_be_taken_from`
126+
any_exec.callback_group.reset();
112127
}
128+
} catch (const std::exception & e) {
129+
RCLCPP_ERROR_STREAM(
130+
rclcpp::get_logger("rclcpp"),
131+
"Exception while spinning : " << e.what());
113132

114-
// Clear the callback_group to prevent the AnyExecutable destructor from
115-
// resetting the callback group `can_be_taken_from`
116-
any_exec.callback_group.reset();
133+
exception_handler(e);
117134
}
118135
}

rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,45 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) {
9797
executor.add_node(node);
9898
executor.spin();
9999
}
100+
101+
TEST_F(TestMultiThreadedExecutor, catch_exception) {
102+
rclcpp::executors::MultiThreadedExecutor executor;
103+
104+
std::shared_ptr<rclcpp::Node> node =
105+
std::make_shared<rclcpp::Node>("test_multi_threaded_executor_catch_exception");
106+
107+
const std::string test_reason = "test exception";
108+
109+
std::atomic_bool timer_executed_after_exception = false;
110+
111+
auto timer = node->create_wall_timer(
112+
std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor]()
113+
{
114+
static size_t cnt = 0;
115+
if (cnt == 0) {
116+
cnt++;
117+
throw std::runtime_error(test_reason);
118+
}
119+
120+
timer_executed_after_exception = true;
121+
executor.cancel();
122+
});
123+
124+
std::atomic_bool caught_exception = false;
125+
126+
executor.add_node(node);
127+
executor.spin(
128+
[&caught_exception, &test_reason](const std::exception & e)
129+
{
130+
const std::runtime_error * runtime_error = dynamic_cast<const std::runtime_error *>(&e);
131+
ASSERT_NE(runtime_error, nullptr);
132+
133+
ASSERT_EQ(runtime_error->what(), test_reason);
134+
135+
caught_exception = true;
136+
}
137+
);
138+
139+
ASSERT_TRUE(caught_exception);
140+
ASSERT_TRUE(timer_executed_after_exception);
141+
}

0 commit comments

Comments
 (0)