diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 653f06fb9c..591d4a5633 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -41,6 +41,32 @@ using namespace std::chrono_literals; + +template +class TestExecutorsOnlyNode : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + node = std::make_shared("node", test_name.str()); + + } + + void TearDown() + { + node.reset(); + + rclcpp::shutdown(); + } + + rclcpp::Node::SharedPtr node; +}; + template class TestExecutors : public ::testing::Test { @@ -122,6 +148,8 @@ class ExecutorTypeNames // is updated. TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames); +TYPED_TEST_SUITE(TestExecutorsOnlyNode, ExecutorTypes, ExecutorTypeNames); + // StaticSingleThreadedExecutor is not included in these tests for now, due to: // https://github.com/ros2/rclcpp/issues/1219 using StandardExecutors = @@ -382,23 +410,54 @@ class TestWaitable : public rclcpp::Waitable add_to_wait_set(rcl_wait_set_t * wait_set) override { rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); + if (retrigger_guard_condition && num_unprocessed_triggers > 0) { + gc_.trigger(); + } } void trigger() { + num_unprocessed_triggers++; gc_.trigger(); } + void trigger_and_hold_execute() + { + hold_execute = true; + + trigger(); + } + + void release_execute() + { + hold_execute = false; + cv.notify_one(); + } + bool is_ready(rcl_wait_set_t * wait_set) override { - (void)wait_set; - return true; + is_ready_called_before_take_data = true; + for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) { + if (&gc_.get_rcl_guard_condition() == wait_set->guard_conditions[i]) { + return true; + } + } + return false; } std::shared_ptr take_data() override { + if (!is_ready_called_before_take_data) { + throw std::runtime_error( + "TestWaitable : Internal error, take data was called, but is_ready was not called before"); + } + + is_ready_called_before_take_data = false; + + num_unprocessed_triggers--; + return nullptr; } @@ -406,7 +465,7 @@ class TestWaitable : public rclcpp::Waitable take_data_by_entity_id(size_t id) override { (void) id; - return nullptr; + return take_data(); } void @@ -415,6 +474,19 @@ class TestWaitable : public rclcpp::Waitable (void) data; count_++; std::this_thread::sleep_for(3ms); + try { + std::lock_guard lock(execute_promise_mutex_); + execute_promise_.set_value(); + } catch (const std::future_error & future_error) { + if (future_error.code() != std::future_errc::promise_already_satisfied) { + throw; + } + } + + if (hold_execute) { + std::unique_lock lk(cv_m); + cv.wait(lk); + } } void @@ -441,11 +513,34 @@ class TestWaitable : public rclcpp::Waitable return count_; } + std::future + reset_execute_promise_and_get_future() + { + std::lock_guard lock(execute_promise_mutex_); + execute_promise_ = std::promise(); + return execute_promise_.get_future(); + } + + void enable_retriggering(bool enabled) + { + retrigger_guard_condition = enabled; + } + private: + bool is_ready_called_before_take_data = false; + bool retrigger_guard_condition = true; + std::promise execute_promise_; + std::mutex execute_promise_mutex_; + std::atomic num_unprocessed_triggers = 0; + std::atomic hold_execute = false; size_t count_ = 0; + std::condition_variable cv; + std::mutex cv_m; + rclcpp::GuardCondition gc_; }; + TYPED_TEST(TestExecutors, spinAll) { using ExecutorType = TypeParam; @@ -458,9 +553,16 @@ TYPED_TEST(TestExecutors, spinAll) // Long timeout, but should not block test if spin_all works as expected as we cancel the // executor. bool spin_exited = false; - std::thread spinner([&spin_exited, &executor, this]() { - executor.spin_all(1s); - executor.remove_node(this->node, true); + std::atomic_bool exception = false; + + std::thread spinner([&spin_exited, &executor, &exception, this]() { + try { + executor.spin_all(1s); + executor.remove_node(this->node, true); + } catch (...) { + exception = true; + } + spin_exited = true; }); @@ -485,9 +587,278 @@ TYPED_TEST(TestExecutors, spinAll) EXPECT_LT(1u, my_waitable->get_count()); waitable_interfaces->remove_waitable(my_waitable, nullptr); ASSERT_TRUE(spin_exited); + + EXPECT_FALSE(exception); spinner.join(); } +TEST(TestExecutors, double_take_data) +{ + rclcpp::init(0, nullptr); + + rclcpp::executors::MultiThreadedExecutor executor; + + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + rclcpp::Node::SharedPtr node = std::make_shared("node", test_name.str()); + + auto waitable_interfaces = node->get_node_waitables_interface(); + + auto first_cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + auto third_cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + // these waitable have one job, to make the MemoryStrategy::collect_enties method take + // a long time, in order to force our race condition + std::vector> stuffing_waitables; + std::vector> stuffing_cbgs; + + for (size_t i = 0; i < 50; i++) { + auto cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + stuffing_cbgs.push_back(cbg); + + for (int j = 0; j < 200; j++) { + auto waitable = std::make_shared(); + stuffing_waitables.push_back(waitable); + waitable_interfaces->add_waitable(waitable, cbg); + } + } + + // this is the callback group were wo introduce the double take + auto callback_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + + std::vector> waitables; + + auto w3 = std::make_shared(); + waitable_interfaces->add_waitable(w3, third_cbg); + + // First group of waitables, that gets processed. + // We use the shared count of these waitables and the callback group, + // to estimate when MemoryStrategy::collect_enties is called in the spinner thread + std::vector> first_waitables; + auto non_triggered_in_first_cbg = std::make_shared(); + waitable_interfaces->add_waitable(non_triggered_in_first_cbg, first_cbg); + + auto non_triggered_in_first_cbg2 = std::make_shared(); + waitable_interfaces->add_waitable(non_triggered_in_first_cbg2, first_cbg); + + + auto cbg_start = std::make_shared(); + waitable_interfaces->add_waitable(cbg_start, callback_group); + + // These waitables will get triggered while cbg_start is beeing executed + for (int i = 0; i < 20; i++) { + auto waitable = std::make_shared(); + waitables.push_back(waitable); + waitable_interfaces->add_waitable(waitable, callback_group); + } + + // used to detect if all triggers were processed + auto cbg_end = std::make_shared(); + waitable_interfaces->add_waitable(cbg_end, callback_group); + + executor.add_node(node); + + // ref count if not reference by the internals of the executor + auto min_ref_cnt = non_triggered_in_first_cbg.use_count(); + auto cbg_min_ref_cnt = first_cbg.use_count(); + + for (auto & w : waitables) { + EXPECT_EQ(w->get_count(), 0); + } + + std::atomic_bool exception = false; + + std::thread t([&executor, &exception]() { + try { + executor.spin(); + } catch (const std::exception & e) { + exception = true; + } catch (...) { + exception = true; + } + }); + + size_t start_count = cbg_start->get_count(); + cbg_start->trigger_and_hold_execute(); + + //wait until the first waitable is executed and blocks the callback_group + while (cbg_start->get_count() == start_count) { + std::this_thread::sleep_for(1ms); + } + + for (auto & w : waitables) { + w->trigger(); + } + + // trigger w3 to make sure, the MemoryStrategy clears its internal list of ready entities + { + auto cnt = w3->get_count(); + w3->trigger(); + while (w3->get_count() == cnt) { + std::this_thread::sleep_for(1ms); + } + } + + // observe the use counts of non_triggered_in_first_cbg, non_triggered_in_first_cbg2 and first_cbg + // in order to fugure out if MemoryStrategy::collect_enties is beeing calles + while (true) { + w3->trigger(); + bool restart = false; + + // There should be no reference to our waitiable + while (min_ref_cnt != non_triggered_in_first_cbg.use_count() || + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + } + + // wait for the callback group to be taken + while (true) { + // node and callback group ptrs are referenced + if (cbg_min_ref_cnt != first_cbg.use_count()) { + break; + } + + // is we got more references to the waitable, while the group pointer is not referenced, this is the wrong spot + if (min_ref_cnt != non_triggered_in_first_cbg.use_count() || + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + restart = true; + break; + } + } + + if (restart) { + continue; + } + + // callback group pointer is referenced + while (true) { + // trigger criteria, both pointers were collected + if (min_ref_cnt != non_triggered_in_first_cbg.use_count() && + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + break; + } + + // invalid, second pointer is referenced, but not first one + if (min_ref_cnt == non_triggered_in_first_cbg.use_count() && + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + restart = true; + break; + } + + // group or node pointer was released, while waitable poitner were not taken. + if (cbg_min_ref_cnt == first_cbg.use_count()) { + restart = true; + break; + } + + } + if (restart) { + continue; + } + + break; + } + + //we unblock the callback_group now, this should force the race condition + cbg_start->release_execute(); + + std::this_thread::yield(); + std::this_thread::sleep_for(10ms); + + size_t end_count = cbg_end->get_count(); + cbg_end->trigger(); + + // wait for all triggers to be executed, or for an exception to occur + while (end_count == cbg_end->get_count() && !exception) { + std::this_thread::sleep_for(10ms); + } + + EXPECT_FALSE(exception); + + node.reset(); + rclcpp::shutdown(); + + t.join(); +} + +TYPED_TEST(TestExecutorsOnlyNode, missing_event) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + rclcpp::Node::SharedPtr node(this->node); + auto callback_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + false); + + std::chrono::seconds max_spin_duration(2); + auto waitable_interfaces = node->get_node_waitables_interface(); + auto my_waitable = std::make_shared(); + auto my_waitable2 = std::make_shared(); + waitable_interfaces->add_waitable(my_waitable, callback_group); + waitable_interfaces->add_waitable(my_waitable2, callback_group); + executor.add_callback_group(callback_group, node->get_node_base_interface()); + + my_waitable->trigger(); + my_waitable2->trigger(); + + { + auto my_waitable_execute_future = my_waitable->reset_execute_promise_and_get_future(); + executor.spin_until_future_complete(my_waitable_execute_future, max_spin_duration); + } + + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(0u, my_waitable2->get_count()); + + // block the callback group, this is something that may happen during multi threaded execution + // This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work + callback_group->can_be_taken_from().exchange(false); + + // now there should be no ready event + { + auto my_waitable2_execute_future = my_waitable2->reset_execute_promise_and_get_future(); + auto future_code = executor.spin_until_future_complete( + my_waitable2_execute_future, + std::chrono::milliseconds(100)); // expected to timeout + EXPECT_EQ(future_code, rclcpp::FutureReturnCode::TIMEOUT); + } + + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(0u, my_waitable2->get_count()); + + // unblock the callback group + callback_group->can_be_taken_from().exchange(true); + + // now the second waitable should get processed + { + auto my_waitable2_execute_future = my_waitable2->reset_execute_promise_and_get_future(); + executor.spin_until_future_complete(my_waitable2_execute_future, max_spin_duration); + } + + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(1u, my_waitable2->get_count()); + //now the second waitable should get processed + executor.spin_once(std::chrono::milliseconds(10)); + + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(1u, my_waitable2->get_count()); +} + TYPED_TEST(TestExecutors, spinSome) { using ExecutorType = TypeParam; @@ -500,9 +871,16 @@ TYPED_TEST(TestExecutors, spinSome) // Long timeout, doesn't block test from finishing because spin_some should exit after the // first one completes. bool spin_exited = false; - std::thread spinner([&spin_exited, &executor, this]() { - executor.spin_some(1s); - executor.remove_node(this->node, true); + + std::atomic_bool exception = false; + + std::thread spinner([&spin_exited, &executor, &exception, this]() { + try { + executor.spin_some(1s); + executor.remove_node(this->node, true); + } catch (...) { + exception = true; + } spin_exited = true; }); @@ -523,6 +901,7 @@ TYPED_TEST(TestExecutors, spinSome) EXPECT_LE(1u, my_waitable->get_count()); waitable_interfaces->remove_waitable(my_waitable, nullptr); EXPECT_TRUE(spin_exited); + EXPECT_FALSE(exception); // Cancel if it hasn't exited already. executor.cancel();