Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions rclcpp/include/rclcpp/parameter_event_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,30 @@ class ParameterEventHandler
ParameterCallbackType callback,
const std::string & node_name = "");

/// Configure which node parameter events will be received.
/**
* This function depends on middleware support for content filtering.
* If middleware doesn't support contentfilter, return false.
*
* If node_names is empty, the configured node filter will be cleared.
*
* If this function return true, only parameter events from the specified node will be received.
* It affects the behavior of the following two functions.
* - add_parameter_event_callback()
* The callback will only be called for parameter events from the specified nodes which are
* configured in this function.
* - add_parameter_callback()
* The callback will only be called for parameter events from the specified nodes which are
* configured in this function and add_parameter_callback().
* If the nodes specified in this function is different from the nodes specified in
* add_parameter_callback(), the callback will never be called.
*
* \param[in] node_names Node names to filter parameter events from.
* \returns true if configuring was successfully applied, false otherwise.
*/
RCLCPP_PUBLIC
bool configure_nodes_filter(const std::vector<std::string> & node_names);

/// Remove a parameter callback registered with add_parameter_callback.
/**
* The parameter name and node name are inspected from the callback handle. The callback handle
Expand Down
29 changes: 29 additions & 0 deletions rclcpp/src/rclcpp/parameter_event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,35 @@ ParameterEventHandler::add_parameter_callback(
return handle;
}

bool
ParameterEventHandler::configure_nodes_filter(const std::vector<std::string> & node_names)
{
if (node_names.empty()) {
// Clear content filter
event_subscription_->set_content_filter("");
return true;
}

std::string filter_expression;
size_t total = node_names.size();
for (size_t i = 0; i < total; ++i) {
filter_expression += "node = %" + std::to_string(i);
if (i < total - 1) {
filter_expression += " OR ";
}
}

// Enclose each node name in "'".
std::vector<std::string> quoted_node_names;
for (const auto & name : node_names) {
quoted_node_names.push_back("'" + resolve_path(name) + "'");
}

event_subscription_->set_content_filter(filter_expression, quoted_node_names);

return event_subscription_->is_cft_enabled();
}

void
ParameterEventHandler::remove_parameter_callback(
ParameterCallbackHandle::SharedPtr callback_handle)
Expand Down
199 changes: 199 additions & 0 deletions rclcpp/test/rclcpp/test_parameter_event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <chrono>
#include <memory>
#include <string>
Expand All @@ -20,6 +21,8 @@
#include "gtest/gtest.h"
#include "rclcpp/rclcpp.hpp"

using namespace std::chrono_literals;

class TestParameterEventHandler : public rclcpp::ParameterEventHandler
{
public:
Expand Down Expand Up @@ -443,3 +446,199 @@ TEST_F(TestNode, LastInFirstCallForParameterEventCallbacks)
param_handler->remove_parameter_event_callback(h2);
EXPECT_EQ(param_handler->num_event_callbacks(), 0UL);
}

TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterEventCallback)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str != "rmw_fastrtps_cpp" && rmw_implementation_str != "rmw_connextdds") {
GTEST_SKIP() << "Content filter is now only supported in FastDDS and ConnextDDS.";
}

const std::string remote_node_name1 = "remote_node1";
const std::string remote_node_name2 = "remote_node2";
rclcpp::NodeOptions options;
options.enable_rosout(false);
auto remote_node1 = std::make_shared<rclcpp::Node>(
remote_node_name1, options);
auto remote_node2 = std::make_shared<rclcpp::Node>(
remote_node_name2, options);

const std::string remote_node1_param_name = "param_node1";
const std::string remote_node2_param_name = "param_node2";
remote_node1->declare_parameter(remote_node1_param_name, 10);
remote_node2->declare_parameter(remote_node2_param_name, "Default");

std::atomic_bool received_from_remote_node1{false};
std::atomic_bool received_from_remote_node2{false};

auto cb =
[&remote_node_name1, &remote_node_name2, &received_from_remote_node1,
&received_from_remote_node2]
(const rcl_interfaces::msg::ParameterEvent & parm) {
if (parm.node == "/" + remote_node_name1) {
received_from_remote_node1 = true;
} else if (parm.node == "/" + remote_node_name2) {
received_from_remote_node2 = true;
}
};

// Configure to only receive parameter events from remote_node_name2
ASSERT_TRUE(param_handler->configure_nodes_filter({remote_node_name2}));

auto handler = param_handler->add_parameter_event_callback(cb);

rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.add_node(remote_node1);
executor.add_node(remote_node2);

auto wait_param_event = [&executor]
(std::chrono::seconds timeout, std::function<bool()> condition = nullptr) {
auto start = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() - start < timeout) {
executor.spin_some();
if (condition && condition()) {
break;
}
std::this_thread::sleep_for(100ms);
}
};

{
std::thread thread(wait_param_event, 1s, nullptr);
std::this_thread::sleep_for(100ms);
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20));
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc"));
thread.join();
}

EXPECT_EQ(received_from_remote_node1, false);
EXPECT_EQ(received_from_remote_node2, true);

// Clear node filter and all parameter events from remote nodes should be received
ASSERT_TRUE(param_handler->configure_nodes_filter({}));

received_from_remote_node1 = false;
received_from_remote_node2 = false;

{
std::thread thread(
wait_param_event,
2s,
[&received_from_remote_node1, &received_from_remote_node2]() {
if (received_from_remote_node1 && received_from_remote_node2) {
return true;
}
return false;
});
std::this_thread::sleep_for(100ms);
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 30));
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "def"));
thread.join();
}

EXPECT_EQ(received_from_remote_node1, true);
EXPECT_EQ(received_from_remote_node2, true);
}

TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterCallback)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str != "rmw_fastrtps_cpp" && rmw_implementation_str != "rmw_connextdds") {
GTEST_SKIP() << "Content filter is now only supported in FastDDS and ConnextDDS.";
}

const std::string remote_node_name1 = "remote_node1";
const std::string remote_node_name2 = "remote_node2";
rclcpp::NodeOptions options;
options.enable_rosout(false);
auto remote_node1 = std::make_shared<rclcpp::Node>(
remote_node_name1, options);
auto remote_node2 = std::make_shared<rclcpp::Node>(
remote_node_name2, options);

const std::string remote_node1_param_name = "param_node1";
const std::string remote_node2_param_name = "param_node2";
remote_node1->declare_parameter(remote_node1_param_name, 10);
remote_node2->declare_parameter(remote_node2_param_name, "Default");

std::atomic_bool received_from_remote_node1{false};
std::atomic_bool received_from_remote_node2{false};

auto cb_remote_node1 =
[&remote_node1_param_name, &received_from_remote_node1]
(const rclcpp::Parameter & parm) {
if (parm.get_name() == remote_node1_param_name) {
received_from_remote_node1 = true;
}
};

auto cb_remote_node2 =
[&remote_node2_param_name, &received_from_remote_node2]
(const rclcpp::Parameter & parm) {
if (parm.get_name() == remote_node2_param_name) {
received_from_remote_node2 = true;
}
};

// Configure to only receive parameter events from remote_node_name2
ASSERT_TRUE(param_handler->configure_nodes_filter({remote_node_name2}));

auto handler1 = param_handler->add_parameter_callback(
remote_node1_param_name, cb_remote_node1, remote_node_name1);
auto handler2 = param_handler->add_parameter_callback(
remote_node2_param_name, cb_remote_node2, remote_node_name2);

rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.add_node(remote_node1);
executor.add_node(remote_node2);

auto wait_param_event = [&executor]
(std::chrono::seconds timeout, std::function<bool()> condition = nullptr) {
auto start = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() - start < timeout) {
executor.spin_some();
if (condition && condition()) {
break;
}
std::this_thread::sleep_for(100ms);
}
};

{
std::thread thread(wait_param_event, 1s, nullptr);
std::this_thread::sleep_for(100ms);
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20));
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc"));
thread.join();
}

EXPECT_EQ(received_from_remote_node1, false);
EXPECT_EQ(received_from_remote_node2, true);

// Clear node filter and all parameter events from remote nodes should be received
ASSERT_TRUE(param_handler->configure_nodes_filter({}));

received_from_remote_node1 = false;
received_from_remote_node2 = false;

{
std::thread thread(
wait_param_event,
2s,
[&received_from_remote_node1, &received_from_remote_node2]() {
if (received_from_remote_node1 && received_from_remote_node2) {
return true;
}
return false;
});
std::this_thread::sleep_for(100ms);
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 30));
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "def"));
thread.join();
}

EXPECT_EQ(received_from_remote_node1, true);
EXPECT_EQ(received_from_remote_node2, true);
}