Skip to content

Commit beda096

Browse files
author
Chen Lihui
authored
Topic node guard condition in executor (#2074)
* add a test Signed-off-by: Chen Lihui <[email protected]> * rollback the node guard condition for exectuor Signed-off-by: Chen Lihui <[email protected]> --------- Signed-off-by: Chen Lihui <[email protected]>
1 parent 1fd5a96 commit beda096

File tree

3 files changed

+75
-0
lines changed

3 files changed

+75
-0
lines changed

rclcpp/include/rclcpp/executor.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,11 +560,20 @@ class Executor
560560
virtual void
561561
spin_once_impl(std::chrono::nanoseconds timeout);
562562

563+
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
564+
const rclcpp::GuardCondition *,
565+
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
566+
WeakNodesToGuardConditionsMap;
567+
563568
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
564569
const rclcpp::GuardCondition *,
565570
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
566571
WeakCallbackGroupsToGuardConditionsMap;
567572

573+
/// maps nodes to guard conditions
574+
WeakNodesToGuardConditionsMap
575+
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
576+
568577
/// maps callback groups to guard conditions
569578
WeakCallbackGroupsToGuardConditionsMap
570579
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

rclcpp/src/rclcpp/executor.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ Executor::~Executor()
112112
}
113113
weak_groups_to_guard_conditions_.clear();
114114

115+
for (const auto & pair : weak_nodes_to_guard_conditions_) {
116+
auto guard_condition = pair.second;
117+
memory_strategy_->remove_guard_condition(guard_condition);
118+
}
119+
weak_nodes_to_guard_conditions_.clear();
120+
115121
// Finalize the wait set.
116122
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
117123
RCUTILS_LOG_ERROR_NAMED(
@@ -274,6 +280,10 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
274280
}
275281
});
276282

283+
const auto & gc = node_ptr->get_notify_guard_condition();
284+
weak_nodes_to_guard_conditions_[node_ptr] = &gc;
285+
// Add the node's notify condition to the guard condition handles
286+
memory_strategy_->add_guard_condition(gc);
277287
weak_nodes_.push_back(node_ptr);
278288
}
279289

@@ -378,6 +388,9 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
378388
}
379389
}
380390

391+
memory_strategy_->remove_guard_condition(&node_ptr->get_notify_guard_condition());
392+
weak_nodes_to_guard_conditions_.erase(node_ptr);
393+
381394
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
382395
has_executor.store(false);
383396
}
@@ -706,6 +719,12 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
706719
auto weak_node_ptr = pair.second;
707720
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
708721
invalid_group_ptrs.push_back(weak_group_ptr);
722+
auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr);
723+
if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) {
724+
auto guard_condition = node_guard_pair->second;
725+
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
726+
memory_strategy_->remove_guard_condition(guard_condition);
727+
}
709728
}
710729
}
711730
std::for_each(

rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,53 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess
340340
EXPECT_TRUE(received_message_future.get());
341341
}
342342

343+
/*
344+
* Test callback group created after spin.
345+
* A subscriber with a new callback group that created after executor spin not received a message
346+
* because the executor can't be triggered while a subscriber created, see
347+
* https://github.com/ros2/rclcpp/issues/2067
348+
*/
349+
TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin)
350+
{
351+
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
352+
353+
// create a publisher to send data
354+
rclcpp::QoS qos = rclcpp::QoS(1).reliable().transient_local();
355+
rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr publisher =
356+
node->create_publisher<test_msgs::msg::Empty>("topic_name", qos);
357+
publisher->publish(test_msgs::msg::Empty());
358+
359+
// create a thread running an executor
360+
rclcpp::executors::SingleThreadedExecutor executor;
361+
executor.add_node(node);
362+
std::promise<bool> received_message_promise;
363+
auto received_message_future = received_message_promise.get_future();
364+
rclcpp::FutureReturnCode return_code = rclcpp::FutureReturnCode::TIMEOUT;
365+
std::thread executor_thread = std::thread(
366+
[&executor, &received_message_future, &return_code]() {
367+
return_code = executor.spin_until_future_complete(received_message_future, 5s);
368+
});
369+
370+
// to create a callback group after spin
371+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
372+
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
373+
rclcpp::CallbackGroupType::MutuallyExclusive);
374+
375+
// expect the subscriber to receive a message
376+
auto sub_callback = [&received_message_promise](test_msgs::msg::Empty::ConstSharedPtr) {
377+
received_message_promise.set_value(true);
378+
};
379+
// create a subscription using the `cb_grp` callback group
380+
auto options = rclcpp::SubscriptionOptions();
381+
options.callback_group = cb_grp;
382+
rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr subscription =
383+
node->create_subscription<test_msgs::msg::Empty>("topic_name", qos, sub_callback, options);
384+
385+
executor_thread.join();
386+
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
387+
EXPECT_TRUE(received_message_future.get());
388+
}
389+
343390
/*
344391
* Test removing callback group from executor that its not associated with.
345392
*/

0 commit comments

Comments
 (0)