Skip to content

Commit c378340

Browse files
anamudAnanya Muddukrishna
andauthored
Unique network flows (#296)
* Demonstrate unique network flows feature Signed-off-by: Ananya Muddukrishna <[email protected]> * Use new unique network flow option Signed-off-by: Ananya Muddukrishna <[email protected]> * Modify to match renamed API Signed-off-by: Ananya Muddukrishna <[email protected]> * Rename files for consistency Signed-off-by: Ananya Muddukrishna <[email protected]> * Update build configuration Signed-off-by: Ananya Muddukrishna <[email protected]> * Catch exception on failure to create subscriber Signed-off-by: Ananya Muddukrishna <[email protected]> Co-authored-by: Ananya Muddukrishna <[email protected]>
1 parent 654358e commit c378340

File tree

4 files changed

+240
-0
lines changed

4 files changed

+240
-0
lines changed

rclcpp/topics/minimal_publisher/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@ ament_target_dependencies(publisher_lambda rclcpp std_msgs)
2020
add_executable(publisher_member_function member_function.cpp)
2121
ament_target_dependencies(publisher_member_function rclcpp std_msgs)
2222

23+
add_executable(publisher_member_function_with_unique_network_flow_endpoints member_function_with_unique_network_flow_endpoints.cpp)
24+
ament_target_dependencies(publisher_member_function_with_unique_network_flow_endpoints rclcpp std_msgs)
25+
2326
add_executable(publisher_not_composable not_composable.cpp)
2427
ament_target_dependencies(publisher_not_composable rclcpp std_msgs)
2528

2629
install(TARGETS
2730
publisher_lambda
2831
publisher_member_function
32+
publisher_member_function_with_unique_network_flow_endpoints
2933
publisher_not_composable
3034
DESTINATION lib/${PROJECT_NAME}
3135
)
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2020 Ericsson AB
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <chrono>
16+
#include <functional>
17+
#include <memory>
18+
#include <sstream>
19+
#include <string>
20+
#include <vector>
21+
22+
#include "rclcpp/rclcpp.hpp"
23+
#include "rclcpp/publisher_options.hpp"
24+
#include "std_msgs/msg/string.hpp"
25+
26+
using namespace std::chrono_literals;
27+
28+
class MinimalPublisherWithUniqueNetworkFlowEndpoints : public rclcpp::Node
29+
{
30+
public:
31+
MinimalPublisherWithUniqueNetworkFlowEndpoints()
32+
: Node("minimal_publisher_with_unique_network_flow_endpoints"), count_1_(0), count_2_(0)
33+
{
34+
// Create publisher with unique network flow endpoints
35+
// Enable unique network flow endpoints via options
36+
auto options_1 = rclcpp::PublisherOptions();
37+
options_1.require_unique_network_flow_endpoints =
38+
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED;
39+
publisher_1_ = this->create_publisher<std_msgs::msg::String>("topic_1", 10, options_1);
40+
timer_1_ = this->create_wall_timer(
41+
500ms, std::bind(&MinimalPublisherWithUniqueNetworkFlowEndpoints::timer_1_callback, this));
42+
43+
// Create publisher without unique network flow endpoints
44+
// Unique network flow endpoints are disabled in default options
45+
auto options_2 = rclcpp::PublisherOptions();
46+
publisher_2_ = this->create_publisher<std_msgs::msg::String>("topic_2", 10);
47+
timer_2_ = this->create_wall_timer(
48+
1000ms, std::bind(&MinimalPublisherWithUniqueNetworkFlowEndpoints::timer_2_callback, this));
49+
50+
// Get network flow endpoints
51+
auto network_flow_endpoints_1 = publisher_1_->get_network_flow_endpoints();
52+
auto network_flow_endpoints_2 = publisher_2_->get_network_flow_endpoints();
53+
54+
// Print network flow endpoints
55+
print_network_flow_endpoints(network_flow_endpoints_1);
56+
print_network_flow_endpoints(network_flow_endpoints_2);
57+
}
58+
59+
private:
60+
void timer_1_callback()
61+
{
62+
auto message = std_msgs::msg::String();
63+
message.data = "Hello, world! " + std::to_string(count_1_++);
64+
65+
RCLCPP_INFO(
66+
this->get_logger(), "Publishing: '%s'", message.data.c_str());
67+
publisher_1_->publish(message);
68+
}
69+
void timer_2_callback()
70+
{
71+
auto message = std_msgs::msg::String();
72+
message.data = "Hej, världen! " + std::to_string(count_2_++);
73+
74+
RCLCPP_INFO(
75+
this->get_logger(), "Publishing: '%s'", message.data.c_str());
76+
publisher_2_->publish(message);
77+
}
78+
/// Print network flow endpoints in JSON-like format
79+
void print_network_flow_endpoints(
80+
const std::vector<rclcpp::NetworkFlowEndpoint> & network_flow_endpoints) const
81+
{
82+
std::ostringstream stream;
83+
stream << "{\"networkFlowEndpoints\": [";
84+
bool comma_skip = true;
85+
for (auto network_flow_endpoint : network_flow_endpoints) {
86+
if (comma_skip) {
87+
comma_skip = false;
88+
} else {
89+
stream << ",";
90+
}
91+
stream << network_flow_endpoint;
92+
}
93+
stream << "]}";
94+
RCLCPP_INFO(
95+
this->get_logger(), "%s",
96+
stream.str().c_str());
97+
}
98+
rclcpp::TimerBase::SharedPtr timer_1_;
99+
rclcpp::TimerBase::SharedPtr timer_2_;
100+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_1_;
101+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_2_;
102+
size_t count_1_;
103+
size_t count_2_;
104+
};
105+
106+
int main(int argc, char * argv[])
107+
{
108+
rclcpp::init(argc, argv);
109+
rclcpp::spin(std::make_shared<MinimalPublisherWithUniqueNetworkFlowEndpoints>());
110+
rclcpp::shutdown();
111+
return 0;
112+
}

rclcpp/topics/minimal_subscriber/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@ ament_target_dependencies(subscriber_member_function rclcpp std_msgs)
2323
add_executable(subscriber_member_function_with_topic_statistics member_function_with_topic_statistics.cpp)
2424
ament_target_dependencies(subscriber_member_function_with_topic_statistics rclcpp std_msgs)
2525

26+
add_executable(subscriber_member_function_with_unique_network_flow_endpoints member_function_with_unique_network_flow_endpoints.cpp)
27+
ament_target_dependencies(subscriber_member_function_with_unique_network_flow_endpoints rclcpp std_msgs)
28+
2629
add_executable(subscriber_not_composable not_composable.cpp)
2730
ament_target_dependencies(subscriber_not_composable rclcpp std_msgs)
2831

2932
install(TARGETS
3033
subscriber_lambda
3134
subscriber_member_function
3235
subscriber_member_function_with_topic_statistics
36+
subscriber_member_function_with_unique_network_flow_endpoints
3337
subscriber_not_composable
3438
DESTINATION lib/${PROJECT_NAME})
3539

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2020 Ericsson AB
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
16+
#include <functional>
17+
#include <memory>
18+
#include <sstream>
19+
#include <string>
20+
#include <vector>
21+
22+
#include "rclcpp/rclcpp.hpp"
23+
#include "rclcpp/subscription_options.hpp"
24+
#include "std_msgs/msg/string.hpp"
25+
26+
using std::placeholders::_1;
27+
28+
class MinimalSubscriberWithUniqueNetworkFlowEndpoints : public rclcpp::Node
29+
{
30+
public:
31+
MinimalSubscriberWithUniqueNetworkFlowEndpoints()
32+
: Node("minimal_subscriber_with_unique_network_flow_endpoints")
33+
{
34+
try {
35+
// Create subscription with unique network flow endpoints
36+
// Enable unique network flow endpoints via options
37+
// Since option is strict, expect exception
38+
auto options_1 = rclcpp::SubscriptionOptions();
39+
options_1.require_unique_network_flow_endpoints =
40+
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED;
41+
42+
subscription_1_ = this->create_subscription<std_msgs::msg::String>(
43+
"topic_1", 10, std::bind(
44+
&MinimalSubscriberWithUniqueNetworkFlowEndpoints::topic_1_callback, this,
45+
_1), options_1);
46+
47+
// Create subscription without unique network flow endpoints
48+
// Unique network flow endpoints are disabled by default
49+
auto options_2 = rclcpp::SubscriptionOptions();
50+
subscription_2_ = this->create_subscription<std_msgs::msg::String>(
51+
"topic_2", 10, std::bind(
52+
&MinimalSubscriberWithUniqueNetworkFlowEndpoints::topic_2_callback, this,
53+
_1), options_2);
54+
55+
// Get network flow endpoints
56+
auto network_flow_endpoints_1 = subscription_1_->get_network_flow_endpoints();
57+
auto network_flow_endpoints_2 = subscription_2_->get_network_flow_endpoints();
58+
59+
// Check if network flow endpoints are unique
60+
for (auto network_flow_endpoint_1 : network_flow_endpoints_1) {
61+
for (auto network_flow_endpoint_2 : network_flow_endpoints_2) {
62+
if (network_flow_endpoint_1 == network_flow_endpoint_2) {
63+
RCLCPP_ERROR(
64+
this->get_logger(), "Network flow endpoints across subscriptions are not unique");
65+
break;
66+
}
67+
}
68+
}
69+
70+
// Print network flow endpoints
71+
print_network_flow_endpoints(network_flow_endpoints_1);
72+
print_network_flow_endpoints(network_flow_endpoints_2);
73+
} catch (const rclcpp::exceptions::RCLError & e) {
74+
RCLCPP_ERROR(
75+
this->get_logger(),
76+
"Error: %s",
77+
e.what());
78+
}
79+
}
80+
81+
private:
82+
void topic_1_callback(const std_msgs::msg::String::SharedPtr msg) const
83+
{
84+
RCLCPP_INFO(this->get_logger(), "Topic 1 news: '%s'", msg->data.c_str());
85+
}
86+
void topic_2_callback(const std_msgs::msg::String::SharedPtr msg) const
87+
{
88+
RCLCPP_INFO(this->get_logger(), "Topic 2 news: '%s'", msg->data.c_str());
89+
}
90+
/// Print network flow endpoints in JSON-like format
91+
void print_network_flow_endpoints(
92+
const std::vector<rclcpp::NetworkFlowEndpoint> & network_flow_endpoints) const
93+
{
94+
std::ostringstream stream;
95+
stream << "{\"networkFlowEndpoints\": [";
96+
bool comma_skip = true;
97+
for (auto network_flow_endpoint : network_flow_endpoints) {
98+
if (comma_skip) {
99+
comma_skip = false;
100+
} else {
101+
stream << ",";
102+
}
103+
stream << network_flow_endpoint;
104+
}
105+
stream << "]}";
106+
RCLCPP_INFO(
107+
this->get_logger(), "%s",
108+
stream.str().c_str());
109+
}
110+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_1_;
111+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_2_;
112+
};
113+
114+
int main(int argc, char * argv[])
115+
{
116+
rclcpp::init(argc, argv);
117+
rclcpp::spin(std::make_shared<MinimalSubscriberWithUniqueNetworkFlowEndpoints>());
118+
rclcpp::shutdown();
119+
return 0;
120+
}

0 commit comments

Comments
 (0)