diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 90feef34..db7a1bfa 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -151,6 +151,7 @@ SubscriptionData::SubscriptionData( type_support_(std::move(type_support)), sub_options_(std::move(sub_options)), last_known_published_msg_({}), + reception_sn_(0), wait_set_data_(nullptr), is_shutdown_(false), initialized_(false) @@ -422,8 +423,8 @@ rmw_ret_t SubscriptionData::take_one_message( message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; + message_info->reception_sequence_number = + reception_sn_.fetch_add(1, std::memory_order_relaxed); message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; memcpy( message_info->publisher_gid.data, @@ -479,8 +480,8 @@ rmw_ret_t SubscriptionData::take_serialized_message( message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; + message_info->reception_sequence_number = + reception_sn_.fetch_add(1, std::memory_order_relaxed); message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; memcpy( message_info->publisher_gid.data, diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 90256eba..517a13ea 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -15,6 +15,7 @@ #ifndef DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ #define DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ +#include #include #include #include @@ -152,6 +153,8 @@ class SubscriptionData final : public std::enable_shared_from_this> message_queue_; // Map GID of a subscription to the sequence number of the message it published. std::unordered_map last_known_published_msg_; + // Per-subscriber reception sequence number counter, incremented on every take. + std::atomic reception_sn_; // Wait set data. rmw_wait_set_data_t * wait_set_data_; // Callback managers. diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index bde3c0e2..2d0be13b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -140,9 +140,9 @@ bool rmw_feature_supported(rmw_feature_t feature) { switch (feature) { case RMW_FEATURE_MESSAGE_INFO_PUBLICATION_SEQUENCE_NUMBER: - return false; + return true; case RMW_FEATURE_MESSAGE_INFO_RECEPTION_SEQUENCE_NUMBER: - return false; + return true; case RMW_MIDDLEWARE_SUPPORTS_TYPE_DISCOVERY: return true; case RMW_MIDDLEWARE_CAN_TAKE_DYNAMIC_MESSAGE: