Skip to content

Commit ba5a1c0

Browse files
alsoraskyegalaxy
authored andcommitted
avoid adding notify waitable twice to events-executor collection (ros2#2564)
* avoid adding notify waitable twice to events-executor entities collection Signed-off-by: Alberto Soragna <[email protected]> * remove redundant mutex lock Signed-off-by: Alberto Soragna <[email protected]> --------- Signed-off-by: Alberto Soragna <[email protected]> (cherry picked from commit f27bdbf)
1 parent bda1276 commit ba5a1c0

File tree

2 files changed

+64
-6
lines changed

2 files changed

+64
-6
lines changed

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,14 +432,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
432432
// We could explicitly check for the notify waitable ID when we receive a waitable event
433433
// but I think that it's better if the waitable was in the collection and it could be
434434
// retrieved in the "standard" way.
435-
// To do it, we need to add the notify waitable as an entry in both the new and
436-
// current collections such that it's neither added or removed.
435+
// To do it, we need to add the notify waitable as an entry in the new collection
436+
// such that it's neither added or removed (it should have already been added
437+
// to the current collection in the constructor)
437438
this->add_notify_waitable_to_collection(new_collection.waitables);
438439

439-
// Acquire lock before modifying the current collection
440-
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
441-
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
442-
443440
this->refresh_current_collection(new_collection);
444441
}
445442

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,67 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
694694
}
695695
}
696696

697+
// Check that executors are correctly notified while they are spinning
698+
// we notify twice to ensure that the notify waitable is still working
699+
// after the first notification
700+
TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning)
701+
{
702+
using ExecutorType = TypeParam;
703+
704+
// Create executor, add the node and start spinning
705+
ExecutorType executor;
706+
executor.add_node(this->node);
707+
std::thread spinner([&]() {executor.spin();});
708+
709+
// Wait for executor to be spinning
710+
while (!executor.is_spinning()) {
711+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
712+
}
713+
714+
// Create the first subscription while the executor is already spinning
715+
std::atomic<size_t> sub1_msg_count {0};
716+
auto sub1 = this->node->template create_subscription<test_msgs::msg::Empty>(
717+
this->publisher->get_topic_name(),
718+
rclcpp::QoS(10),
719+
[&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
720+
sub1_msg_count++;
721+
});
722+
723+
// Publish a message and verify it's received
724+
this->publisher->publish(test_msgs::msg::Empty());
725+
auto start = std::chrono::steady_clock::now();
726+
while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) {
727+
std::this_thread::sleep_for(1ms);
728+
}
729+
EXPECT_EQ(sub1_msg_count, 1u);
730+
731+
// Create a second subscription while the executor is already spinning
732+
std::atomic<size_t> sub2_msg_count {0};
733+
auto sub2 = this->node->template create_subscription<test_msgs::msg::Empty>(
734+
this->publisher->get_topic_name(),
735+
rclcpp::QoS(10),
736+
[&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
737+
sub2_msg_count++;
738+
});
739+
740+
// Publish a message and verify it's received by both subscriptions
741+
this->publisher->publish(test_msgs::msg::Empty());
742+
start = std::chrono::steady_clock::now();
743+
while (
744+
sub1_msg_count == 1 &&
745+
sub2_msg_count == 0 &&
746+
(std::chrono::steady_clock::now() - start) < 10s)
747+
{
748+
std::this_thread::sleep_for(1ms);
749+
}
750+
EXPECT_EQ(sub1_msg_count, 2u);
751+
EXPECT_EQ(sub2_msg_count, 1u);
752+
753+
// Cancel needs to be called before join, so that executor.spin() returns.
754+
executor.cancel();
755+
spinner.join();
756+
}
757+
697758
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
698759
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
699760
{

0 commit comments

Comments
 (0)