Skip to content

Commit 8de4b90

Browse files
authored
avoid adding notify waitable twice to events-executor collection (#2564)
* avoid adding notify waitable twice to events-executor entities collection Signed-off-by: Alberto Soragna <[email protected]> * remove redundant mutex lock Signed-off-by: Alberto Soragna <[email protected]> --------- Signed-off-by: Alberto Soragna <[email protected]>
1 parent 8230d15 commit 8de4b90

File tree

2 files changed

+64
-6
lines changed

2 files changed

+64
-6
lines changed

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -415,14 +415,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
415415
// We could explicitly check for the notify waitable ID when we receive a waitable event
416416
// but I think that it's better if the waitable was in the collection and it could be
417417
// retrieved in the "standard" way.
418-
// To do it, we need to add the notify waitable as an entry in both the new and
419-
// current collections such that it's neither added or removed.
418+
// To do it, we need to add the notify waitable as an entry in the new collection
419+
// such that it's neither added or removed (it should have already been added
420+
// to the current collection in the constructor)
420421
this->add_notify_waitable_to_collection(new_collection.waitables);
421422

422-
// Acquire lock before modifying the current collection
423-
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
424-
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
425-
426423
this->refresh_current_collection(new_collection);
427424
}
428425

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,67 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
807807
}
808808
}
809809

810+
// Check that executors are correctly notified while they are spinning
811+
// we notify twice to ensure that the notify waitable is still working
812+
// after the first notification
813+
TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning)
814+
{
815+
using ExecutorType = TypeParam;
816+
817+
// Create executor, add the node and start spinning
818+
ExecutorType executor;
819+
executor.add_node(this->node);
820+
std::thread spinner([&]() {executor.spin();});
821+
822+
// Wait for executor to be spinning
823+
while (!executor.is_spinning()) {
824+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
825+
}
826+
827+
// Create the first subscription while the executor is already spinning
828+
std::atomic<size_t> sub1_msg_count {0};
829+
auto sub1 = this->node->template create_subscription<test_msgs::msg::Empty>(
830+
this->publisher->get_topic_name(),
831+
rclcpp::QoS(10),
832+
[&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
833+
sub1_msg_count++;
834+
});
835+
836+
// Publish a message and verify it's received
837+
this->publisher->publish(test_msgs::msg::Empty());
838+
auto start = std::chrono::steady_clock::now();
839+
while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) {
840+
std::this_thread::sleep_for(1ms);
841+
}
842+
EXPECT_EQ(sub1_msg_count, 1u);
843+
844+
// Create a second subscription while the executor is already spinning
845+
std::atomic<size_t> sub2_msg_count {0};
846+
auto sub2 = this->node->template create_subscription<test_msgs::msg::Empty>(
847+
this->publisher->get_topic_name(),
848+
rclcpp::QoS(10),
849+
[&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
850+
sub2_msg_count++;
851+
});
852+
853+
// Publish a message and verify it's received by both subscriptions
854+
this->publisher->publish(test_msgs::msg::Empty());
855+
start = std::chrono::steady_clock::now();
856+
while (
857+
sub1_msg_count == 1 &&
858+
sub2_msg_count == 0 &&
859+
(std::chrono::steady_clock::now() - start) < 10s)
860+
{
861+
std::this_thread::sleep_for(1ms);
862+
}
863+
EXPECT_EQ(sub1_msg_count, 2u);
864+
EXPECT_EQ(sub2_msg_count, 1u);
865+
866+
// Cancel needs to be called before join, so that executor.spin() returns.
867+
executor.cancel();
868+
spinner.join();
869+
}
870+
810871
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
811872
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
812873
{

0 commit comments

Comments
 (0)