@@ -422,6 +422,7 @@ class TestWaitable : public rclcpp::Waitable
422422 {
423423 for (size_t i = 0 ; i < wait_set->size_of_guard_conditions ; ++i) {
424424 if (&gc_.get_rcl_guard_condition () == wait_set->guard_conditions [i]) {
425+ is_ready_called_before_take_data = true ;
425426 return true ;
426427 }
427428 }
@@ -431,6 +432,12 @@ class TestWaitable : public rclcpp::Waitable
431432 std::shared_ptr<void >
432433 take_data () override
433434 {
435+ if (!is_ready_called_before_take_data) {
436+ throw std::runtime_error (
437+ " TestWaitable : Internal error, take data was called, but is_ready was not called before" );
438+ }
439+
440+ is_ready_called_before_take_data = false ;
434441 return nullptr ;
435442 }
436443
@@ -474,10 +481,12 @@ class TestWaitable : public rclcpp::Waitable
474481 }
475482
476483private:
484+ bool is_ready_called_before_take_data = false ;
477485 size_t count_ = 0 ;
478486 rclcpp::GuardCondition gc_;
479487};
480488
489+
481490TYPED_TEST (TestExecutors, spinAll)
482491{
483492 using ExecutorType = TypeParam;
@@ -520,6 +529,137 @@ TYPED_TEST(TestExecutors, spinAll)
520529 spinner.join ();
521530}
522531
532+ TEST (TestExecutorsOnlyNode, double_take_data)
533+ {
534+ rclcpp::init (0 , nullptr );
535+
536+ const auto test_info = ::testing::UnitTest::GetInstance ()->current_test_info ();
537+ std::stringstream test_name;
538+ test_name << test_info->test_case_name () << " _" << test_info->name ();
539+ rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(" node" , test_name.str ());
540+
541+ class MyExecutor : public rclcpp ::executors::SingleThreadedExecutor
542+ {
543+ public:
544+ /* *
545+ * This is a copy of Executor::get_next_executable with a callback, to test
546+ * for a special race condition
547+ */
548+ bool get_next_executable_with_callback (
549+ rclcpp::AnyExecutable & any_executable,
550+ std::chrono::nanoseconds timeout,
551+ std::function<void (void )> inbetween)
552+ {
553+ bool success = false ;
554+ // Check to see if there are any subscriptions or timers needing service
555+ // TODO(wjwwood): improve run to run efficiency of this function
556+ success = get_next_ready_executable (any_executable);
557+ // If there are none
558+ if (!success) {
559+
560+ inbetween ();
561+
562+ // Wait for subscriptions or timers to work on
563+ wait_for_work (timeout);
564+ if (!spinning.load ()) {
565+ return false ;
566+ }
567+ // Try again
568+ success = get_next_ready_executable (any_executable);
569+ }
570+ return success;
571+ }
572+
573+ void spin_once_with_callback (
574+ std::chrono::nanoseconds timeout,
575+ std::function<void (void )> inbetween)
576+ {
577+ rclcpp::AnyExecutable any_exec;
578+ if (get_next_executable_with_callback (any_exec, timeout, inbetween)) {
579+ execute_any_executable (any_exec);
580+ }
581+ }
582+
583+ };
584+
585+ MyExecutor executor;
586+
587+ auto callback_group = node->create_callback_group (
588+ rclcpp::CallbackGroupType::MutuallyExclusive,
589+ true );
590+
591+ std::vector<std::shared_ptr<TestWaitable>> waitables;
592+
593+ auto waitable_interfaces = node->get_node_waitables_interface ();
594+
595+ for (int i = 0 ; i < 3 ; i++) {
596+ auto waitable = std::make_shared<TestWaitable>();
597+ waitables.push_back (waitable);
598+ waitable_interfaces->add_waitable (waitable, callback_group);
599+ }
600+ executor.add_node (node);
601+
602+ for (auto & waitable : waitables) {
603+ waitable->trigger ();
604+ }
605+
606+ // a node has some default subscribers, that need to get executed first, therefore the loop
607+ for (int i = 0 ; i < 10 ; i++) {
608+ executor.spin_once (std::chrono::milliseconds (10 ));
609+ if (waitables.front ()->get_count () > 0 ) {
610+ // stop execution, after the first waitable has been executed
611+ break ;
612+ }
613+ }
614+
615+ EXPECT_EQ (waitables.front ()->get_count (), 1 );
616+
617+ // block the callback group, this is something that may happen during multi threaded execution
618+ // This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work
619+ callback_group->can_be_taken_from ().exchange (false );
620+
621+ bool no_ready_executable = false ;
622+
623+ // now there should be no ready events now,
624+ executor.spin_once_with_callback (
625+ std::chrono::milliseconds (10 ), [&]() {
626+ no_ready_executable = true ;
627+ });
628+
629+ EXPECT_TRUE (no_ready_executable);
630+
631+ // rearm, so that rmw_wait will push a second entry into the queue
632+ for (auto & waitable : waitables) {
633+ waitable->trigger ();
634+ }
635+
636+ no_ready_executable = false ;
637+
638+ while (!no_ready_executable) {
639+ executor.spin_once_with_callback (
640+ std::chrono::milliseconds (10 ), [&]() {
641+ // unblock the callback group
642+ callback_group->can_be_taken_from ().exchange (true );
643+
644+ no_ready_executable = true ;
645+
646+ });
647+ }
648+ EXPECT_TRUE (no_ready_executable);
649+
650+ // now we process all events from get_next_ready_executable
651+ EXPECT_NO_THROW (
652+ for (int i = 0 ; i < 10 ; i++) {
653+ executor.spin_once (std::chrono::milliseconds (1 ));
654+ }
655+ );
656+
657+ node.reset ();
658+
659+ rclcpp::shutdown ();
660+ }
661+
662+
523663TYPED_TEST (TestExecutorsOnlyNode, missing_event)
524664{
525665 using ExecutorType = TypeParam;
0 commit comments