From e66284593a3983d448f1728ec05735d304e008a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Wed, 3 Sep 2025 11:58:31 +0200 Subject: [PATCH] Avoid sending duplicated ACKs in DataSharing (#5986) * Refs #23603: Avoid duplicate ACK in datasharing Signed-off-by: cferreiragonz * Refs #23603: Doxygen Signed-off-by: cferreiragonz * Refs #23603: Mock method Signed-off-by: cferreiragonz --------- Signed-off-by: cferreiragonz (cherry picked from commit 8421fb023ffa8fa1a3d99408b2658c972e546cbb) --- .../subscriber/DataReaderImpl/ReadTakeCommand.hpp | 7 +++++-- src/cpp/rtps/reader/BaseReader.hpp | 10 ++++++---- src/cpp/rtps/reader/StatefulReader.cpp | 5 +++-- src/cpp/rtps/reader/StatefulReader.hpp | 10 ++++++---- src/cpp/rtps/reader/StatelessReader.cpp | 3 ++- src/cpp/rtps/reader/StatelessReader.hpp | 10 ++++++---- test/mock/rtps/RTPSReader/rtps/reader/BaseReader.hpp | 3 ++- 7 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index ad09e87e7af..76b86b0d402 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -147,7 +147,6 @@ struct ReadTakeCommand ReturnCode_t previous_return_value = return_value_; bool added = add_sample(*it, remove_change); history_.change_was_processed_nts(change, added); - rtps::BaseReader::downcast(reader_)->end_sample_access_nts(change, wp, added); // Check if the payload is dirty if (added && !check_datasharing_validity(change, data_values_.has_ownership())) @@ -165,7 +164,11 @@ struct ReadTakeCommand added = false; } - if (remove_change || (added && take_samples)) + // Only send ACK if the change will not be removed to avoid sending the same ACK twice + bool should_remove = remove_change || (added && take_samples); + rtps::BaseReader::downcast(reader_)->end_sample_access_nts(change, wp, added, !should_remove); + + if (should_remove) { // Remove from history history_.remove_change_sub(change, it); diff --git a/src/cpp/rtps/reader/BaseReader.hpp b/src/cpp/rtps/reader/BaseReader.hpp index 596e335ce53..f909af5c9cb 100644 --- a/src/cpp/rtps/reader/BaseReader.hpp +++ b/src/cpp/rtps/reader/BaseReader.hpp @@ -221,14 +221,16 @@ class BaseReader /** * @brief Called after the change has been deserialized. * - * @param [in] change Pointer to the change being accessed. - * @param [in] writer Writer proxy the @c change belongs to. - * @param [in] mark_as_read Whether the @c change should be marked as read or not. + * @param [in] change Pointer to the change being accessed. + * @param [in] writer Writer proxy the @c change belongs to. + * @param [in] mark_as_read Whether the @c change should be marked as read or not. + * @param [in] should_send_ack Whether an ACKNACK should be sent to the writer or not. */ virtual void end_sample_access_nts( fastdds::rtps::CacheChange_t* change, fastdds::rtps::WriterProxy*& writer, - bool mark_as_read) = 0; + bool mark_as_read, + bool should_send_ack = false) = 0; /** * @brief A method to update the liveliness changed status of the reader diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 1cd27d4ddd5..1c2ac400605 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -1467,7 +1467,8 @@ bool StatefulReader::begin_sample_access_nts( void StatefulReader::end_sample_access_nts( CacheChange_t* change, WriterProxy*& writer, - bool mark_as_read) + bool mark_as_read, + bool should_send_ack) { assert(!writer || change->writerGUID == writer->guid()); @@ -1481,7 +1482,7 @@ void StatefulReader::end_sample_access_nts( } } - if (mark_as_read) + if (should_send_ack && mark_as_read) { send_ack_if_datasharing(this, history_, writer, change->sequenceNumber); } diff --git a/src/cpp/rtps/reader/StatefulReader.hpp b/src/cpp/rtps/reader/StatefulReader.hpp index fe2df2178a5..05b040c3c38 100644 --- a/src/cpp/rtps/reader/StatefulReader.hpp +++ b/src/cpp/rtps/reader/StatefulReader.hpp @@ -295,14 +295,16 @@ class StatefulReader : public fastdds::rtps::BaseReader /** * Called after the change has been deserialized. - * @param [in] change Pointer to the change being accessed. - * @param [in] writer Writer proxy the @c change belongs to. - * @param [in] mark_as_read Whether the @c change should be marked as read or not. + * @param [in] change Pointer to the change being accessed. + * @param [in] writer Writer proxy the @c change belongs to. + * @param [in] mark_as_read Whether the @c change should be marked as read or not. + * @param [in] should_send_ack Whether an ACKNACK should be sent to the writer or not. */ void end_sample_access_nts( CacheChange_t* change, WriterProxy*& writer, - bool mark_as_read) override; + bool mark_as_read, + bool should_send_ack = false) override; /** * @brief Fills the provided vector with the GUIDs of the matched writers. diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 594f46de3b6..f87fa42a247 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -513,7 +513,8 @@ bool StatelessReader::begin_sample_access_nts( void StatelessReader::end_sample_access_nts( CacheChange_t* change, WriterProxy*& /*writer*/, - bool mark_as_read) + bool mark_as_read, + bool /*should_send_ack*/) { // Mark change as read if (mark_as_read && !change->isRead) diff --git a/src/cpp/rtps/reader/StatelessReader.hpp b/src/cpp/rtps/reader/StatelessReader.hpp index cc198634efc..a3163bb29ed 100644 --- a/src/cpp/rtps/reader/StatelessReader.hpp +++ b/src/cpp/rtps/reader/StatelessReader.hpp @@ -218,14 +218,16 @@ class StatelessReader : public fastdds::rtps::BaseReader /** * Called after the change has been deserialized. - * @param [in] change Pointer to the change being accessed. - * @param [in] writer Writer proxy the @c change belongs to. - * @param [in] mark_as_read Whether the @c change should be marked as read or not. + * @param [in] change Pointer to the change being accessed. + * @param [in] writer Writer proxy the @c change belongs to. + * @param [in] mark_as_read Whether the @c change should be marked as read or not. + * @param [in] should_send_ack Whether an ACKNACK should be sent to the writer or not. */ void end_sample_access_nts( CacheChange_t* change, WriterProxy*& writer, - bool mark_as_read) override; + bool mark_as_read, + bool should_send_ack = false) override; /** * @brief Fills the provided vector with the GUIDs of the matched writers. diff --git a/test/mock/rtps/RTPSReader/rtps/reader/BaseReader.hpp b/test/mock/rtps/RTPSReader/rtps/reader/BaseReader.hpp index 3310c25ffc0..4e0130e3bea 100644 --- a/test/mock/rtps/RTPSReader/rtps/reader/BaseReader.hpp +++ b/test/mock/rtps/RTPSReader/rtps/reader/BaseReader.hpp @@ -154,7 +154,8 @@ class BaseReader : public fastdds::rtps::RTPSReader virtual void end_sample_access_nts( fastdds::rtps::CacheChange_t* /*change*/, fastdds::rtps::WriterProxy*& /*wp*/, - bool /*mark_as_read*/) + bool /*mark_as_read*/, + bool /*should_send_ack*/ = false) { }