Skip to content

Commit 6e1fea1

Browse files
authored
Fix race condition in events-executor (#2177)
The initial implementation of the events-executor contained a bug where the executor would end up in an inconsistent state and stop processing interrupt/shutdown notifications. Manually adding a node to the executor results in a) producing a notify waitable event and b) refreshing the executor collections. The inconsistent state would happen if the event was processed before the collections were finished to be refreshed: the executor would pick up the event but be unable to process it. This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional notify waitable events to be pushed. The behavior is observable only under heavy load. Signed-off-by: Alberto Soragna <[email protected]>
1 parent 86c7714 commit 6e1fea1

File tree

3 files changed

+103
-24
lines changed

3 files changed

+103
-24
lines changed

rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,11 @@ class EventsExecutor : public rclcpp::Executor
243243
std::function<void(size_t, int)>
244244
create_waitable_callback(const rclcpp::Waitable * waitable_id);
245245

246+
/// Utility to add the notify waitable to an entities collection
247+
void
248+
add_notify_waitable_to_collection(
249+
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection);
250+
246251
/// Searches for the provided entity_id in the collection and returns the entity if valid
247252
template<typename CollectionType>
248253
typename CollectionType::EntitySharedPtr

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
5050
timers_manager_ =
5151
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
5252

53+
this->current_entities_collection_ =
54+
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
55+
5356
notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
5457
[this]() {
5558
// This callback is invoked when:
@@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
6164
this->refresh_current_collection_from_callback_groups();
6265
});
6366

67+
// Make sure that the notify waitable is immediately added to the collection
68+
// to avoid missing events
69+
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
70+
6471
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
6572
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
6673

@@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(
8794

8895
this->entities_collector_ =
8996
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);
90-
91-
this->current_entities_collection_ =
92-
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
9397
}
9498

9599
EventsExecutor::~EventsExecutor()
@@ -395,18 +399,8 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
395399
// retrieved in the "standard" way.
396400
// To do it, we need to add the notify waitable as an entry in both the new and
397401
// current collections such that it's neither added or removed.
398-
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
399-
new_collection.waitables.insert(
400-
{
401-
this->notify_waitable_.get(),
402-
{this->notify_waitable_, weak_group_ptr}
403-
});
404-
405-
this->current_entities_collection_->waitables.insert(
406-
{
407-
this->notify_waitable_.get(),
408-
{this->notify_waitable_, weak_group_ptr}
409-
});
402+
this->add_notify_waitable_to_collection(new_collection.waitables);
403+
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
410404

411405
this->refresh_current_collection(new_collection);
412406
}
@@ -486,3 +480,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
486480
};
487481
return callback;
488482
}
483+
484+
void
485+
EventsExecutor::add_notify_waitable_to_collection(
486+
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection)
487+
{
488+
// The notify waitable is not associated to any group, so use an invalid one
489+
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
490+
collection.insert(
491+
{
492+
this->notify_waitable_.get(),
493+
{this->notify_waitable_, weak_group_ptr}
494+
});
495+
}

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
#include <gtest/gtest.h>
2121

2222
#include <algorithm>
23+
#include <atomic>
2324
#include <chrono>
2425
#include <limits>
2526
#include <memory>
2627
#include <string>
2728
#include <thread>
2829
#include <utility>
30+
#include <vector>
2931

3032
#include "rcl/error_handling.h"
3133
#include "rcl/time.h"
@@ -43,18 +45,10 @@ template<typename T>
4345
class TestExecutors : public ::testing::Test
4446
{
4547
public:
46-
static void SetUpTestCase()
48+
void SetUp()
4749
{
4850
rclcpp::init(0, nullptr);
49-
}
50-
51-
static void TearDownTestCase()
52-
{
53-
rclcpp::shutdown();
54-
}
5551

56-
void SetUp()
57-
{
5852
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
5953
std::stringstream test_name;
6054
test_name << test_info->test_case_name() << "_" << test_info->name();
@@ -75,6 +69,8 @@ class TestExecutors : public ::testing::Test
7569
publisher.reset();
7670
subscription.reset();
7771
node.reset();
72+
73+
rclcpp::shutdown();
7874
}
7975

8076
rclcpp::Node::SharedPtr node;
@@ -729,6 +725,77 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
729725
spinner.join();
730726
}
731727

728+
// This test verifies that the add_node operation is robust wrt race conditions.
729+
// It's mostly meant to prevent regressions in the events-executor, but the operation should be
730+
// thread-safe in all executor implementations.
731+
// The initial implementation of the events-executor contained a bug where the executor
732+
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
733+
// Manually adding a node to the executor results in a) producing a notify waitable event
734+
// and b) refreshing the executor collections.
735+
// The inconsistent state would happen if the event was processed before the collections were
736+
// finished to be refreshed: the executor would pick up the event but be unable to process it.
737+
// This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
738+
// notify waitable events to be pushed.
739+
// The behavior is observable only under heavy load, so this test spawns several worker
740+
// threads. Due to the nature of the bug, this test may still succeed even if the
741+
// bug is present. However repeated runs will show its flakiness nature and indicate
742+
// an eventual regression.
743+
TYPED_TEST(TestExecutors, testRaceConditionAddNode)
744+
{
745+
using ExecutorType = TypeParam;
746+
// rmw_connextdds doesn't support events-executor
747+
if (
748+
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
749+
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
750+
{
751+
GTEST_SKIP();
752+
}
753+
754+
// Spawn some threads to do some heavy work
755+
std::atomic<bool> should_cancel = false;
756+
std::vector<std::thread> stress_threads;
757+
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
758+
stress_threads.emplace_back(
759+
[&should_cancel, i]() {
760+
// This is just some arbitrary heavy work
761+
volatile size_t total = 0;
762+
for (size_t k = 0; k < 549528914167; k++) {
763+
if (should_cancel) {
764+
break;
765+
}
766+
total += k * (i + 42);
767+
}
768+
});
769+
}
770+
771+
// Create an executor
772+
auto executor = std::make_shared<ExecutorType>();
773+
// Start spinning
774+
auto executor_thread = std::thread(
775+
[executor]() {
776+
executor->spin();
777+
});
778+
// Add a node to the executor
779+
executor->add_node(this->node);
780+
781+
// Cancel the executor (make sure that it's already spinning first)
782+
while (!executor->is_spinning() && rclcpp::ok()) {
783+
continue;
784+
}
785+
executor->cancel();
786+
787+
// Try to join the thread after cancelling the executor
788+
// This is the "test". We want to make sure that we can still cancel the executor
789+
// regardless of the presence of race conditions
790+
executor_thread.join();
791+
792+
// The test is now completed: we can join the stress threads
793+
should_cancel = true;
794+
for (auto & t : stress_threads) {
795+
t.join();
796+
}
797+
}
798+
732799
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
733800
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
734801
{

0 commit comments

Comments
 (0)