Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class ReaderProxy
FragmentNumber_t& next_unsent_frag,
SequenceNumber_t& gap_seq,
const SequenceNumber_t& min_seq,
bool& need_reactivate_periodic_heartbeat) const;
bool& need_reactivate_periodic_heartbeat);

/**
* Mark all changes up to the one indicated by seq_num as Acknowledged.
Expand Down Expand Up @@ -345,11 +345,38 @@ class ReaderProxy
* Get the highest fully acknowledged sequence number.
* @return the highest fully acknowledged sequence number.
*/
SequenceNumber_t changes_low_mark() const
inline SequenceNumber_t changes_low_mark() const
{
return changes_low_mark_;
}

/*!
* Get the first sequence number not relevant that was removed without reader being informed.
* @return First sequence number.
*/
inline SequenceNumber_t first_irrelevant_removed() const
{
return first_irrelevant_removed_;
}

/*!
* Get the last sequence number not relevant that was removed without reader being informed.
* @return last sequence number.
*/
inline SequenceNumber_t last_irrelevant_removed() const
{
return last_irrelevant_removed_;
}

/*!
* Reset the interval of sequence numbers not relevant that were removed without reader being informed.
*/
inline void reset_irrelevant_removed()
{
first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}

/**
* Change the interval of nack-supression event.
* @param interval Time from data sending to acknack processing.
Expand Down Expand Up @@ -450,8 +477,14 @@ class ReaderProxy
//! Last NACKFRAG count.
uint32_t last_nackfrag_count_;

//! Sequence number of the lowest change not fully acknowledged.
SequenceNumber_t changes_low_mark_;

//! First sequence number not relevant that was removed without reader being informed.
SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()};
//! Last sequence number not relevant that was removed without reader being informed.
SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()};

bool active_ = false;

using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
Expand Down
12 changes: 11 additions & 1 deletion include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,13 @@ class StatefulWriter : public RTPSWriter
*/
bool ack_timer_expired();

void send_heartbeat_to_all_readers();
/*!
* Send heartbeat to all the remote readers.
* @param force_separating True to send the heartbeat separately for each reader.
* False to send a unique heartbeat to all the readers.
*/
void send_heartbeat_to_all_readers(
bool force_separating);

void deliver_sample_to_intraprocesses(
CacheChange_t* change);
Expand All @@ -503,6 +509,10 @@ class StatefulWriter : public RTPSWriter
void prepare_datasharing_delivery(
CacheChange_t* change);

void add_gaps_for_removed_irrelevants(
ReaderProxy& remoteReaderProxy,
RTPSMessageGroup& group);

