Skip to content

Commit 7ccd64c

Browse files
authored
Allow to add/remove nodes thread safely in rclcpp::Executor (ros2#1505)
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent 8d2c682 commit 7ccd64c

File tree

3 files changed

+68
-19
lines changed

3 files changed

+68
-19
lines changed

rclcpp/include/rclcpp/executor.hpp

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ class Executor
448448
RCLCPP_PUBLIC
449449
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
450450
get_node_by_group(
451-
WeakCallbackGroupsToNodesMap weak_groups_to_nodes,
451+
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
452452
rclcpp::CallbackGroup::SharedPtr group);
453453

454454
/// Return true if the node has been added to this executor.
@@ -460,7 +460,7 @@ class Executor
460460
bool
461461
has_node(
462462
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
463-
WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const;
463+
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
464464

465465
RCLCPP_PUBLIC
466466
rclcpp::CallbackGroup::SharedPtr
@@ -476,7 +476,7 @@ class Executor
476476
rclcpp::CallbackGroup::SharedPtr group_ptr,
477477
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
478478
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
479-
bool notify = true);
479+
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
480480

481481
/// Remove a callback group from the executor.
482482
/**
@@ -487,7 +487,7 @@ class Executor
487487
remove_callback_group_from_map(
488488
rclcpp::CallbackGroup::SharedPtr group_ptr,
489489
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
490-
bool notify = true);
490+
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
491491

492492
RCLCPP_PUBLIC
493493
bool
@@ -497,7 +497,7 @@ class Executor
497497
bool
498498
get_next_ready_executable_from_map(
499499
AnyExecutable & any_executable,
500-
WeakCallbackGroupsToNodesMap weak_groups_to_nodes);
500+
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
501501

502502
RCLCPP_PUBLIC
503503
bool
@@ -518,7 +518,7 @@ class Executor
518518
*/
519519
RCLCPP_PUBLIC
520520
virtual void
521-
add_callback_groups_from_nodes_associated_to_executor();
521+
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
522522

523523
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
524524
std::atomic_bool spinning;
@@ -532,10 +532,11 @@ class Executor
532532
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
533533

534534
// Mutex to protect the subsequent memory_strategy_.
535-
std::mutex memory_strategy_mutex_;
535+
mutable std::mutex mutex_;
536536

537537
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
538-
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
538+
memory_strategy::MemoryStrategy::SharedPtr
539+
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
539540

540541
/// The context associated with this executor.
541542
std::shared_ptr<rclcpp::Context> context_;
@@ -552,19 +553,24 @@ class Executor
552553
WeakNodesToGuardConditionsMap;
553554

554555
/// maps nodes to guard conditions
555-
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
556+
WeakNodesToGuardConditionsMap
557+
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
556558

557559
/// maps callback groups associated to nodes
558-
WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_;
560+
WeakCallbackGroupsToNodesMap
561+
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
559562

560563
/// maps callback groups to nodes associated with executor
561-
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
564+
WeakCallbackGroupsToNodesMap
565+
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
562566

563567
/// maps all callback groups to nodes
564-
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_;
568+
WeakCallbackGroupsToNodesMap
569+
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
565570

566571
/// nodes that are associated with the executor
567-
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
572+
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
573+
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
568574
};
569575

570576
namespace executor

