Skip to content

Commit 03e65f2

Browse files
committed
Refs #23919. Fix
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
1 parent 1e9986d commit 03e65f2

File tree

4 files changed

+119
-16
lines changed

4 files changed

+119
-16
lines changed

src/cpp/rtps/writer/ReaderProxy.cpp

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,22 @@ void ReaderProxy::add_change(
234234
// Irrelevant changes are not added to the collection
235235
if (!is_relevant)
236236
{
237-
if ( !is_reliable_ &&
238-
changes_low_mark_ + 1 == change.getSequenceNumber())
237+
if (is_reliable_)
238+
{
239+
if (!is_local_reader())
240+
{
241+
if (SequenceNumber_t::unknown() == first_irrelevant_removed_)
242+
{
243+
first_irrelevant_removed_ = change.getSequenceNumber();
244+
last_irrelevant_removed_ = change.getSequenceNumber();
245+
}
246+
else if (change.getSequenceNumber() == last_irrelevant_removed_ + 1)
247+
{
248+
last_irrelevant_removed_ = change.getSequenceNumber();
249+
}
250+
}
251+
}
252+
else if (changes_low_mark_ + 1 == change.getSequenceNumber())
239253
{
240254
changes_low_mark_ = change.getSequenceNumber();
241255
}
@@ -281,7 +295,7 @@ bool ReaderProxy::change_is_unsent(
281295
FragmentNumber_t& next_unsent_frag,
282296
SequenceNumber_t& gap_seq,
283297
const SequenceNumber_t& min_seq,
284-
bool& need_reactivate_periodic_heartbeat) const
298+
bool& need_reactivate_periodic_heartbeat)
285299
{
286300
if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
287301
{
@@ -328,6 +342,21 @@ bool ReaderProxy::change_is_unsent(
328342
gap_seq = SequenceNumber_t::unknown();
329343
}
330344
}
345+
346+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_ &&
347+
SequenceNumber_t::unknown() != gap_seq)
348+
{
349+
// Check if the hole is due to irrelevant changes removed without informing the reader
350+
if (gap_seq == first_irrelevant_removed_)
351+
{
352+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
353+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
354+
}
355+
else if (gap_seq < last_irrelevant_removed_)
356+
{
357+
last_irrelevant_removed_ = gap_seq - 1;
358+
}
359+
}
331360
}
332361
}
333362
}
@@ -436,6 +465,20 @@ bool ReaderProxy::requested_changes_set(
436465
else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_))
437466
{
438467
gap_builder.add(sit);
468+
469+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
470+
{
471+
// Check if the hole is due to irrelevant changes removed without informing the reader
472+
if (sit == first_irrelevant_removed_)
473+
{
474+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
475+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
476+
}
477+
else if (sit < last_irrelevant_removed_)
478+
{
479+
last_irrelevant_removed_ = sit - 1;
480+
}
481+
}
439482
}
440483
});
441484
}

src/cpp/rtps/writer/ReaderProxy.hpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class ReaderProxy
139139
FragmentNumber_t& next_unsent_frag,
140140
SequenceNumber_t& gap_seq,
141141
const SequenceNumber_t& min_seq,
142-
bool& need_reactivate_periodic_heartbeat) const;
142+
bool& need_reactivate_periodic_heartbeat);
143143