/**
* Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group.
*
Expand Down
58 changes: 51 additions & 7 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,40 @@ void ReaderProxy::add_change(
const ChangeForReader_t& change,
bool is_relevant)
{
assert(change.getSequenceNumber() > changes_low_mark_);
SequenceNumber_t seq_num {change.getSequenceNumber()};
assert(seq_num > changes_low_mark_);
assert(changes_for_reader_.empty() ? true :
change.getSequenceNumber() > changes_for_reader_.back().getSequenceNumber());
seq_num > changes_for_reader_.back().getSequenceNumber());

// Irrelevant changes are not added to the collection
if (!is_relevant)
{
if ( !is_reliable_ &&
changes_low_mark_ + 1 == change.getSequenceNumber())
if (is_reliable_)
{
if (!is_local_reader())
{
if (SequenceNumber_t::unknown() == first_irrelevant_removed_)
{
first_irrelevant_removed_ = seq_num;
last_irrelevant_removed_ = seq_num;
}
else if (seq_num == last_irrelevant_removed_ + 1)
{
last_irrelevant_removed_ = seq_num;
}
}
}
else if (changes_low_mark_ + 1 == seq_num)
{
changes_low_mark_ = change.getSequenceNumber();
changes_low_mark_ = seq_num;
}
return;
}

if (changes_for_reader_.push_back(change) == nullptr)
{
// This should never happen
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << change.getSequenceNumber()
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << seq_num
<< " to reader proxy " << guid());
eprosima::fastdds::dds::Log::Flush();
assert(false);
Expand Down Expand Up @@ -281,7 +296,7 @@ bool ReaderProxy::change_is_unsent(
FragmentNumber_t& next_unsent_frag,
SequenceNumber_t& gap_seq,
const SequenceNumber_t& min_seq,
bool& need_reactivate_periodic_heartbeat) const
bool& need_reactivate_periodic_heartbeat)
{
if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
{
Expand Down Expand Up @@ -328,6 +343,21 @@ bool ReaderProxy::change_is_unsent(
gap_seq = SequenceNumber_t::unknown();
}
}

if (SequenceNumber_t::unknown() != first_irrelevant_removed_ &&
SequenceNumber_t::unknown() != gap_seq)
{
// Check if the hole is due to irrelevant changes removed without informing the reader
if (gap_seq == first_irrelevant_removed_)
{
first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}
else if (gap_seq < last_irrelevant_removed_)
{
last_irrelevant_removed_ = gap_seq - 1;
}
}
}
}
}
Expand Down Expand Up @@ -436,6 +466,20 @@ bool ReaderProxy::requested_changes_set(
else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_))
{
gap_builder.add(sit);

if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
{
// Check if the hole is due to irrelevant changes removed without informing the reader
if (sit == first_irrelevant_removed_)
{
first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}
else if (sit < last_irrelevant_removed_)
{
last_irrelevant_removed_ = sit - 1;
}
}
}
});
}
Expand Down
57 changes: 46 additions & 11 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,12 @@ bool StatefulWriter::change_removed_by_history(
return ret_value;
}

void StatefulWriter::send_heartbeat_to_all_readers()
void StatefulWriter::send_heartbeat_to_all_readers(
bool force_separating)
{
// This method is only called from send_periodic_heartbeat

if (m_separateSendingEnabled)
if (m_separateSendingEnabled || force_separating)
{
for (ReaderProxy* reader : matched_remote_readers_)
{
Expand Down Expand Up @@ -705,6 +706,15 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
{
SequenceNumber_t gap_seq;
FragmentNumber_t next_unsent_frag = 0;

if (SequenceNumber_t::unknown() != (*remote_reader)->first_irrelevant_removed())
{
// Send GAP with irrelevant changes that are not in history.
group.sender(this, (*remote_reader)->message_sender());
add_gaps_for_removed_irrelevants(**remote_reader, group);
group.sender(this, &locator_selector); // This makes the flush_and_reset().
}

if ((*remote_reader)->change_is_unsent(change->sequenceNumber, next_unsent_frag, gap_seq, get_seq_num_min(),
need_reactivate_periodic_heartbeat) &&
(0 == n_fragments || min_unsent_fragment >= next_unsent_frag))
Expand Down Expand Up @@ -1730,7 +1740,8 @@ bool StatefulWriter::send_periodic_heartbeat(
std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);

bool unacked_changes = false;
bool unacked_changes {false};
bool irrelevants_removed {false};
if (!liveliness)
{
SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
Expand All @@ -1739,20 +1750,30 @@ bool StatefulWriter::send_periodic_heartbeat(
first_seq_to_check_acknowledge = mp_history->next_sequence_number() - 1;
}

unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
matched_remote_readers_,
[first_seq_to_check_acknowledge](ReaderProxy* reader)
{
return reader->has_unacknowledged(first_seq_to_check_acknowledge);
}
);
for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
matched_remote_readers_,
[first_seq_to_check_acknowledge, &unacked_changes, &irrelevants_removed](ReaderProxy* reader)
{
if (!unacked_changes)
{
unacked_changes = reader->has_unacknowledged(first_seq_to_check_acknowledge);
}

if (!irrelevants_removed)
{
irrelevants_removed = SequenceNumber_t::unknown() != reader->first_irrelevant_removed();
}

return unacked_changes && irrelevants_removed;
}
);

if (unacked_changes)
{
try
{
//TODO if separating, here sends periodic for all readers, instead of ones needed it.
send_heartbeat_to_all_readers();
send_heartbeat_to_all_readers(irrelevants_removed);
}
catch (const RTPSMessageGroup::timeout&)
{
Expand Down Expand Up @@ -1842,6 +1863,7 @@ void StatefulWriter::send_heartbeat_to_nts(
assert(firstSeq <= lastSeq);
if (!liveliness)
{
add_gaps_for_removed_irrelevants(remoteReaderProxy, group);
add_gaps_for_holes_in_history_(group);
}
}
Expand Down Expand Up @@ -2293,6 +2315,19 @@ bool StatefulWriter::get_connections(

#endif // ifdef FASTDDS_STATISTICS

void StatefulWriter::add_gaps_for_removed_irrelevants(
ReaderProxy& remoteReaderProxy,
RTPSMessageGroup& group)
{
if (SequenceNumber_t::unknown() != remoteReaderProxy.first_irrelevant_removed())
{
group.add_gap(remoteReaderProxy.first_irrelevant_removed(),
SequenceNumberSet_t(remoteReaderProxy.last_irrelevant_removed() + 1),
remoteReaderProxy.guid());
remoteReaderProxy.reset_irrelevant_removed();
}
}

void StatefulWriter::add_gaps_for_holes_in_history_(
RTPSMessageGroup& group)
{
Expand Down
Loading
Loading