@@ -593,3 +593,106 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) {
593593
594594 rclcpp::shutdown ();
595595}
596+
597+ template <typename T>
598+ class TestIntraprocessExecutors : public ::testing::Test
599+ {
600+ public:
601+ static void SetUpTestCase ()
602+ {
603+ rclcpp::init (0 , nullptr );
604+ }
605+
606+ static void TearDownTestCase ()
607+ {
608+ rclcpp::shutdown ();
609+ }
610+
611+ void SetUp ()
612+ {
613+ const auto test_info = ::testing::UnitTest::GetInstance ()->current_test_info ();
614+ std::stringstream test_name;
615+ test_name << test_info->test_case_name () << " _" << test_info->name ();
616+ node = std::make_shared<rclcpp::Node>(" node" , test_name.str ());
617+
618+ callback_count = 0 ;
619+
620+ const std::string topic_name = std::string (" topic_" ) + test_name.str ();
621+
622+ rclcpp::PublisherOptions po;
623+ po.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
624+ publisher = node->create_publisher <test_msgs::msg::Empty>(topic_name, rclcpp::QoS (1 ), po);
625+
626+ auto callback = [this ](test_msgs::msg::Empty::ConstSharedPtr) {
627+ this ->callback_count .fetch_add (1 );
628+ };
629+
630+ rclcpp::SubscriptionOptions so;
631+ so.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
632+ subscription =
633+ node->create_subscription <test_msgs::msg::Empty>(
634+ topic_name, rclcpp::QoS (kNumMessages ), std::move (callback), so);
635+ }
636+
637+ void TearDown ()
638+ {
639+ publisher.reset ();
640+ subscription.reset ();
641+ node.reset ();
642+ }
643+
644+ const size_t kNumMessages = 100 ;
645+
646+ rclcpp::Node::SharedPtr node;
647+ rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr publisher;
648+ rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr subscription;
649+ std::atomic_int callback_count;
650+ };
651+
652+ TYPED_TEST_SUITE (TestIntraprocessExecutors, ExecutorTypes, ExecutorTypeNames);
653+
654+ TYPED_TEST (TestIntraprocessExecutors, testIntraprocessRetrigger) {
655+ // This tests that executors will continue to service intraprocess subscriptions in the case
656+ // that publishers aren't continuing to publish.
657+ // This was previously broken in that intraprocess guard conditions were only triggered on
658+ // publish and the test was added to prevent future regressions.
659+ const size_t kNumMessages = 100 ;
660+
661+ using ExecutorType = TypeParam;
662+ ExecutorType executor;
663+ executor.add_node (this ->node );
664+
665+ EXPECT_EQ (0 , this ->callback_count .load ());
666+ this ->publisher ->publish (test_msgs::msg::Empty ());
667+
668+ // Wait for up to 5 seconds for the first message to come available.
669+ const std::chrono::milliseconds sleep_per_loop (10 );
670+ int loops = 0 ;
671+ while (1u != this ->callback_count .load () && loops < 500 ) {
672+ rclcpp::sleep_for (sleep_per_loop);
673+ executor.spin_some ();
674+ loops++;
675+ }
676+ EXPECT_EQ (1u , this ->callback_count .load ());
677+
678+ // reset counter
679+ this ->callback_count .store (0 );
680+
681+ for (size_t ii = 0 ; ii < kNumMessages ; ++ii) {
682+ this ->publisher ->publish (test_msgs::msg::Empty ());
683+ }
684+
685+ // Fire a timer every 10ms up to 5 seconds waiting for subscriptions to be read.
686+ loops = 0 ;
687+ auto timer = this ->node ->create_wall_timer (
688+ std::chrono::milliseconds (10 ), [this , &executor, &loops, &kNumMessages ]() {
689+ loops++;
690+ if (kNumMessages == this ->callback_count .load () ||
691+ loops == 500 )
692+ {
693+ executor.cancel ();
694+ }
695+ });
696+ executor.spin ();
697+ EXPECT_EQ (kNumMessages , this ->callback_count .load ());
698+ }
0 commit comments