@@ -273,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
273273 switch (event.type ) {
274274 case ExecutorEventType::CLIENT_EVENT:
275275 {
276- auto client = this ->retrieve_entity (
277- static_cast <const rcl_client_t *>(event.entity_key ),
278- current_entities_collection_->clients );
279-
276+ rclcpp::ClientBase::SharedPtr client;
277+ {
278+ std::lock_guard<std::recursive_mutex> lock (collection_mutex_);
279+ client = this ->retrieve_entity (
280+ static_cast <const rcl_client_t *>(event.entity_key ),
281+ current_entities_collection_->clients );
282+ }
280283 if (client) {
281284 for (size_t i = 0 ; i < event.num_events ; i++) {
282285 execute_client (client);
@@ -287,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
287290 }
288291 case ExecutorEventType::SUBSCRIPTION_EVENT:
289292 {
290- auto subscription = this ->retrieve_entity (
291- static_cast <const rcl_subscription_t *>(event.entity_key ),
292- current_entities_collection_->subscriptions );
293+ rclcpp::SubscriptionBase::SharedPtr subscription;
294+ {
295+ std::lock_guard<std::recursive_mutex> lock (collection_mutex_);
296+ subscription = this ->retrieve_entity (
297+ static_cast <const rcl_subscription_t *>(event.entity_key ),
298+ current_entities_collection_->subscriptions );
299+ }
293300 if (subscription) {
294301 for (size_t i = 0 ; i < event.num_events ; i++) {
295302 execute_subscription (subscription);
@@ -299,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
299306 }
300307 case ExecutorEventType::SERVICE_EVENT:
301308 {
302- auto service = this ->retrieve_entity (
303- static_cast <const rcl_service_t *>(event.entity_key ),
304- current_entities_collection_->services );
305-
309+ rclcpp::ServiceBase::SharedPtr service;
310+ {
311+ std::lock_guard<std::recursive_mutex> lock (collection_mutex_);
312+ service = this ->retrieve_entity (
313+ static_cast <const rcl_service_t *>(event.entity_key ),
314+ current_entities_collection_->services );
315+ }
306316 if (service) {
307317 for (size_t i = 0 ; i < event.num_events ; i++) {
308318 execute_service (service);
@@ -319,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
319329 }
320330 case ExecutorEventType::WAITABLE_EVENT:
321331 {
322- auto waitable = this ->retrieve_entity (
323- static_cast <const rclcpp::Waitable *>(event.entity_key ),
324- current_entities_collection_->waitables );
332+ rclcpp::Waitable::SharedPtr waitable;
333+ {
334+ std::lock_guard<std::recursive_mutex> lock (collection_mutex_);
335+ waitable = this ->retrieve_entity (
336+ static_cast <const rclcpp::Waitable *>(event.entity_key ),
337+ current_entities_collection_->waitables );
338+ }
325339 if (waitable) {
326340 for (size_t i = 0 ; i < event.num_events ; i++) {
327341 auto data = waitable->take_data_by_entity_id (event.waitable_data );
@@ -386,6 +400,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
386400void
387401EventsExecutor::refresh_current_collection_from_callback_groups ()
388402{
403+ // Build the new collection
389404 this ->entities_collector_ ->update_collections ();
390405 auto callback_groups = this ->entities_collector_ ->get_all_callback_groups ();
391406 rclcpp::executors::ExecutorEntitiesCollection new_collection;
@@ -400,6 +415,9 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
400415 // To do it, we need to add the notify waitable as an entry in both the new and
401416 // current collections such that it's neither added or removed.
402417 this ->add_notify_waitable_to_collection (new_collection.waitables );
418+
419+ // Acquire lock before modifying the current collection
420+ std::lock_guard<std::recursive_mutex> lock (collection_mutex_);
403421 this ->add_notify_waitable_to_collection (current_entities_collection_->waitables );
404422
405423 this ->refresh_current_collection (new_collection);
409427EventsExecutor::refresh_current_collection (
410428 const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
411429{
430+ // Acquire lock before modifying the current collection
431+ std::lock_guard<std::recursive_mutex> lock (collection_mutex_);
432+
412433 current_entities_collection_->timers .update (
413434 new_collection.timers ,
414435 [this ](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer (timer);},
0 commit comments