diff --git a/rclcpp/include/rclcpp/parameter_event_handler.hpp b/rclcpp/include/rclcpp/parameter_event_handler.hpp index 589fb1144c..3be4b9268d 100644 --- a/rclcpp/include/rclcpp/parameter_event_handler.hpp +++ b/rclcpp/include/rclcpp/parameter_event_handler.hpp @@ -251,6 +251,30 @@ class ParameterEventHandler ParameterCallbackType callback, const std::string & node_name = ""); + /// Configure which node parameter events will be received. + /** + * This function depends on middleware support for content filtering. + * If middleware doesn't support contentfilter, return false. + * + * If node_names is empty, the configured node filter will be cleared. + * + * If this function return true, only parameter events from the specified node will be received. + * It affects the behavior of the following two functions. + * - add_parameter_event_callback() + * The callback will only be called for parameter events from the specified nodes which are + * configured in this function. + * - add_parameter_callback() + * The callback will only be called for parameter events from the specified nodes which are + * configured in this function and add_parameter_callback(). + * If the nodes specified in this function is different from the nodes specified in + * add_parameter_callback(), the callback will never be called. + * + * \param[in] node_names Node names to filter parameter events from. + * \returns true if configuring was successfully applied, false otherwise. + */ + RCLCPP_PUBLIC + bool configure_nodes_filter(const std::vector & node_names); + /// Remove a parameter callback registered with add_parameter_callback. /** * The parameter name and node name are inspected from the callback handle. The callback handle diff --git a/rclcpp/src/rclcpp/parameter_event_handler.cpp b/rclcpp/src/rclcpp/parameter_event_handler.cpp index b1b36b663e..83ec5ccd63 100644 --- a/rclcpp/src/rclcpp/parameter_event_handler.cpp +++ b/rclcpp/src/rclcpp/parameter_event_handler.cpp @@ -74,6 +74,35 @@ ParameterEventHandler::add_parameter_callback( return handle; } +bool +ParameterEventHandler::configure_nodes_filter(const std::vector & node_names) +{ + if (node_names.empty()) { + // Clear content filter + event_subscription_->set_content_filter(""); + return true; + } + + std::string filter_expression; + size_t total = node_names.size(); + for (size_t i = 0; i < total; ++i) { + filter_expression += "node = %" + std::to_string(i); + if (i < total - 1) { + filter_expression += " OR "; + } + } + + // Enclose each node name in "'". + std::vector quoted_node_names; + for (const auto & name : node_names) { + quoted_node_names.push_back("'" + resolve_path(name) + "'"); + } + + event_subscription_->set_content_filter(filter_expression, quoted_node_names); + + return event_subscription_->is_cft_enabled(); +} + void ParameterEventHandler::remove_parameter_callback( ParameterCallbackHandle::SharedPtr callback_handle) diff --git a/rclcpp/test/rclcpp/test_parameter_event_handler.cpp b/rclcpp/test/rclcpp/test_parameter_event_handler.cpp index 0cf4a7a445..3e14632c35 100644 --- a/rclcpp/test/rclcpp/test_parameter_event_handler.cpp +++ b/rclcpp/test/rclcpp/test_parameter_event_handler.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -20,6 +21,8 @@ #include "gtest/gtest.h" #include "rclcpp/rclcpp.hpp" +using namespace std::chrono_literals; + class TestParameterEventHandler : public rclcpp::ParameterEventHandler { public: @@ -443,3 +446,199 @@ TEST_F(TestNode, LastInFirstCallForParameterEventCallbacks) param_handler->remove_parameter_event_callback(h2); EXPECT_EQ(param_handler->num_event_callbacks(), 0UL); } + +TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterEventCallback) +{ + std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier()); + if (rmw_implementation_str != "rmw_fastrtps_cpp" && rmw_implementation_str != "rmw_connextdds") { + GTEST_SKIP() << "Content filter is now only supported in FastDDS and ConnextDDS."; + } + + const std::string remote_node_name1 = "remote_node1"; + const std::string remote_node_name2 = "remote_node2"; + rclcpp::NodeOptions options; + options.enable_rosout(false); + auto remote_node1 = std::make_shared( + remote_node_name1, options); + auto remote_node2 = std::make_shared( + remote_node_name2, options); + + const std::string remote_node1_param_name = "param_node1"; + const std::string remote_node2_param_name = "param_node2"; + remote_node1->declare_parameter(remote_node1_param_name, 10); + remote_node2->declare_parameter(remote_node2_param_name, "Default"); + + std::atomic_bool received_from_remote_node1{false}; + std::atomic_bool received_from_remote_node2{false}; + + auto cb = + [&remote_node_name1, &remote_node_name2, &received_from_remote_node1, + &received_from_remote_node2] + (const rcl_interfaces::msg::ParameterEvent & parm) { + if (parm.node == "/" + remote_node_name1) { + received_from_remote_node1 = true; + } else if (parm.node == "/" + remote_node_name2) { + received_from_remote_node2 = true; + } + }; + + // Configure to only receive parameter events from remote_node_name2 + ASSERT_TRUE(param_handler->configure_nodes_filter({remote_node_name2})); + + auto handler = param_handler->add_parameter_event_callback(cb); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + executor.add_node(remote_node1); + executor.add_node(remote_node2); + + auto wait_param_event = [&executor] + (std::chrono::seconds timeout, std::function condition = nullptr) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + executor.spin_some(); + if (condition && condition()) { + break; + } + std::this_thread::sleep_for(100ms); + } + }; + + { + std::thread thread(wait_param_event, 1s, nullptr); + std::this_thread::sleep_for(100ms); + remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20)); + remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc")); + thread.join(); + } + + EXPECT_EQ(received_from_remote_node1, false); + EXPECT_EQ(received_from_remote_node2, true); + + // Clear node filter and all parameter events from remote nodes should be received + ASSERT_TRUE(param_handler->configure_nodes_filter({})); + + received_from_remote_node1 = false; + received_from_remote_node2 = false; + + { + std::thread thread( + wait_param_event, + 2s, + [&received_from_remote_node1, &received_from_remote_node2]() { + if (received_from_remote_node1 && received_from_remote_node2) { + return true; + } + return false; + }); + std::this_thread::sleep_for(100ms); + remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 30)); + remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "def")); + thread.join(); + } + + EXPECT_EQ(received_from_remote_node1, true); + EXPECT_EQ(received_from_remote_node2, true); +} + +TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterCallback) +{ + std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier()); + if (rmw_implementation_str != "rmw_fastrtps_cpp" && rmw_implementation_str != "rmw_connextdds") { + GTEST_SKIP() << "Content filter is now only supported in FastDDS and ConnextDDS."; + } + + const std::string remote_node_name1 = "remote_node1"; + const std::string remote_node_name2 = "remote_node2"; + rclcpp::NodeOptions options; + options.enable_rosout(false); + auto remote_node1 = std::make_shared( + remote_node_name1, options); + auto remote_node2 = std::make_shared( + remote_node_name2, options); + + const std::string remote_node1_param_name = "param_node1"; + const std::string remote_node2_param_name = "param_node2"; + remote_node1->declare_parameter(remote_node1_param_name, 10); + remote_node2->declare_parameter(remote_node2_param_name, "Default"); + + std::atomic_bool received_from_remote_node1{false}; + std::atomic_bool received_from_remote_node2{false}; + + auto cb_remote_node1 = + [&remote_node1_param_name, &received_from_remote_node1] + (const rclcpp::Parameter & parm) { + if (parm.get_name() == remote_node1_param_name) { + received_from_remote_node1 = true; + } + }; + + auto cb_remote_node2 = + [&remote_node2_param_name, &received_from_remote_node2] + (const rclcpp::Parameter & parm) { + if (parm.get_name() == remote_node2_param_name) { + received_from_remote_node2 = true; + } + }; + + // Configure to only receive parameter events from remote_node_name2 + ASSERT_TRUE(param_handler->configure_nodes_filter({remote_node_name2})); + + auto handler1 = param_handler->add_parameter_callback( + remote_node1_param_name, cb_remote_node1, remote_node_name1); + auto handler2 = param_handler->add_parameter_callback( + remote_node2_param_name, cb_remote_node2, remote_node_name2); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + executor.add_node(remote_node1); + executor.add_node(remote_node2); + + auto wait_param_event = [&executor] + (std::chrono::seconds timeout, std::function condition = nullptr) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + executor.spin_some(); + if (condition && condition()) { + break; + } + std::this_thread::sleep_for(100ms); + } + }; + + { + std::thread thread(wait_param_event, 1s, nullptr); + std::this_thread::sleep_for(100ms); + remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20)); + remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc")); + thread.join(); + } + + EXPECT_EQ(received_from_remote_node1, false); + EXPECT_EQ(received_from_remote_node2, true); + + // Clear node filter and all parameter events from remote nodes should be received + ASSERT_TRUE(param_handler->configure_nodes_filter({})); + + received_from_remote_node1 = false; + received_from_remote_node2 = false; + + { + std::thread thread( + wait_param_event, + 2s, + [&received_from_remote_node1, &received_from_remote_node2]() { + if (received_from_remote_node1 && received_from_remote_node2) { + return true; + } + return false; + }); + std::this_thread::sleep_for(100ms); + remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 30)); + remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "def")); + thread.join(); + } + + EXPECT_EQ(received_from_remote_node1, true); + EXPECT_EQ(received_from_remote_node2, true); +}