rclcpp/src/rclcpp/executor.cpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
144144
Executor::get_all_callback_groups()
145145
{
146146
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
147+
std::lock_guard<std::mutex> guard{mutex_};
147148
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
148149
groups.push_back(group_node_ptr.first);
149150
}
@@ -157,6 +158,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
157158
Executor::get_manually_added_callback_groups()
158159
{
159160
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
161+
std::lock_guard<std::mutex> guard{mutex_};
160162
for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
161163
groups.push_back(group_node_ptr.first);
162164
}
@@ -167,6 +169,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
167169
Executor::get_automatically_added_callback_groups_from_nodes()
168170
{
169171
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
172+
std::lock_guard<std::mutex> guard{mutex_};
170173
for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
171174
groups.push_back(group_node_ptr.first);
172175
}
@@ -233,7 +236,6 @@ Executor::add_callback_group_to_map(
233236
}
234237
}
235238
// Add the node's notify condition to the guard condition handles
236-
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
237239
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
238240
}
239241
}
@@ -244,6 +246,7 @@ Executor::add_callback_group(
244246
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
245247
bool notify)
246248
{
249+
std::lock_guard<std::mutex> guard{mutex_};
247250
this->add_callback_group_to_map(
248251
group_ptr,
249252
node_ptr,
@@ -259,6 +262,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
259262
if (has_executor.exchange(true)) {
260263
throw std::runtime_error("Node has already been added to an executor.");
261264
}
265+
std::lock_guard<std::mutex> guard{mutex_};
262266
for (auto & weak_group : node_ptr->get_callback_groups()) {
263267
auto group_ptr = weak_group.lock();
264268
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
@@ -307,7 +311,6 @@ Executor::remove_callback_group_from_map(
307311
throw_from_rcl_error(ret, "Failed to trigger guard condition on callback group remove");
308312
}
309313
}
310-
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
311314
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
312315
}
313316
}
@@ -317,6 +320,7 @@ Executor::remove_callback_group(
317320
rclcpp::CallbackGroup::SharedPtr group_ptr,
318321
bool notify)
319322
{
323+
std::lock_guard<std::mutex> guard{mutex_};
320324
this->remove_callback_group_from_map(
321325
group_ptr,
322326
weak_groups_associated_with_executor_to_nodes_,
@@ -336,6 +340,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
336340
throw std::runtime_error("Node needs to be associated with an executor.");
337341
}
338342

343+
std::lock_guard<std::mutex> guard{mutex_};
339344
bool found_node = false;
340345
auto node_it = weak_nodes_.begin();
341346
while (node_it != weak_nodes_.end()) {
@@ -489,6 +494,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr
489494
if (memory_strategy == nullptr) {
490495
throw std::runtime_error("Received NULL memory strategy in executor.");
491496
}
497+
std::lock_guard<std::mutex> guard{mutex_};
492498
memory_strategy_ = memory_strategy;
493499
}
494500

@@ -662,7 +668,7 @@ void
662668
Executor::wait_for_work(std::chrono::nanoseconds timeout)
663669
{
664670
{
665-
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
671+
std::lock_guard<std::mutex> guard(mutex_);
666672

667673
// Check weak_nodes_ to find any callback group that is not owned
668674
// by an executor and add it to the list of callbackgroups for
@@ -741,12 +747,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
741747

742748
// check the null handles in the wait set and remove them from the handles in memory strategy
743749
// for callback-based entities
750+
std::lock_guard<std::mutex> guard(mutex_);
744751
memory_strategy_->remove_null_handles(&wait_set_);
745752
}
746753

747754
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
748755
Executor::get_node_by_group(
749-
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes,
756+
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
757+
weak_groups_to_nodes,
750758
rclcpp::CallbackGroup::SharedPtr group)
751759
{
752760
if (!group) {
@@ -764,6 +772,7 @@ Executor::get_node_by_group(
764772
rclcpp::CallbackGroup::SharedPtr
765773
Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
766774
{
775+
std::lock_guard<std::mutex> guard{mutex_};
767776
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
768777
auto group = pair.first.lock();
769778
if (!group) {
@@ -804,9 +813,11 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
804813
bool
805814
Executor::get_next_ready_executable_from_map(
806815
AnyExecutable & any_executable,
807-
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes)
816+
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
817+
weak_groups_to_nodes)
808818
{
809819
bool success = false;
820+
std::lock_guard<std::mutex> guard{mutex_};
810821
// Check the timers to see if there are any that are ready
811822
memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
812823
if (any_executable.timer) {
@@ -893,7 +904,8 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos
893904
bool
894905
Executor::has_node(
895906
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
896-
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const
907+
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
908+
weak_groups_to_nodes) const
897909
{
898910
return std::find_if(
899911
weak_groups_to_nodes.begin(),

rclcpp/test/rclcpp/test_executor.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "rclcpp/executor.hpp"
2323
#include "rclcpp/memory_strategy.hpp"
24+
#include "rclcpp/executors/single_threaded_executor.hpp"
2425
#include "rclcpp/strategies/allocator_memory_strategy.hpp"
2526

2627
#include "../mocking_utils/patch.hpp"
@@ -54,6 +55,7 @@ class DummyExecutor : public rclcpp::Executor
5455
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group(
5556
rclcpp::CallbackGroup::SharedPtr group)
5657
{
58+
std::lock_guard<std::mutex> guard_{mutex_}; // only to make the TSA happy
5759
return get_node_by_group(weak_groups_to_nodes_, group);
5860
}
5961

@@ -87,6 +89,35 @@ MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, !=)
8789
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, <)
8890
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, >)
8991

92+
TEST_F(TestExecutor, add_remove_node_thread_safe) {
93+
using namespace std::chrono_literals;
94+
95+
// Create an Executor
96+
rclcpp::executors::SingleThreadedExecutor executor;
97+
98+
auto future = std::async(std::launch::async, [&executor] {executor.spin();});
99+
100+
// Add and remove nodes repeatedly
101+
// Test that this does not cause a segfault
102+
size_t num_nodes = 100;
103+
for (size_t i = 0; i < num_nodes; ++i) {
104+
std::ostringstream name;
105+
name << "node_" << i;
106+
auto node = std::make_shared<rclcpp::Node>(name.str());
107+
executor.add_node(node);
108+
// Sleeping here helps exaggerate the issue
109+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
110+
executor.remove_node(node);
111+
}
112+
std::future_status future_status = std::future_status::timeout;
113+
do {
114+
executor.cancel();
115+
future_status = future.wait_for(1s);
116+
} while (future_status == std::future_status::timeout);
117+
EXPECT_EQ(future_status, std::future_status::ready);
118+
future.get();
119+
}
120+
90121
TEST_F(TestExecutor, constructor_bad_guard_condition_init) {
91122
auto mock = mocking_utils::patch_and_return(
92123
"lib:rclcpp", rcl_guard_condition_init, RCL_RET_ERROR);

0 commit comments

Comments
 (0)