144144
/**
145145
* Mark all changes up to the one indicated by seq_num as Acknowledged.
@@ -346,6 +346,22 @@ class ReaderProxy
346346
return changes_low_mark_;
347347
}
348348

349+
SequenceNumber_t first_irrelevant_removed() const
350+
{
351+
return first_irrelevant_removed_;
352+
}
353+
354+
SequenceNumber_t last_irrelevant_removed() const
355+
{
356+
return last_irrelevant_removed_;
357+
}
358+
359+
void reset_irrelevant_removed()
360+
{
361+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
362+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
363+
}
364+
349365
/**
350366
* Change the interval of nack-supression event.
351367
* @param interval Time from data sending to acknack processing.
@@ -448,6 +464,11 @@ class ReaderProxy
448464

449465
SequenceNumber_t changes_low_mark_;
450466

467+
//! First sequence number not relevant that was removed without reader being informed.
468+
SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()};
469+
//! Last sequence number not relevant that was removed without reader being informed.
470+
SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()};
471+
451472
bool active_ = false;
452473

453474
using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;

src/cpp/rtps/writer/StatefulWriter.cpp

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -520,11 +520,12 @@ bool StatefulWriter::change_removed_by_history(
520520
return ret_value;
521521
}
522522

523-
void StatefulWriter::send_heartbeat_to_all_readers()
523+
void StatefulWriter::send_heartbeat_to_all_readers(
524+
bool force_separating)
524525
{
525526
// This method is only called from send_periodic_heartbeat
526527

527-
if (separate_sending_enabled_)
528+
if (separate_sending_enabled_ || force_separating)
528529
{
529530
for (ReaderProxy* reader : matched_remote_readers_)
530531
{
@@ -664,6 +665,14 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
664665
should_be_sent = true;
665666
inline_qos |= (*remote_reader)->expects_inline_qos();
666667

668+
if (SequenceNumber_t::unknown() != (*remote_reader)->first_irrelevant_removed())
669+
{
670+
// Send GAP with irrelevant changes that are not in history.
671+
group.sender(this, (*remote_reader)->message_sender());
672+
add_gaps_for_removed_irrelevants(**remote_reader, group);
673+
group.sender(this, &locator_selector); // This makes the flush_and_reset().
674+
}
675+
667676
// If there is a hole (removed from history or not relevants) between previous sample and this one,
668677
// send it a personal GAP.
669678
if (SequenceNumber_t::unknown() != gap_seq)
@@ -1686,7 +1695,8 @@ bool StatefulWriter::send_periodic_heartbeat(
16861695
std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
16871696
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
16881697

1689-
bool unacked_changes = false;
1698+
bool unacked_changes {false};
1699+
bool irrelevants_removed {false};
16901700
if (!liveliness)
16911701
{
16921702
SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
@@ -1695,20 +1705,30 @@ bool StatefulWriter::send_periodic_heartbeat(
16951705
first_seq_to_check_acknowledge = history_->next_sequence_number() - 1;
16961706
}
16971707

1698-
unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
1699-
matched_remote_readers_,
1700-
[first_seq_to_check_acknowledge](ReaderProxy* reader)
1701-
{
1702-
return reader->has_unacknowledged(first_seq_to_check_acknowledge);
1703-
}
1704-
);
1708+
for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
1709+
matched_remote_readers_,
1710+
[first_seq_to_check_acknowledge, &unacked_changes, &irrelevants_removed](ReaderProxy* reader)
1711+
{
1712+
if (!unacked_changes)
1713+
{
1714+
unacked_changes = reader->has_unacknowledged(first_seq_to_check_acknowledge);
1715+
}
1716+
1717+
if (!irrelevants_removed)
1718+
{
1719+
irrelevants_removed = SequenceNumber_t::unknown() != reader->first_irrelevant_removed();
1720+
}
1721+
1722+
return unacked_changes && irrelevants_removed;
1723+
}
1724+
);
17051725

17061726
if (unacked_changes)
17071727
{
17081728
try
17091729
{
17101730
//TODO if separating, here sends periodic for all readers, instead of ones needed it.
1711-
send_heartbeat_to_all_readers();
1731+
send_heartbeat_to_all_readers(irrelevants_removed);
17121732
}
17131733
catch (const RTPSMessageGroup::timeout&)
17141734
{
@@ -1798,6 +1818,7 @@ void StatefulWriter::send_heartbeat_to_nts(
17981818
assert(firstSeq <= lastSeq);
17991819
if (!liveliness)
18001820
{
1821+
add_gaps_for_removed_irrelevants(remoteReaderProxy, group);
18011822
add_gaps_for_holes_in_history(group);
18021823
}
18031824
}
@@ -2247,6 +2268,19 @@ bool StatefulWriter::get_connections(
22472268

22482269
#endif // ifdef FASTDDS_STATISTICS
22492270

2271+
void StatefulWriter::add_gaps_for_removed_irrelevants(
2272+
ReaderProxy& remoteReaderProxy,
2273+
RTPSMessageGroup& group)
2274+
{
2275+
if (SequenceNumber_t::unknown() != remoteReaderProxy.first_irrelevant_removed())
2276+
{
2277+
group.add_gap(remoteReaderProxy.first_irrelevant_removed(),
2278+
SequenceNumberSet_t(remoteReaderProxy.last_irrelevant_removed() + 1),
2279+
remoteReaderProxy.guid());
2280+
remoteReaderProxy.reset_irrelevant_removed();
2281+
}
2282+
}
2283+
22502284
void StatefulWriter::add_gaps_for_holes_in_history(
22512285
RTPSMessageGroup& group)
22522286
{

src/cpp/rtps/writer/StatefulWriter.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,8 @@ class StatefulWriter : public BaseWriter
385385
*/
386386
bool ack_timer_expired();
387387

388-
void send_heartbeat_to_all_readers();
388+
void send_heartbeat_to_all_readers(
389+
bool force_separating);
389390

390391
void deliver_sample_to_intraprocesses(
391392
CacheChange_t* change);
@@ -402,6 +403,10 @@ class StatefulWriter : public BaseWriter
402403
void prepare_datasharing_delivery(
403404
CacheChange_t* change);
404405

406+
void add_gaps_for_removed_irrelevants(
407+
ReaderProxy& remoteReaderProxy,
408+
RTPSMessageGroup& group);
409+
405410
/**
406411
* Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group.
407412
*

0 commit comments

Comments
 (0)