Skip to content

Commit 7e90ae1

Browse files
mergify[bot]cagueroahcorde
authored
Use GID filtering to prevent loops (backport #834) (#844)
Signed-off-by: Alejandro Hernandez Cordero <ahcorde@gmail.com> Co-authored-by: Carlos Agüero <caguero@openrobotics.org> Co-authored-by: Alejandro Hernandez Cordero <ahcorde@gmail.com>
1 parent cbb69ea commit 7e90ae1

File tree

3 files changed

+215
-7
lines changed

3 files changed

+215
-7
lines changed

ros_gz_bridge/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,9 +346,15 @@ if(BUILD_TESTING)
346346
ament_add_gtest(${PROJECT_NAME}_bridge_config test/bridge_config.cpp
347347
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR})
348348
target_include_directories(${PROJECT_NAME}_bridge_config PRIVATE src)
349+
349350
ament_target_dependencies(${PROJECT_NAME}_bridge_config rclcpp)
350351
target_link_libraries(${PROJECT_NAME}_bridge_config ${bridge_lib})
351352

353+
ament_add_gtest(${PROJECT_NAME}_gid_filtering test/test_gid_filtering.cpp)
354+
target_link_libraries(${PROJECT_NAME}_gid_filtering
355+
rclcpp::rclcpp
356+
${std_msgs_TARGETS}
357+
)
352358
endif()
353359

354360
# Export old-style CMake variables

ros_gz_bridge/src/factory.hpp

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
#ifndef FACTORY_HPP_
1616
#define FACTORY_HPP_
1717

18+
#include <array>
1819
#include <chrono>
20+
#include <cstring>
1921
#include <functional>
2022
#include <memory>
2123
#include <string>
2224
#include <type_traits>
25+
#include <vector>
2326

2427
#include <gz/transport/Node.hh>
2528

