Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quality_of_service_demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ and the "Deadline missed" messages will no longer be printed.
This demo shows how to get a notification when a subscription loses a message.

This feature is not available in all RMW implementations.
`rmw_cyclonedds_cpp` and `rmw_connextdds` do support this feature.
CycloneDDS partially implements the feature and it only triggers the event under some limited circumstances, thus it's recommended to try the demo with Connext.
`rmw_zenoh_cpp`, `rmw_cyclonedds_cpp` and `rmw_connextdds` do support this feature.
CycloneDDS partially implements the feature and it only triggers the event under some limited circumstances, thus it's recommended to try the demo with Connext or `rmw_zenoh_cpp`.

In one terminal, run a listener
```
Expand Down
4 changes: 3 additions & 1 deletion quality_of_service_demo/rclcpp/src/message_lost_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ class MessageLostListener : public rclcpp::Node
};
// Create the subscription. This will also create an event handler based on the above
// subscription options.
rclcpp::QoS qos{1};
qos.best_effort();
sub_ = create_subscription<sensor_msgs::msg::Image>(
"message_lost_chatter", 1, callback, sub_opts);
"message_lost_chatter", qos, callback, sub_opts);
}

private:
Expand Down
43 changes: 37 additions & 6 deletions quality_of_service_demo/rclcpp/src/message_lost_talker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void print_usage()
"Usage: message_lost_talker [-h] [-s SIZE]\n\n"
"optional arguments:\n"
"\t-h: Print this help message.\n"
"\t-4: Timer rate in Hz, default to 0.3 Hz\n"
"\t-s <message_size>: Message size in KiB, default to 8192 KiB" <<
std::endl;
}
Expand All @@ -47,7 +48,8 @@ class MessageLostTalker : public rclcpp::Node
QUALITY_OF_SERVICE_DEMO_PUBLIC
explicit MessageLostTalker(const rclcpp::NodeOptions & options)
: Node("message_lost_talker", options),
message_size_(8u * 1024u * 1024u) // 8MB
message_size_(8u * 1024u * 1024u), // 8MB
timer_period_(3000) // 3s period
{
const std::vector<std::string> & args = this->get_node_options().arguments();
if (args.size()) {
Expand All @@ -58,13 +60,38 @@ class MessageLostTalker : public rclcpp::Node
// exceptions. Raise one here, so stack unwinding happens gracefully.
std::exit(0);
}
// Argument: timer period
auto opt_it = std::find(args.cbegin(), args.cend(), "-r");
if (opt_it != args.cend()) {
++opt_it;
if (opt_it == args.cend()) {
print_usage();
std::cout << "\n-r must be followed by a positive number" << std::endl;
// TODO(ivanpauno): Update the rclcpp_components template to be able to handle
// exceptions. Raise one here, so stack unwinding happens gracefully.
std::exit(0);
}
std::istringstream input_stream(*opt_it);
double timer_rate;
input_stream >> timer_rate;
if (!input_stream) {
print_usage();
std::cout << "\n-s must be followed by a positive number, got: '" <<
*opt_it << "'" << std::endl;
// TODO(ivanpauno): Update the rclcpp_components template to be able to handle
// exceptions. Raise one here, so stack unwinding happens gracefully.
std::exit(0);
}
timer_period_ = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::duration<double, std::ratio<1>>(1.0 / timer_rate));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if -r 0, this could generate 0 division exception?

}
// Argument: message size
auto opt_it = std::find(args.cbegin(), args.cend(), "-s");
opt_it = std::find(args.cbegin(), args.cend(), "-s");
if (opt_it != args.cend()) {
++opt_it;
if (opt_it == args.cend()) {
print_usage();
std::cout << "\n-s must be followed by a possitive integer" << std::endl;
std::cout << "\n-s must be followed by a positive integer" << std::endl;
// TODO(ivanpauno): Update the rclcpp_components template to be able to handle
// exceptions. Raise one here, so stack unwinding happens gracefully.
std::exit(0);
Expand All @@ -73,7 +100,7 @@ class MessageLostTalker : public rclcpp::Node
input_stream >> message_size_;
if (!input_stream) {
print_usage();
std::cout << "\n-s must be followed by a possitive integer, got: '" <<
std::cout << "\n-s must be followed by a positive integer, got: '" <<
*opt_it << "'" << std::endl;
// TODO(ivanpauno): Update the rclcpp_components template to be able to handle
// exceptions. Raise one here, so stack unwinding happens gracefully.
Expand All @@ -97,16 +124,20 @@ class MessageLostTalker : public rclcpp::Node
pub_->publish(msg_);
};
// Create a publisher
pub_ = this->create_publisher<sensor_msgs::msg::Image>("message_lost_chatter", 1);
rclcpp::QoS qos{1};
qos.reliable();
pub_ = this->create_publisher<sensor_msgs::msg::Image>("message_lost_chatter", qos);


// Use a timer to schedule periodic message publishing.
timer_ = this->create_wall_timer(3s, publish_message);
timer_ = this->create_wall_timer(timer_period_, publish_message);
}

private:
size_t message_size_;
sensor_msgs::msg::Image msg_;
rclcpp::Publisher<sensor_msgs::msg::Image>::SharedPtr pub_;
std::chrono::milliseconds timer_period_;
rclcpp::TimerBase::SharedPtr timer_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.time import Time
from rclpy.qos import QoSReliabilityPolicy, QoSProfile

from sensor_msgs.msg import Image

Expand All @@ -37,19 +38,20 @@ def __init__(self):
event_callbacks = SubscriptionEventCallbacks(
message_lost=self._message_lost_event_callback)
# Create a subscription, passing the previously created event handlers.
qos = QoSProfile(depth=1, reliability=QoSReliabilityPolicy.BEST_EFFORT)
self.subscription = self.create_subscription(
Image,
'message_lost_chatter',
self._message_callback,
1,
qos,
event_callbacks=event_callbacks)

def _message_callback(self, message):
"""Log when a message is received."""
now = self.get_clock().now()
diff = now - Time.from_msg(message.header.stamp)
self.get_logger().info(
f'I heard an Image. Message single trip latency: [{diff.nanoseconds}]\n---')
f'I heard an Image. Message single trip latency: [{diff.nanoseconds / 1e9}]\n---')

def _message_lost_event_callback(self, message_lost_status):
"""Log the number of lost messages when the event is triggered."""
Expand Down