From eb39a3a190de0e12d7b589cf315017c1f403d798 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Thu, 16 Nov 2023 10:56:45 +0000 Subject: [PATCH 1/7] fix(test_executors): Fix is_ready of TestWaitable Signed-off-by: Janosch Machowinski --- rclcpp/test/rclcpp/executors/test_executors.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 653f06fb9c..78ad00a613 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -392,6 +392,12 @@ class TestWaitable : public rclcpp::Waitable bool is_ready(rcl_wait_set_t * wait_set) override { + for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) { + if (&gc_.get_rcl_guard_condition() == wait_set->guard_conditions[i]) { + return true; + } + } + return false; (void)wait_set; return true; } From 09cad7224c64de7202d5fc06e475db07c0011937 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Thu, 16 Nov 2023 10:57:49 +0000 Subject: [PATCH 2/7] test: Added test for missed trigger event during execution Signed-off-by: Janosch Machowinski --- .../test/rclcpp/executors/test_executors.cpp | 82 ++++++++++++++++++- 1 file changed, 80 insertions(+), 2 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 78ad00a613..8414984ecd 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -41,6 +41,32 @@ using namespace std::chrono_literals; + +template +class TestExecutorsOnlyNode : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + node = std::make_shared("node", test_name.str()); + + } + + void TearDown() + { + node.reset(); + + rclcpp::shutdown(); + } + + rclcpp::Node::SharedPtr node; +}; + template class TestExecutors : public ::testing::Test { @@ -122,6 +148,8 @@ class ExecutorTypeNames // is updated. TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames); +TYPED_TEST_SUITE(TestExecutorsOnlyNode, ExecutorTypes, ExecutorTypeNames); + // StaticSingleThreadedExecutor is not included in these tests for now, due to: // https://github.com/ros2/rclcpp/issues/1219 using StandardExecutors = @@ -398,8 +426,6 @@ class TestWaitable : public rclcpp::Waitable } } return false; - (void)wait_set; - return true; } std::shared_ptr @@ -494,6 +520,58 @@ TYPED_TEST(TestExecutors, spinAll) spinner.join(); } +TYPED_TEST(TestExecutorsOnlyNode, missing_event) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + rclcpp::Node::SharedPtr node(this->node); + auto callback_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + auto waitable_interfaces = node->get_node_waitables_interface(); + auto my_waitable = std::make_shared(); + auto my_waitable2 = std::make_shared(); + waitable_interfaces->add_waitable(my_waitable, callback_group); + waitable_interfaces->add_waitable(my_waitable2, callback_group); + executor.add_node(this->node); + + my_waitable->trigger(); + my_waitable2->trigger(); + + // a node has some default subscribers, that need to get executed first, therefore the loop + for (int i = 0; i < 10; i++) { + executor.spin_once(std::chrono::milliseconds(10)); + if (my_waitable->get_count() > 0) { + // stop execution, after the first waitable has been executed + break; + } + } + + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(0u, my_waitable2->get_count()); + + // block the callback group, this is something that may happen during multi threaded execution + // This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work + callback_group->can_be_taken_from().exchange(false); + + //now there should be no ready event + executor.spin_once(std::chrono::milliseconds(10)); + + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(0u, my_waitable2->get_count()); + + //unblock the callback group + callback_group->can_be_taken_from().exchange(true); + + //now the second waitable should get processed + executor.spin_once(std::chrono::milliseconds(10)); + + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(1u, my_waitable2->get_count()); +} + TYPED_TEST(TestExecutors, spinSome) { using ExecutorType = TypeParam; From 58718f1f8181cfb17c4597df6345a4b63ec43975 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Fri, 17 Nov 2023 14:31:21 +0000 Subject: [PATCH 3/7] test: Added test for waitable::take_data double call without is_ready Signed-off-by: Janosch Machowinski --- .../test/rclcpp/executors/test_executors.cpp | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 8414984ecd..85586b90c0 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -422,6 +422,7 @@ class TestWaitable : public rclcpp::Waitable { for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) { if (&gc_.get_rcl_guard_condition() == wait_set->guard_conditions[i]) { + is_ready_called_before_take_data = true; return true; } } @@ -431,6 +432,12 @@ class TestWaitable : public rclcpp::Waitable std::shared_ptr take_data() override { + if (!is_ready_called_before_take_data) { + throw std::runtime_error( + "TestWaitable : Internal error, take data was called, but is_ready was not called before"); + } + + is_ready_called_before_take_data = false; return nullptr; } @@ -474,10 +481,12 @@ class TestWaitable : public rclcpp::Waitable } private: + bool is_ready_called_before_take_data = false; size_t count_ = 0; rclcpp::GuardCondition gc_; }; + TYPED_TEST(TestExecutors, spinAll) { using ExecutorType = TypeParam; @@ -520,6 +529,137 @@ TYPED_TEST(TestExecutors, spinAll) spinner.join(); } +TEST(TestExecutorsOnlyNode, double_take_data) +{ + rclcpp::init(0, nullptr); + + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + rclcpp::Node::SharedPtr node = std::make_shared("node", test_name.str()); + + class MyExecutor : public rclcpp::executors::SingleThreadedExecutor + { +public: + /** + * This is a copy of Executor::get_next_executable with a callback, to test + * for a special race condition + */ + bool get_next_executable_with_callback( + rclcpp::AnyExecutable & any_executable, + std::chrono::nanoseconds timeout, + std::function inbetween) + { + bool success = false; + // Check to see if there are any subscriptions or timers needing service + // TODO(wjwwood): improve run to run efficiency of this function + success = get_next_ready_executable(any_executable); + // If there are none + if (!success) { + + inbetween(); + + // Wait for subscriptions or timers to work on + wait_for_work(timeout); + if (!spinning.load()) { + return false; + } + // Try again + success = get_next_ready_executable(any_executable); + } + return success; + } + + void spin_once_with_callback( + std::chrono::nanoseconds timeout, + std::function inbetween) + { + rclcpp::AnyExecutable any_exec; + if (get_next_executable_with_callback(any_exec, timeout, inbetween)) { + execute_any_executable(any_exec); + } + } + + }; + + MyExecutor executor; + + auto callback_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + std::vector> waitables; + + auto waitable_interfaces = node->get_node_waitables_interface(); + + for (int i = 0; i < 3; i++) { + auto waitable = std::make_shared(); + waitables.push_back(waitable); + waitable_interfaces->add_waitable(waitable, callback_group); + } + executor.add_node(node); + + for (auto & waitable : waitables) { + waitable->trigger(); + } + + // a node has some default subscribers, that need to get executed first, therefore the loop + for (int i = 0; i < 10; i++) { + executor.spin_once(std::chrono::milliseconds(10)); + if (waitables.front()->get_count() > 0) { + // stop execution, after the first waitable has been executed + break; + } + } + + EXPECT_EQ(waitables.front()->get_count(), 1); + + // block the callback group, this is something that may happen during multi threaded execution + // This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work + callback_group->can_be_taken_from().exchange(false); + + bool no_ready_executable = false; + + //now there should be no ready events now, + executor.spin_once_with_callback( + std::chrono::milliseconds(10), [&]() { + no_ready_executable = true; + }); + + EXPECT_TRUE(no_ready_executable); + + //rearm, so that rmw_wait will push a second entry into the queue + for (auto & waitable : waitables) { + waitable->trigger(); + } + + no_ready_executable = false; + + while (!no_ready_executable) { + executor.spin_once_with_callback( + std::chrono::milliseconds(10), [&]() { + //unblock the callback group + callback_group->can_be_taken_from().exchange(true); + + no_ready_executable = true; + + }); + } + EXPECT_TRUE(no_ready_executable); + + // now we process all events from get_next_ready_executable + EXPECT_NO_THROW( + for (int i = 0; i < 10; i++) { + executor.spin_once(std::chrono::milliseconds(1)); + } + ); + + node.reset(); + + rclcpp::shutdown(); +} + + TYPED_TEST(TestExecutorsOnlyNode, missing_event) { using ExecutorType = TypeParam; From a4ff2fe15f3e3323e00b7bb984dc878ac7a11c06 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Wed, 29 Nov 2023 21:34:16 -0600 Subject: [PATCH 4/7] simplify the missing_event test to not spin on the node itself Signed-off-by: William Woodall --- .../test/rclcpp/executors/test_executors.cpp | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 85586b90c0..89cba84b90 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -668,26 +668,19 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) rclcpp::Node::SharedPtr node(this->node); auto callback_group = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, - true); + false); auto waitable_interfaces = node->get_node_waitables_interface(); auto my_waitable = std::make_shared(); auto my_waitable2 = std::make_shared(); waitable_interfaces->add_waitable(my_waitable, callback_group); waitable_interfaces->add_waitable(my_waitable2, callback_group); - executor.add_node(this->node); + executor.add_callback_group(callback_group, node->get_node_base_interface()); my_waitable->trigger(); my_waitable2->trigger(); - // a node has some default subscribers, that need to get executed first, therefore the loop - for (int i = 0; i < 10; i++) { - executor.spin_once(std::chrono::milliseconds(10)); - if (my_waitable->get_count() > 0) { - // stop execution, after the first waitable has been executed - break; - } - } + executor.spin_once(std::chrono::milliseconds(10)); EXPECT_EQ(1u, my_waitable->get_count()); EXPECT_EQ(0u, my_waitable2->get_count()); @@ -696,16 +689,16 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) // This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work callback_group->can_be_taken_from().exchange(false); - //now there should be no ready event + // now there should be no ready event executor.spin_once(std::chrono::milliseconds(10)); EXPECT_EQ(1u, my_waitable->get_count()); EXPECT_EQ(0u, my_waitable2->get_count()); - //unblock the callback group + // unblock the callback group callback_group->can_be_taken_from().exchange(true); - //now the second waitable should get processed + // now the second waitable should get processed executor.spin_once(std::chrono::milliseconds(10)); EXPECT_EQ(1u, my_waitable->get_count()); From 8c68c71d647782c105475e55b6fdad5a06eac0bd Mon Sep 17 00:00:00 2001 From: William Woodall Date: Wed, 29 Nov 2023 23:46:49 -0600 Subject: [PATCH 5/7] use futures to make the test rebust to spurious spins Signed-off-by: William Woodall --- .../test/rclcpp/executors/test_executors.cpp | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 89cba84b90..5b3206b131 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -454,6 +454,14 @@ class TestWaitable : public rclcpp::Waitable (void) data; count_++; std::this_thread::sleep_for(3ms); + try { + std::lock_guard lock(execute_promise_mutex_); + execute_promise_.set_value(); + } catch (const std::future_error & future_error) { + if (future_error.code() != std::future_errc::promise_already_satisfied) { + throw; + } + } } void @@ -480,8 +488,18 @@ class TestWaitable : public rclcpp::Waitable return count_; } + std::future + reset_execute_promise_and_get_future() + { + std::lock_guard lock(execute_promise_mutex_); + execute_promise_ = std::promise(); + return execute_promise_.get_future(); + } + private: bool is_ready_called_before_take_data = false; + std::promise execute_promise_; + std::mutex execute_promise_mutex_; size_t count_ = 0; rclcpp::GuardCondition gc_; }; @@ -670,6 +688,7 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) rclcpp::CallbackGroupType::MutuallyExclusive, false); + std::chrono::seconds max_spin_duration(2); auto waitable_interfaces = node->get_node_waitables_interface(); auto my_waitable = std::make_shared(); auto my_waitable2 = std::make_shared(); @@ -680,7 +699,10 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) my_waitable->trigger(); my_waitable2->trigger(); - executor.spin_once(std::chrono::milliseconds(10)); + { + auto my_waitable_execute_future = my_waitable->reset_execute_promise_and_get_future(); + executor.spin_until_future_complete(my_waitable_execute_future, max_spin_duration); + } EXPECT_EQ(1u, my_waitable->get_count()); EXPECT_EQ(0u, my_waitable2->get_count()); @@ -690,7 +712,13 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) callback_group->can_be_taken_from().exchange(false); // now there should be no ready event - executor.spin_once(std::chrono::milliseconds(10)); + { + auto my_waitable2_execute_future = my_waitable2->reset_execute_promise_and_get_future(); + auto future_code = executor.spin_until_future_complete( + my_waitable2_execute_future, + std::chrono::milliseconds(100)); // expected to timeout + EXPECT_EQ(future_code, rclcpp::FutureReturnCode::TIMEOUT); + } EXPECT_EQ(1u, my_waitable->get_count()); EXPECT_EQ(0u, my_waitable2->get_count()); @@ -699,7 +727,10 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) callback_group->can_be_taken_from().exchange(true); // now the second waitable should get processed - executor.spin_once(std::chrono::milliseconds(10)); + { + auto my_waitable2_execute_future = my_waitable2->reset_execute_promise_and_get_future(); + executor.spin_until_future_complete(my_waitable2_execute_future, max_spin_duration); + } EXPECT_EQ(1u, my_waitable->get_count()); EXPECT_EQ(1u, my_waitable2->get_count()); From b5eb517a970ce5f6ec1090eb8f6920598adfe8f2 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 24 Jan 2024 13:36:41 +0000 Subject: [PATCH 6/7] Made double_take_data test work without intrusive behavior Signed-off-by: Janosch Machowinski --- .../test/rclcpp/executors/test_executors.cpp | 287 ++++++++++++------ 1 file changed, 201 insertions(+), 86 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 5b3206b131..f607414789 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -410,19 +410,36 @@ class TestWaitable : public rclcpp::Waitable add_to_wait_set(rcl_wait_set_t * wait_set) override { rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); + if (retrigger_guard_condition && num_unprocessed_triggers > 0) { + gc_.trigger(); + } } void trigger() { + num_unprocessed_triggers++; gc_.trigger(); } + void trigger_and_hold_execute() + { + hold_execute = true; + + trigger(); + } + + void release_execute() + { + hold_execute = false; + cv.notify_one(); + } + bool is_ready(rcl_wait_set_t * wait_set) override { + is_ready_called_before_take_data = true; for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) { if (&gc_.get_rcl_guard_condition() == wait_set->guard_conditions[i]) { - is_ready_called_before_take_data = true; return true; } } @@ -438,6 +455,9 @@ class TestWaitable : public rclcpp::Waitable } is_ready_called_before_take_data = false; + + num_unprocessed_triggers--; + return nullptr; } @@ -445,7 +465,7 @@ class TestWaitable : public rclcpp::Waitable take_data_by_entity_id(size_t id) override { (void) id; - return nullptr; + return take_data(); } void @@ -462,6 +482,12 @@ class TestWaitable : public rclcpp::Waitable throw; } } + + if(hold_execute) + { + std::unique_lock lk(cv_m); + cv.wait(lk); + } } void @@ -496,11 +522,22 @@ class TestWaitable : public rclcpp::Waitable return execute_promise_.get_future(); } + void enable_retriggering(bool enabled) + { + retrigger_guard_condition = enabled; + } + private: bool is_ready_called_before_take_data = false; + bool retrigger_guard_condition = true; std::promise execute_promise_; std::mutex execute_promise_mutex_; + std::atomic num_unprocessed_triggers = 0; + std::atomic hold_execute = false; size_t count_ = 0; + std::condition_variable cv; + std::mutex cv_m; + rclcpp::GuardCondition gc_; }; @@ -547,136 +584,209 @@ TYPED_TEST(TestExecutors, spinAll) spinner.join(); } -TEST(TestExecutorsOnlyNode, double_take_data) +TEST(TestExecutors, double_take_data) { rclcpp::init(0, nullptr); + rclcpp::executors::MultiThreadedExecutor executor; + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); std::stringstream test_name; test_name << test_info->test_case_name() << "_" << test_info->name(); rclcpp::Node::SharedPtr node = std::make_shared("node", test_name.str()); - class MyExecutor : public rclcpp::executors::SingleThreadedExecutor - { -public: - /** - * This is a copy of Executor::get_next_executable with a callback, to test - * for a special race condition - */ - bool get_next_executable_with_callback( - rclcpp::AnyExecutable & any_executable, - std::chrono::nanoseconds timeout, - std::function inbetween) - { - bool success = false; - // Check to see if there are any subscriptions or timers needing service - // TODO(wjwwood): improve run to run efficiency of this function - success = get_next_ready_executable(any_executable); - // If there are none - if (!success) { - - inbetween(); - - // Wait for subscriptions or timers to work on - wait_for_work(timeout); - if (!spinning.load()) { - return false; - } - // Try again - success = get_next_ready_executable(any_executable); - } - return success; - } + auto waitable_interfaces = node->get_node_waitables_interface(); - void spin_once_with_callback( - std::chrono::nanoseconds timeout, - std::function inbetween) - { - rclcpp::AnyExecutable any_exec; - if (get_next_executable_with_callback(any_exec, timeout, inbetween)) { - execute_any_executable(any_exec); - } - } + auto first_cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); - }; + auto third_cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); - MyExecutor executor; + // these waitable have one job, to make the MemoryStrategy::collect_enties method take + // a long time, in order to force our race condition + std::vector> stuffing_waitables; + std::vector> stuffing_cbgs; + for (size_t i = 0; i < 50; i++) { + auto cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + stuffing_cbgs.push_back(cbg); + + for (int j = 0; j < 200; j++) { + auto waitable = std::make_shared(); + stuffing_waitables.push_back(waitable); + waitable_interfaces->add_waitable(waitable, cbg); + } + } + + // this is the callback group were wo introduce the double take auto callback_group = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, true); + std::vector> waitables; - auto waitable_interfaces = node->get_node_waitables_interface(); + auto w3 = std::make_shared(); + waitable_interfaces->add_waitable(w3, third_cbg); + + // First group of waitables, that gets processed. + // We use the shared count of these waitables and the callback group, + // to estimate when MemoryStrategy::collect_enties is called in the spinner thread + std::vector> first_waitables; + auto non_triggered_in_first_cbg = std::make_shared(); + waitable_interfaces->add_waitable(non_triggered_in_first_cbg, first_cbg); + + auto non_triggered_in_first_cbg2 = std::make_shared(); + waitable_interfaces->add_waitable(non_triggered_in_first_cbg2, first_cbg); + - for (int i = 0; i < 3; i++) { + auto cbg_start = std::make_shared(); + waitable_interfaces->add_waitable(cbg_start, callback_group); + + // These waitables will get triggered while cbg_start is beeing executed + for (int i = 0; i < 20; i++) { auto waitable = std::make_shared(); waitables.push_back(waitable); waitable_interfaces->add_waitable(waitable, callback_group); } + + // used to detect if all triggers were processed + auto cbg_end = std::make_shared(); + waitable_interfaces->add_waitable(cbg_end, callback_group); + executor.add_node(node); - for (auto & waitable : waitables) { - waitable->trigger(); + // ref count if not reference by the internals of the executor + auto min_ref_cnt = non_triggered_in_first_cbg.use_count(); + auto cbg_min_ref_cnt = first_cbg.use_count(); + + for (auto & w : waitables) { + EXPECT_EQ(w->get_count(), 0); } - // a node has some default subscribers, that need to get executed first, therefore the loop - for (int i = 0; i < 10; i++) { - executor.spin_once(std::chrono::milliseconds(10)); - if (waitables.front()->get_count() > 0) { - // stop execution, after the first waitable has been executed - break; + std::atomic_bool exception = false; + + std::thread t([&executor, &exception]() { + try { + executor.spin(); + } catch (const std::exception & e) { + exception = true; + } catch (...) { + exception = true; + } + }); + + size_t start_count = cbg_start->get_count(); + cbg_start->trigger_and_hold_execute(); + + //wait until the first waitable is executed and blocks the callback_group + while (cbg_start->get_count() == start_count) { + std::this_thread::sleep_for(1ms); + } + + for (auto & w : waitables) { + w->trigger(); + } + + // trigger w3 to make sure, the MemoryStrategy clears its internal list of ready entities + { + auto cnt = w3->get_count(); + w3->trigger(); + while (w3->get_count() == cnt) { + std::this_thread::sleep_for(1ms); } } - EXPECT_EQ(waitables.front()->get_count(), 1); + // observe the use counts of non_triggered_in_first_cbg, non_triggered_in_first_cbg2 and first_cbg + // in order to fugure out if MemoryStrategy::collect_enties is beeing calles + while (true) { + w3->trigger(); + bool restart = false; - // block the callback group, this is something that may happen during multi threaded execution - // This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work - callback_group->can_be_taken_from().exchange(false); + // There should be no reference to our waitiable + while (min_ref_cnt != non_triggered_in_first_cbg.use_count() || + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + } - bool no_ready_executable = false; + // wait for the callback group to be taken + while (true) { + // node and callback group ptrs are referenced + if (cbg_min_ref_cnt != first_cbg.use_count()) { + break; + } - //now there should be no ready events now, - executor.spin_once_with_callback( - std::chrono::milliseconds(10), [&]() { - no_ready_executable = true; - }); + // is we got more references to the waitable, while the group pointer is not referenced, this is the wrong spot + if (min_ref_cnt != non_triggered_in_first_cbg.use_count() || + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + restart = true; + break; + } + } - EXPECT_TRUE(no_ready_executable); + if (restart) { + continue; + } - //rearm, so that rmw_wait will push a second entry into the queue - for (auto & waitable : waitables) { - waitable->trigger(); - } + // callback group pointer is referenced + while (true) { + // trigger criteria, both pointers were collected + if (min_ref_cnt != non_triggered_in_first_cbg.use_count() && + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + break; + } - no_ready_executable = false; + // invalid, second pointer is referenced, but not first one + if (min_ref_cnt == non_triggered_in_first_cbg.use_count() && + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + restart = true; + break; + } - while (!no_ready_executable) { - executor.spin_once_with_callback( - std::chrono::milliseconds(10), [&]() { - //unblock the callback group - callback_group->can_be_taken_from().exchange(true); + // group or node pointer was released, while waitable poitner were not taken. + if (cbg_min_ref_cnt == first_cbg.use_count()) { + restart = true; + break; + } - no_ready_executable = true; + } + if (restart) { + continue; + } - }); + break; } - EXPECT_TRUE(no_ready_executable); - // now we process all events from get_next_ready_executable - EXPECT_NO_THROW( - for (int i = 0; i < 10; i++) { - executor.spin_once(std::chrono::milliseconds(1)); + //we unblock the callback_group now, this should force the race condition + cbg_start->release_execute(); + + std::this_thread::yield(); + std::this_thread::sleep_for(10ms); + + size_t end_count = cbg_end->get_count(); + cbg_end->trigger(); + + // wait for all triggers to be executed, or for an exception to occur + while (end_count == cbg_end->get_count() && !exception) { + std::this_thread::sleep_for(10ms); } - ); - node.reset(); + EXPECT_FALSE(exception); + node.reset(); rclcpp::shutdown(); -} + t.join(); +} TYPED_TEST(TestExecutorsOnlyNode, missing_event) { @@ -732,6 +842,11 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) executor.spin_until_future_complete(my_waitable2_execute_future, max_spin_duration); } + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(1u, my_waitable2->get_count()); + //now the second waitable should get processed + executor.spin_once(std::chrono::milliseconds(10)); + EXPECT_EQ(1u, my_waitable->get_count()); EXPECT_EQ(1u, my_waitable2->get_count()); } From 3ceaa9b92fa1fdb92e3bb11cc23b89b376e8e33f Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 24 Jan 2024 14:53:07 +0000 Subject: [PATCH 7/7] fix: catch exceptions from failed tests Signed-off-by: Janosch Machowinski --- .../test/rclcpp/executors/test_executors.cpp | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index f607414789..591d4a5633 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -483,8 +483,7 @@ class TestWaitable : public rclcpp::Waitable } } - if(hold_execute) - { + if (hold_execute) { std::unique_lock lk(cv_m); cv.wait(lk); } @@ -554,9 +553,16 @@ TYPED_TEST(TestExecutors, spinAll) // Long timeout, but should not block test if spin_all works as expected as we cancel the // executor. bool spin_exited = false; - std::thread spinner([&spin_exited, &executor, this]() { - executor.spin_all(1s); - executor.remove_node(this->node, true); + std::atomic_bool exception = false; + + std::thread spinner([&spin_exited, &executor, &exception, this]() { + try { + executor.spin_all(1s); + executor.remove_node(this->node, true); + } catch (...) { + exception = true; + } + spin_exited = true; }); @@ -581,6 +587,8 @@ TYPED_TEST(TestExecutors, spinAll) EXPECT_LT(1u, my_waitable->get_count()); waitable_interfaces->remove_waitable(my_waitable, nullptr); ASSERT_TRUE(spin_exited); + + EXPECT_FALSE(exception); spinner.join(); } @@ -863,9 +871,16 @@ TYPED_TEST(TestExecutors, spinSome) // Long timeout, doesn't block test from finishing because spin_some should exit after the // first one completes. bool spin_exited = false; - std::thread spinner([&spin_exited, &executor, this]() { - executor.spin_some(1s); - executor.remove_node(this->node, true); + + std::atomic_bool exception = false; + + std::thread spinner([&spin_exited, &executor, &exception, this]() { + try { + executor.spin_some(1s); + executor.remove_node(this->node, true); + } catch (...) { + exception = true; + } spin_exited = true; }); @@ -886,6 +901,7 @@ TYPED_TEST(TestExecutors, spinSome) EXPECT_LE(1u, my_waitable->get_count()); waitable_interfaces->remove_waitable(my_waitable, nullptr); EXPECT_TRUE(spin_exited); + EXPECT_FALSE(exception); // Cancel if it hasn't exited already. executor.cancel();