Skip to content

Commit f27bdbf

Browse files
alsoraapojomovsky
authored andcommitted
avoid adding notify waitable twice to events-executor collection (ros2#2564)
* avoid adding notify waitable twice to events-executor entities collection Signed-off-by: Alberto Soragna <[email protected]> * remove redundant mutex lock Signed-off-by: Alberto Soragna <[email protected]> --------- Signed-off-by: Alberto Soragna <[email protected]>
1 parent 5929f64 commit f27bdbf

File tree

2 files changed

+64
-6
lines changed

2 files changed

+64
-6
lines changed

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -415,14 +415,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
415415
// We could explicitly check for the notify waitable ID when we receive a waitable event
416416
// but I think that it's better if the waitable was in the collection and it could be
417417
// retrieved in the "standard" way.
418-
// To do it, we need to add the notify waitable as an entry in both the new and
419-
// current collections such that it's neither added or removed.
418+
// To do it, we need to add the notify waitable as an entry in the new collection
419+
// such that it's neither added or removed (it should have already been added
420+
// to the current collection in the constructor)
420421
this->add_notify_waitable_to_collection(new_collection.waitables);
421422

422-
// Acquire lock before modifying the current collection
423-
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
424-
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
425-
426423
this->refresh_current_collection(new_collection);
427424
}
428425

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,67 @@ TYPED_TEST(TestExecutors, stressAddRemoveNode)
859859
spinner_thread.join();
860860
}
861861

862+
// Check that executors are correctly notified while they are spinning
863+
// we notify twice to ensure that the notify waitable is still working
864+
// after the first notification
865+
TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning)
866+
{
867+
using ExecutorType = TypeParam;
868+
869+
// Create executor, add the node and start spinning
870+
ExecutorType executor;
871+
executor.add_node(this->node);
872+
std::thread spinner([&]() {executor.spin();});
873+
874+
// Wait for executor to be spinning
875+
while (!executor.is_spinning()) {
876+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
877+
}
878+
879+
// Create the first subscription while the executor is already spinning
880+
std::atomic<size_t> sub1_msg_count {0};
881+
auto sub1 = this->node->template create_subscription<test_msgs::msg::Empty>(
882+
this->publisher->get_topic_name(),
883+
rclcpp::QoS(10),
884+
[&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
885+
sub1_msg_count++;
886+
});
887+
888+
// Publish a message and verify it's received
889+
this->publisher->publish(test_msgs::msg::Empty());
890+
auto start = std::chrono::steady_clock::now();
891+
while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) {
892+
std::this_thread::sleep_for(1ms);
893+
}
894+
EXPECT_EQ(sub1_msg_count, 1u);
895+
896+
// Create a second subscription while the executor is already spinning
897+
std::atomic<size_t> sub2_msg_count {0};
898+
auto sub2 = this->node->template create_subscription<test_msgs::msg::Empty>(
899+
this->publisher->get_topic_name(),
900+
rclcpp::QoS(10),
901+
[&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
902+
sub2_msg_count++;
903+
});
904+
905+
// Publish a message and verify it's received by both subscriptions
906+
this->publisher->publish(test_msgs::msg::Empty());
907+
start = std::chrono::steady_clock::now();
908+
while (
909+
sub1_msg_count == 1 &&
910+
sub2_msg_count == 0 &&
911+
(std::chrono::steady_clock::now() - start) < 10s)
912+
{
913+
std::this_thread::sleep_for(1ms);
914+
}
915+
EXPECT_EQ(sub1_msg_count, 2u);
916+
EXPECT_EQ(sub2_msg_count, 1u);
917+
918+
// Cancel needs to be called before join, so that executor.spin() returns.
919+
executor.cancel();
920+
spinner.join();
921+
}
922+
862923
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
863924
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
864925
{

0 commit comments

Comments
 (0)