Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ SubscriptionData::SubscriptionData(
type_support_(std::move(type_support)),
sub_options_(std::move(sub_options)),
last_known_published_msg_({}),
reception_sn_(0),
wait_set_data_(nullptr),
is_shutdown_(false),
initialized_(false)
Expand Down Expand Up @@ -422,8 +423,8 @@ rmw_ret_t SubscriptionData::take_one_message(
message_info->source_timestamp = msg_data->attachment.source_timestamp();
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->attachment.sequence_number();
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->reception_sequence_number =
reception_sn_.fetch_add(1, std::memory_order_relaxed);
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(
message_info->publisher_gid.data,
Expand Down Expand Up @@ -479,8 +480,8 @@ rmw_ret_t SubscriptionData::take_serialized_message(
message_info->source_timestamp = msg_data->attachment.source_timestamp();
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->attachment.sequence_number();
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->reception_sequence_number =
reception_sn_.fetch_add(1, std::memory_order_relaxed);
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(
message_info->publisher_gid.data,
Expand Down
3 changes: 3 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef DETAIL__RMW_SUBSCRIPTION_DATA_HPP_
#define DETAIL__RMW_SUBSCRIPTION_DATA_HPP_

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
Expand Down Expand Up @@ -152,6 +153,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::deque<std::unique_ptr<Message>> message_queue_;
// Map GID of a subscription to the sequence number of the message it published.
std::unordered_map<size_t, int64_t> last_known_published_msg_;
// Per-subscriber reception sequence number counter, incremented on every take.
std::atomic<uint64_t> reception_sn_;
// Wait set data.
rmw_wait_set_data_t * wait_set_data_;
// Callback managers.
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ bool rmw_feature_supported(rmw_feature_t feature)
{
switch (feature) {
case RMW_FEATURE_MESSAGE_INFO_PUBLICATION_SEQUENCE_NUMBER:
return false;
return true;
case RMW_FEATURE_MESSAGE_INFO_RECEPTION_SEQUENCE_NUMBER:
return false;
return true;
case RMW_MIDDLEWARE_SUPPORTS_TYPE_DISCOVERY:
return true;
case RMW_MIDDLEWARE_CAN_TAKE_DYNAMIC_MESSAGE:
Expand Down
Loading