@@ -93,14 +96,38 @@ class Factory : public FactoryInterface
9396
size_t queue_size,
9497
gz::transport::Node::Publisher & gz_pub)
9598
{
96-
std::function<void(std::shared_ptr<const ROS_T>)> fn = std::bind(
97-
&Factory<ROS_T, GZ_T>::ros_callback,
98-
std::placeholders::_1, gz_pub,
99-
ros_type_name_, gz_type_name_,
100-
ros_node);
99+
// Was this published by one of my own publishers? We compare
100+
// the sender's GID against the GIDs we collected from the bridge node's
101+
// publishers at subscription creation time. If it matches, we drop the
102+
// message to prevent a loop.
103+
auto self_pub_gids =
104+
std::make_shared<std::vector<std::array<uint8_t, RMW_GID_STORAGE_SIZE>>>();
105+
for (const auto & info : ros_node->get_publishers_info_by_topic(topic_name)) {
106+
if (info.node_name() == ros_node->get_name() &&
107+
info.node_namespace() == ros_node->get_namespace())
108+
{
109+
self_pub_gids->push_back(info.endpoint_gid());
110+
}
111+
}
112+
113+
auto ros_type = ros_type_name_;
114+
auto gz_type = gz_type_name_;
115+
std::function<void(std::shared_ptr<const ROS_T>, const rclcpp::MessageInfo &)> fn =
116+
[self_pub_gids, gz_pub, ros_type, gz_type, ros_node](
117+
std::shared_ptr<const ROS_T> ros_msg,
118+
const rclcpp::MessageInfo & msg_info) mutable
119+
{
120+
// Skip messages published by this bridge node to prevent loops.
121+
const auto & sender_gid = msg_info.get_rmw_message_info().publisher_gid;
122+
for (const auto & gid : *self_pub_gids) {
123+
if (std::memcmp(sender_gid.data, gid.data(), RMW_GID_STORAGE_SIZE) == 0) {
124+
return;
125+
}
126+
}
127+
ros_callback(ros_msg, gz_pub, ros_type, gz_type, ros_node);
128+
};
129+
101130
auto options = rclcpp::SubscriptionOptions();
102-
// Ignore messages that are published from this bridge.
103-
options.ignore_local_publications = true;
104131
// Allow QoS overriding
105132
options.qos_overriding_options =
106133
rclcpp::QosOverridingOptions::with_default_policies();
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// Copyright 2026 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <gtest/gtest.h>
16+
17+
#include <array>
18+
#include <atomic>
19+
#include <chrono>
20+
#include <cstring>
21+
#include <memory>
22+
#include <string>
23+
#include <thread>
24+
#include <vector>
25+
26+
#include <rclcpp/rclcpp.hpp>
27+
#include <std_msgs/msg/string.hpp>
28+
29+
using namespace std::chrono_literals;
30+
31+
class GidFilteringTest : public ::testing::Test
32+
{
33+
protected:
34+
void SetUp() override
35+
{
36+
rclcpp::init(0, nullptr);
37+
}
38+
39+
void TearDown() override
40+
{
41+
rclcpp::shutdown();
42+
}
43+
44+
/// Collect publisher GIDs for a given topic that belong to the specified node.
45+
/// This replicates the logic in factory.hpp create_ros_subscriber().
46+
static std::vector<std::array<uint8_t, RMW_GID_STORAGE_SIZE>>
47+
collect_self_publisher_gids(
48+
rclcpp::Node::SharedPtr node,
49+
const std::string & topic)
50+
{
51+
std::vector<std::array<uint8_t, RMW_GID_STORAGE_SIZE>> gids;
52+
for (const auto & info : node->get_publishers_info_by_topic(topic)) {
53+
if (info.node_name() == node->get_name() &&
54+
info.node_namespace() == node->get_namespace())
55+
{
56+
gids.push_back(info.endpoint_gid());
57+
}
58+
}
59+
return gids;
60+
}
61+
62+
/// Check if a sender GID matches any in a list of GIDs.
63+
/// This replicates the filtering logic in factory.hpp callback.
64+
static bool is_from_self(
65+
const rmw_gid_t & sender_gid,
66+
const std::vector<std::array<uint8_t, RMW_GID_STORAGE_SIZE>> & self_gids)
67+
{
68+
for (const auto & gid : self_gids) {
69+
if (std::memcmp(sender_gid.data, gid.data(), RMW_GID_STORAGE_SIZE) == 0) {
70+
return true;
71+
}
72+
}
73+
return false;
74+
}
75+
};
76+
77+
// Verify that GID collection finds publishers from the same node.
78+
TEST_F(GidFilteringTest, CollectsOwnPublisherGids)
79+
{
80+
auto node = std::make_shared<rclcpp::Node>("bridge_node");
81+
const std::string topic = "/test_gid_collect";
82+
83+
auto pub = node->create_publisher<std_msgs::msg::String>(topic, 10);
84+
85+
// Allow DDS discovery within the same process.
86+
std::this_thread::sleep_for(100ms);
87+
88+
auto gids = collect_self_publisher_gids(node, topic);
89+
EXPECT_EQ(1u, gids.size());
90+
}
91+
92+
// Verify that GID collection does not include publishers from other nodes,
93+
// even when they publish on the same topic in the same process.
94+
TEST_F(GidFilteringTest, DoesNotCollectExternalPublisherGids)
95+
{
96+
auto bridge_node = std::make_shared<rclcpp::Node>("bridge_node");
97+
auto external_node = std::make_shared<rclcpp::Node>("external_node");
98+
const std::string topic = "/test_gid_external";
99+
100+
auto bridge_pub = bridge_node->create_publisher<std_msgs::msg::String>(topic, 10);
101+
auto external_pub = external_node->create_publisher<std_msgs::msg::String>(topic, 10);
102+
(void)external_pub;
103+
104+
std::this_thread::sleep_for(100ms);
105+
106+
// Only bridge_node's publisher should be collected.
107+
auto self_gids = collect_self_publisher_gids(bridge_node, topic);
108+
EXPECT_EQ(1u, self_gids.size());
109+
110+
// But the topic has 2 publishers total.
111+
auto all_pubs = bridge_node->get_publishers_info_by_topic(topic);
112+
EXPECT_EQ(2u, all_pubs.size());
113+
}
114+
115+
// A subscriber using GID-based filtering should accept messages from an
116+
// external node while rejecting messages published by its own node.
117+
TEST_F(GidFilteringTest, FiltersOwnMessagesAllowsExternal)
118+
{
119+
auto bridge_node = std::make_shared<rclcpp::Node>("bridge_node");
120+
auto external_node = std::make_shared<rclcpp::Node>("external_node");
121+
const std::string topic = "/test_gid_filter";
122+
123+
// Simulate the bridge's own ROS publisher (created by GZ-to-ROS handle).
124+
auto self_pub = bridge_node->create_publisher<std_msgs::msg::String>(topic, 10);
125+
126+
// Simulate a composed node's publisher.
127+
auto ext_pub = external_node->create_publisher<std_msgs::msg::String>(topic, 10);
128+
129+
std::this_thread::sleep_for(100ms);
130+
131+
// Collect self GIDs.
132+
auto self_gids = std::make_shared<
133+
std::vector<std::array<uint8_t, RMW_GID_STORAGE_SIZE>>>(
134+
collect_self_publisher_gids(bridge_node, topic));
135+
ASSERT_EQ(1u, self_gids->size());
136+
137+
// Create subscriber with GID-based filtering.
138+
std::atomic<int> external_received{0};
139+
std::atomic<int> self_filtered{0};
140+
141+
auto sub = bridge_node->create_subscription<std_msgs::msg::String>(
142+
topic, 10,
143+
[self_gids, &external_received, &self_filtered](
144+
const std_msgs::msg::String & /*msg*/,
145+
const rclcpp::MessageInfo & msg_info)
146+
{
147+
const auto & sender_gid = msg_info.get_rmw_message_info().publisher_gid;
148+
if (is_from_self(sender_gid, *self_gids)) {
149+
self_filtered++;
150+
return;
151+
}
152+
external_received++;
153+
});
154+
155+
rclcpp::executors::SingleThreadedExecutor executor;
156+
executor.add_node(bridge_node);
157+
executor.add_node(external_node);
158+
159+
std_msgs::msg::String msg;
160+
161+
// Publish from both sources and spin to process callbacks.
162+
for (int i = 0; i < 20 && external_received == 0; ++i) {
163+
msg.data = "from_bridge";
164+
self_pub->publish(msg);
165+
msg.data = "from_external";
166+
ext_pub->publish(msg);
167+
std::this_thread::sleep_for(50ms);
168+
executor.spin_some();
169+
}
170+
171+
EXPECT_GT(external_received.load(), 0)
172+
<< "Messages from an external composed node must be received";
173+
EXPECT_GT(self_filtered.load(), 0)
174+
<< "Messages from the bridge's own publisher must be caught by the GID filter";
175+
}

0 commit comments

Comments
 (0)