
Commit 1709df8

alsora authored and apojomovsky committed
add mutex to protect events_executor current entity collection (ros2#2187)

* add mutex to protect events_executor current entity collection and unit-test
* be more precise with mutex locks; make stress test less stressful
* fix uncrustify error

Signed-off-by: Alberto Soragna <[email protected]>
1 parent a838046 commit 1709df8
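For context, the race this commit closes looks roughly like the following. Below is a minimal standalone sketch using plain standard-library stand-ins rather than the real rclcpp executor types (`Collection`, `retrieve_entity`, and `refresh_collection` are illustrative names, not the rclcpp API): the event-processing thread looks entities up in the current collection while `add_node`/`remove_node` can concurrently rebuild that same collection, so both sides now take the same mutex.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Stand-in for rclcpp::executors::ExecutorEntitiesCollection.
using Collection = std::map<const void *, std::shared_ptr<std::string>>;

std::recursive_mutex collection_mutex;  // protects current_collection
auto current_collection = std::make_shared<Collection>();

// Event thread: look the entity up under the lock; only the returned
// shared_ptr stays alive after the lock is released.
std::shared_ptr<std::string> retrieve_entity(const void * key)
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  auto it = current_collection->find(key);
  return it != current_collection->end() ? it->second : nullptr;
}

// Add/remove-node thread: rebuild the collection under the same lock.
void refresh_collection(Collection new_collection)
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  *current_collection = std::move(new_collection);
}

int main()
{
  int key = 0;
  std::thread reader([&] {for (int i = 0; i < 100000; i++) {retrieve_entity(&key);}});
  std::thread writer([&] {for (int i = 0; i < 100000; i++) {refresh_collection({});}});
  reader.join();
  writer.join();
}
```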

File tree: 3 files changed (+93, −15 lines)

rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp

Lines changed: 4 additions & 1 deletion
```diff
@@ -274,9 +274,12 @@ class EventsExecutor : public rclcpp::Executor
   rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
 
   std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
-  std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
   std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
 
+  /// Mutex to protect the current_entities_collection_
+  std::recursive_mutex collection_mutex_;
+  std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
+
   /// Flag used to reduce the number of unnecessary waitable events
   std::atomic<bool> notify_waitable_event_pushed_ {false};
 
```
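A note on the choice of `std::recursive_mutex` over `std::mutex`: in the .cpp changes below, `refresh_current_collection_from_callback_groups()` acquires `collection_mutex_` and then calls `refresh_current_collection()`, which acquires it again on the same thread. With a plain `std::mutex` that second acquisition would be undefined behavior (in practice, a deadlock); a recursive mutex permits the reentrant lock. A minimal sketch of that call shape (function names shortened, bodies illustrative):

```cpp
#include <iostream>
#include <mutex>

std::recursive_mutex collection_mutex;

// Mirrors EventsExecutor::refresh_current_collection: takes the lock
// itself so that all of its callers are protected.
void refresh_current_collection()
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  std::cout << "collection refreshed\n";
}

// Mirrors refresh_current_collection_from_callback_groups: already holds
// the lock when it calls refresh_current_collection. A recursive mutex
// lets the same thread re-enter where a std::mutex would deadlock.
void refresh_from_callback_groups()
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  refresh_current_collection();
}

int main() {refresh_from_callback_groups();}
```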

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 35 additions & 14 deletions
```diff
@@ -275,10 +275,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
   switch (event.type) {
     case ExecutorEventType::CLIENT_EVENT:
     {
-      auto client = this->retrieve_entity(
-        static_cast<const rcl_client_t *>(event.entity_key),
-        current_entities_collection_->clients);
-
+      rclcpp::ClientBase::SharedPtr client;
+      {
+        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+        client = this->retrieve_entity(
+          static_cast<const rcl_client_t *>(event.entity_key),
+          current_entities_collection_->clients);
+      }
       if (client) {
         for (size_t i = 0; i < event.num_events; i++) {
           execute_client(client);
@@ -289,9 +292,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
     }
     case ExecutorEventType::SUBSCRIPTION_EVENT:
     {
-      auto subscription = this->retrieve_entity(
-        static_cast<const rcl_subscription_t *>(event.entity_key),
-        current_entities_collection_->subscriptions);
+      rclcpp::SubscriptionBase::SharedPtr subscription;
+      {
+        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+        subscription = this->retrieve_entity(
+          static_cast<const rcl_subscription_t *>(event.entity_key),
+          current_entities_collection_->subscriptions);
+      }
       if (subscription) {
         for (size_t i = 0; i < event.num_events; i++) {
           execute_subscription(subscription);
@@ -301,10 +308,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
     }
     case ExecutorEventType::SERVICE_EVENT:
     {
-      auto service = this->retrieve_entity(
-        static_cast<const rcl_service_t *>(event.entity_key),
-        current_entities_collection_->services);
-
+      rclcpp::ServiceBase::SharedPtr service;
+      {
+        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+        service = this->retrieve_entity(
+          static_cast<const rcl_service_t *>(event.entity_key),
+          current_entities_collection_->services);
+      }
       if (service) {
         for (size_t i = 0; i < event.num_events; i++) {
           execute_service(service);
@@ -321,9 +331,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
     }
     case ExecutorEventType::WAITABLE_EVENT:
     {
-      auto waitable = this->retrieve_entity(
-        static_cast<const rclcpp::Waitable *>(event.entity_key),
-        current_entities_collection_->waitables);
+      rclcpp::Waitable::SharedPtr waitable;
+      {
+        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+        waitable = this->retrieve_entity(
+          static_cast<const rclcpp::Waitable *>(event.entity_key),
+          current_entities_collection_->waitables);
+      }
       if (waitable) {
         for (size_t i = 0; i < event.num_events; i++) {
           auto data = waitable->take_data_by_entity_id(event.waitable_data);
@@ -388,6 +402,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
 void
 EventsExecutor::refresh_current_collection_from_callback_groups()
 {
+  // Build the new collection
   this->entities_collector_->update_collections();
   auto callback_groups = this->entities_collector_->get_all_callback_groups();
   rclcpp::executors::ExecutorEntitiesCollection new_collection;
@@ -402,6 +417,9 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
   // To do it, we need to add the notify waitable as an entry in both the new and
   // current collections such that it's neither added or removed.
   this->add_notify_waitable_to_collection(new_collection.waitables);
+
+  // Acquire lock before modifying the current collection
+  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
   this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
 
   this->refresh_current_collection(new_collection);
@@ -411,6 +429,9 @@ void
 EventsExecutor::refresh_current_collection(
   const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
 {
+  // Acquire lock before modifying the current collection
+  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+
   current_entities_collection_->timers.update(
     new_collection.timers,
     [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
```
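Also worth noting in the hunks above: each `case` copies the entity's `SharedPtr` inside a small scoped block, so the lock covers only the collection lookup and is released before the user callback runs. A minimal sketch of that pattern, again with illustrative stand-in types (`Entity` and this `retrieve_entity` are not the rclcpp API):

```cpp
#include <chrono>
#include <memory>
#include <mutex>
#include <thread>

std::recursive_mutex collection_mutex;

// Stand-in for an executor entity with a potentially slow callback.
struct Entity
{
  void execute() {std::this_thread::sleep_for(std::chrono::milliseconds(10));}
};

std::shared_ptr<Entity> retrieve_entity() {return std::make_shared<Entity>();}

void execute_event()
{
  std::shared_ptr<Entity> entity;
  {
    // The critical section covers only the lookup ...
    std::lock_guard<std::recursive_mutex> lock(collection_mutex);
    entity = retrieve_entity();
  }
  // ... while the slow callback runs unlocked; the copied shared_ptr
  // keeps the entity alive even if a concurrent refresh drops it
  // from the collection in the meantime.
  if (entity) {
    entity->execute();
  }
}

int main() {execute_event();}
```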

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 54 additions & 0 deletions
```diff
@@ -803,6 +803,60 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
   }
 }
 
+// This test verifies the thread-safety of adding and removing a node
+// while the executor is spinning and events are ready.
+// This test does not contain expectations, but rather it verifies that
+// we can run a "stressful routine" without crashing.
+TYPED_TEST(TestExecutors, stressAddRemoveNode)
+{
+  using ExecutorType = TypeParam;
+  // rmw_connextdds doesn't support events-executor
+  if (
+    std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
+    std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
+  {
+    GTEST_SKIP();
+  }
+
+  ExecutorType executor;
+
+  // A timer that is "always" ready (the timer callback doesn't do anything)
+  auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});
+
+  // This thread spins the executor until it's cancelled
+  std::thread spinner_thread([&]() {
+      executor.spin();
+    });
+
+  // This thread publishes data in a busy loop (the node has a subscription)
+  std::thread publisher_thread1([&]() {
+      for (size_t i = 0; i < 100000; i++) {
+        this->publisher->publish(test_msgs::msg::Empty());
+      }
+    });
+  std::thread publisher_thread2([&]() {
+      for (size_t i = 0; i < 100000; i++) {
+        this->publisher->publish(test_msgs::msg::Empty());
+      }
+    });
+
+  // This thread adds/remove the node that contains the entities in a busy loop
+  std::thread add_remove_thread([&]() {
+      for (size_t i = 0; i < 100000; i++) {
+        executor.add_node(this->node);
+        executor.remove_node(this->node);
+      }
+    });
+
+  // Wait for the threads that do real work to finish
+  publisher_thread1.join();
+  publisher_thread2.join();
+  add_remove_thread.join();
+
+  executor.cancel();
+  spinner_thread.join();
+}
+
 // Check spin_until_future_complete with node base pointer (instantiates its own executor)
 TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
 {
```
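One detail of the test's shutdown ordering: the worker threads are joined before `executor.cancel()` is called, so the spinner keeps processing events for the entire duration of the stress loops. A standalone sketch of that coordination, with a `std::atomic<bool>` standing in for `spin()`/`cancel()` (all names illustrative):

```cpp
#include <atomic>
#include <thread>
#include <vector>

std::atomic<bool> spinning{true};

int main()
{
  // Stand-in for executor.spin(): runs until "cancelled".
  std::thread spinner([] {while (spinning.load()) {std::this_thread::yield();}});

  // Stand-ins for the publisher and add/remove threads doing real work.
  std::vector<std::thread> workers;
  for (int t = 0; t < 3; t++) {
    workers.emplace_back([] {for (int i = 0; i < 100000; i++) {/* busy work */}});
  }

  // Join the workers first so the spinner stays active for the whole test,
  // then stop the spinner (the test calls executor.cancel() here).
  for (auto & w : workers) {w.join();}
  spinning.store(false);
  spinner.join();
}
```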
