Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ struct ReadTakeCommand
ReturnCode_t previous_return_value = return_value_;
bool added = add_sample(*it, remove_change);
history_.change_was_processed_nts(change, added);
rtps::BaseReader::downcast(reader_)->end_sample_access_nts(change, wp, added);

// Check if the payload is dirty
if (added && !check_datasharing_validity(change, data_values_.has_ownership()))
Expand All @@ -165,7 +164,11 @@ struct ReadTakeCommand
added = false;
}

if (remove_change || (added && take_samples))
// Only send ACK if the change will not be removed to avoid sending the same ACK twice
bool should_remove = remove_change || (added && take_samples);
rtps::BaseReader::downcast(reader_)->end_sample_access_nts(change, wp, added, !should_remove);

if (should_remove)
{
// Remove from history
history_.remove_change_sub(change, it);
Expand Down
10 changes: 6 additions & 4 deletions src/cpp/rtps/reader/BaseReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,16 @@ class BaseReader
/**
* @brief Called after the change has been deserialized.
*
* @param [in] change Pointer to the change being accessed.
* @param [in] writer Writer proxy the @c change belongs to.
* @param [in] mark_as_read Whether the @c change should be marked as read or not.
* @param [in] change Pointer to the change being accessed.
* @param [in] writer Writer proxy the @c change belongs to.
* @param [in] mark_as_read Whether the @c change should be marked as read or not.
* @param [in] should_send_ack Whether an ACKNACK should be sent to the writer or not.
*/
virtual void end_sample_access_nts(
fastdds::rtps::CacheChange_t* change,
fastdds::rtps::WriterProxy*& writer,
bool mark_as_read) = 0;
bool mark_as_read,
bool should_send_ack = false) = 0;

/**
* @brief A method to update the liveliness changed status of the reader
Expand Down
5 changes: 3 additions & 2 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,8 @@ bool StatefulReader::begin_sample_access_nts(
void StatefulReader::end_sample_access_nts(
CacheChange_t* change,
WriterProxy*& writer,
bool mark_as_read)
bool mark_as_read,
bool should_send_ack)
{
assert(!writer || change->writerGUID == writer->guid());

Expand All @@ -1481,7 +1482,7 @@ void StatefulReader::end_sample_access_nts(
}
}

if (mark_as_read)
if (should_send_ack && mark_as_read)
{
send_ack_if_datasharing(this, history_, writer, change->sequenceNumber);
}
Expand Down
10 changes: 6 additions & 4 deletions src/cpp/rtps/reader/StatefulReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,16 @@ class StatefulReader : public fastdds::rtps::BaseReader

/**
* Called after the change has been deserialized.
* @param [in] change Pointer to the change being accessed.
* @param [in] writer Writer proxy the @c change belongs to.
* @param [in] mark_as_read Whether the @c change should be marked as read or not.
* @param [in] change Pointer to the change being accessed.
* @param [in] writer Writer proxy the @c change belongs to.
* @param [in] mark_as_read Whether the @c change should be marked as read or not.
* @param [in] should_send_ack Whether an ACKNACK should be sent to the writer or not.
*/
void end_sample_access_nts(
CacheChange_t* change,
WriterProxy*& writer,
bool mark_as_read) override;
bool mark_as_read,
bool should_send_ack = false) override;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ bool StatelessReader::begin_sample_access_nts(
void StatelessReader::end_sample_access_nts(
CacheChange_t* change,
WriterProxy*& /*writer*/,
bool mark_as_read)
bool mark_as_read,
bool /*should_send_ack*/)
{
// Mark change as read
if (mark_as_read && !change->isRead)
Expand Down
10 changes: 6 additions & 4 deletions src/cpp/rtps/reader/StatelessReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,16 @@ class StatelessReader : public fastdds::rtps::BaseReader

/**
* Called after the change has been deserialized.
* @param [in] change Pointer to the change being accessed.
* @param [in] writer Writer proxy the @c change belongs to.
* @param [in] mark_as_read Whether the @c change should be marked as read or not.
* @param [in] change Pointer to the change being accessed.
* @param [in] writer Writer proxy the @c change belongs to.
* @param [in] mark_as_read Whether the @c change should be marked as read or not.
* @param [in] should_send_ack Whether an ACKNACK should be sent to the writer or not.
*/
void end_sample_access_nts(
CacheChange_t* change,
WriterProxy*& writer,
bool mark_as_read) override;
bool mark_as_read,
bool should_send_ack = false) override;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
Expand Down
3 changes: 2 additions & 1 deletion test/mock/rtps/RTPSReader/rtps/reader/BaseReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ class BaseReader : public fastdds::rtps::RTPSReader
virtual void end_sample_access_nts(
fastdds::rtps::CacheChange_t* /*change*/,
fastdds::rtps::WriterProxy*& /*wp*/,
bool /*mark_as_read*/)
bool /*mark_as_read*/,
bool /*should_send_ack*/ = false)
{
}

Expand Down
Loading