Skip to content

Commit 3382ae5

Browse files
richiwarecferreiragonz
authored andcommitted
Fix GAP messages are not sent when there is no Reader requesting the DATA (#6181)
* Refs #23919. Tests Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix cornercase Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Apply suggestions Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix jobs needs log Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Apply suggestions Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> --------- Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> (cherry picked from commit 062258c)
1 parent 35b7edc commit 3382ae5

File tree

7 files changed

+265
-21
lines changed

7 files changed

+265
-21
lines changed

examples/cpp/content_filter/SubscriberApp.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ class SubscriberApp : public Application, public DataReaderListener
6464
DataReader* reader,
6565
const SubscriptionMatchedStatus& info) override;
6666

67+
void on_sample_lost(
68+
DataReader*,
69+
const SampleLostStatus& status) override
70+
{
71+
std::cout << "Sample lost: " << status.total_count << std::endl;
72+
}
73+
6774
//! Run subscriber
6875
void run() override;
6976

src/cpp/rtps/writer/ReaderProxy.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,25 +227,40 @@ void ReaderProxy::add_change(
227227
const ChangeForReader_t& change,
228228
bool is_relevant)
229229
{
230-
assert(change.getSequenceNumber() > changes_low_mark_);
230+
SequenceNumber_t seq_num {change.getSequenceNumber()};
231+
assert(seq_num > changes_low_mark_);
231232
assert(changes_for_reader_.empty() ? true :
232-
change.getSequenceNumber() > changes_for_reader_.back().getSequenceNumber());
233+
seq_num > changes_for_reader_.back().getSequenceNumber());
233234

234235
// Irrelevant changes are not added to the collection
235236
if (!is_relevant)
236237
{
237-
if ( !is_reliable_ &&
238-
changes_low_mark_ + 1 == change.getSequenceNumber())
238+
if (is_reliable_)
239+
{
240+
if (!is_local_reader())
241+
{
242+
if (SequenceNumber_t::unknown() == first_irrelevant_removed_)
243+
{
244+
first_irrelevant_removed_ = seq_num;
245+
last_irrelevant_removed_ = seq_num;
246+
}
247+
else if (seq_num == last_irrelevant_removed_ + 1)
248+
{
249+
last_irrelevant_removed_ = seq_num;
250+
}
251+
}
252+
}
253+
else if (changes_low_mark_ + 1 == seq_num)
239254
{
240-
changes_low_mark_ = change.getSequenceNumber();
255+
changes_low_mark_ = seq_num;
241256
}
242257
return;
243258
}
244259

245260
if (changes_for_reader_.push_back(change) == nullptr)
246261
{
247262
// This should never happen
248-
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << change.getSequenceNumber()
263+
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << seq_num
249264
<< " to reader proxy " << guid());
250265
eprosima::fastdds::dds::Log::Flush();
251266
assert(false);
@@ -281,7 +296,7 @@ bool ReaderProxy::change_is_unsent(
281296
FragmentNumber_t& next_unsent_frag,
282297
SequenceNumber_t& gap_seq,
283298
const SequenceNumber_t& min_seq,
284-
bool& need_reactivate_periodic_heartbeat) const
299+
bool& need_reactivate_periodic_heartbeat)
285300
{
286301
if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
287302
{
@@ -328,6 +343,21 @@ bool ReaderProxy::change_is_unsent(
328343
gap_seq = SequenceNumber_t::unknown();
329344
}
330345
}
346+
347+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_ &&
348+
SequenceNumber_t::unknown() != gap_seq)
349+
{
350+
// Check if the hole is due to irrelevant changes removed without informing the reader
351+
if (gap_seq == first_irrelevant_removed_)
352+
{
353+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
354+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
355+
}
356+
else if (gap_seq < last_irrelevant_removed_)
357+
{
358+
last_irrelevant_removed_ = gap_seq - 1;
359+
}
360+
}
331361
}
332362
}
333363
}
@@ -436,6 +466,20 @@ bool ReaderProxy::requested_changes_set(
436466
else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_))
437467
{
438468
gap_builder.add(sit);
469+
470+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
471+
{
472+
// Check if the hole is due to irrelevant changes removed without informing the reader
473+
if (sit == first_irrelevant_removed_)
474+
{
475+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
476+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
477+
}
478+
else if (sit < last_irrelevant_removed_)
479+
{
480+
last_irrelevant_removed_ = sit - 1;
481+
}
482+
}
439483
}
440484
});
441485
}

src/cpp/rtps/writer/ReaderProxy.hpp

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class ReaderProxy
139139
FragmentNumber_t& next_unsent_frag,
140140
SequenceNumber_t& gap_seq,
141141
const SequenceNumber_t& min_seq,
142-
bool& need_reactivate_periodic_heartbeat) const;
142+
bool& need_reactivate_periodic_heartbeat);
143143

144144
/**
145145
* Mark all changes up to the one indicated by seq_num as Acknowledged.
@@ -341,11 +341,38 @@ class ReaderProxy
341341
* Get the highest fully acknowledged sequence number.
342342
* @return the highest fully acknowledged sequence number.
343343
*/
344-
SequenceNumber_t changes_low_mark() const
344+
inline SequenceNumber_t changes_low_mark() const
345345
{
346346
return changes_low_mark_;
347347
}
348348

349+
/*!
350+
* Get the first sequence number not relevant that was removed without reader being informed.
351+
* @return First sequence number.
352+
*/
353+
inline SequenceNumber_t first_irrelevant_removed() const
354+
{
355+
return first_irrelevant_removed_;
356+
}
357+
358+
/*!
359+
* Get the last sequence number not relevant that was removed without reader being informed.
360+
* @return last sequence number.
361+
*/
362+
inline SequenceNumber_t last_irrelevant_removed() const
363+
{
364+
return last_irrelevant_removed_;
365+
}
366+
367+
/*!
368+
* Reset the interval of sequence numbers not relevant that were removed without reader being informed.
369+
*/
370+
inline void reset_irrelevant_removed()
371+
{
372+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
373+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
374+
}
375+
349376
/**
350377
* Change the interval of nack-supression event.
351378
* @param interval Time from data sending to acknack processing.
@@ -446,8 +473,14 @@ class ReaderProxy
446473
//! Last NACKFRAG count.
447474
uint32_t last_nackfrag_count_;
448475

476+
//! Sequence number of the lowest change not fully acknowledged.
449477
SequenceNumber_t changes_low_mark_;
450478

479+
//! First sequence number not relevant that was removed without reader being informed.
480+
SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()};
481+
//! Last sequence number not relevant that was removed without reader being informed.
482+
SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()};
483+
451484
bool active_ = false;
452485

453486
using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;

src/cpp/rtps/writer/StatefulWriter.cpp

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -520,11 +520,12 @@ bool StatefulWriter::change_removed_by_history(
520520
return ret_value;
521521
}
522522

523-
void StatefulWriter::send_heartbeat_to_all_readers()
523+
void StatefulWriter::send_heartbeat_to_all_readers(
524+
bool force_separating)
524525
{
525526
// This method is only called from send_periodic_heartbeat
526527

527-
if (separate_sending_enabled_)
528+
if (separate_sending_enabled_ || force_separating)
528529
{
529530
for (ReaderProxy* reader : matched_remote_readers_)
530531
{
@@ -648,6 +649,15 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
648649
{
649650
SequenceNumber_t gap_seq;
650651
FragmentNumber_t next_unsent_frag = 0;
652+
653+
if (SequenceNumber_t::unknown() != (*remote_reader)->first_irrelevant_removed())
654+
{
655+
// Send GAP with irrelevant changes that are not in history.
656+
group.sender(this, (*remote_reader)->message_sender());
657+
add_gaps_for_removed_irrelevants(**remote_reader, group);
658+
group.sender(this, &locator_selector); // This makes the flush_and_reset().
659+
}
660+
651661
if ((*remote_reader)->change_is_unsent(change->sequenceNumber, next_unsent_frag, gap_seq, get_seq_num_min(),
652662
need_reactivate_periodic_heartbeat) &&
653663
(0 == n_fragments || min_unsent_fragment >= next_unsent_frag))
@@ -1684,7 +1694,8 @@ bool StatefulWriter::send_periodic_heartbeat(
16841694
std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
16851695
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
16861696

1687-
bool unacked_changes = false;
1697+
bool unacked_changes {false};
1698+
bool irrelevants_removed {false};
16881699
if (!liveliness)
16891700
{
16901701
SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
@@ -1693,20 +1704,30 @@ bool StatefulWriter::send_periodic_heartbeat(
16931704
first_seq_to_check_acknowledge = history_->next_sequence_number() - 1;
16941705
}
16951706

1696-
unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
1697-
matched_remote_readers_,
1698-
[first_seq_to_check_acknowledge](ReaderProxy* reader)
1699-
{
1700-
return reader->has_unacknowledged(first_seq_to_check_acknowledge);
1701-
}
1702-
);
1707+
for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
1708+
matched_remote_readers_,
1709+
[first_seq_to_check_acknowledge, &unacked_changes, &irrelevants_removed](ReaderProxy* reader)
1710+
{
1711+
if (!unacked_changes)
1712+
{
1713+
unacked_changes = reader->has_unacknowledged(first_seq_to_check_acknowledge);
1714+
}
1715+
1716+
if (!irrelevants_removed)
1717+
{
1718+
irrelevants_removed = SequenceNumber_t::unknown() != reader->first_irrelevant_removed();
1719+
}
1720+
1721+
return unacked_changes && irrelevants_removed;
1722+
}
1723+
);
17031724

17041725
if (unacked_changes)
17051726
{
17061727
try
17071728
{
17081729
//TODO if separating, here sends periodic for all readers, instead of ones needed it.
1709-
send_heartbeat_to_all_readers();
1730+
send_heartbeat_to_all_readers(irrelevants_removed);
17101731
}
17111732
catch (const RTPSMessageGroup::timeout&)
17121733
{
@@ -1796,6 +1817,7 @@ void StatefulWriter::send_heartbeat_to_nts(
17961817
assert(firstSeq <= lastSeq);
17971818
if (!liveliness)
17981819
{
1820+
add_gaps_for_removed_irrelevants(remoteReaderProxy, group);
17991821
add_gaps_for_holes_in_history(group);
18001822
}
18011823
}
@@ -2247,6 +2269,19 @@ bool StatefulWriter::get_connections(
22472269

22482270
#endif // ifdef FASTDDS_STATISTICS
22492271

2272+
void StatefulWriter::add_gaps_for_removed_irrelevants(
2273+
ReaderProxy& remoteReaderProxy,
2274+
RTPSMessageGroup& group)
2275+
{
2276+
if (SequenceNumber_t::unknown() != remoteReaderProxy.first_irrelevant_removed())
2277+
{
2278+
group.add_gap(remoteReaderProxy.first_irrelevant_removed(),
2279+
SequenceNumberSet_t(remoteReaderProxy.last_irrelevant_removed() + 1),
2280+
remoteReaderProxy.guid());
2281+
remoteReaderProxy.reset_irrelevant_removed();
2282+
}
2283+
}
2284+
22502285
void StatefulWriter::add_gaps_for_holes_in_history(
22512286
RTPSMessageGroup& group)
22522287
{

src/cpp/rtps/writer/StatefulWriter.hpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,13 @@ class StatefulWriter : public BaseWriter
385385
*/
386386
bool ack_timer_expired();
387387

388-
void send_heartbeat_to_all_readers();
388+
/*!
389+
* Send heartbeat to all the remote readers.
390+
* @param force_separating True to send the heartbeat separately for each reader.
391+
* False to send a unique heartbeat to all the readers.
392+
*/
393+
void send_heartbeat_to_all_readers(
394+
bool force_separating);
389395

390396
void deliver_sample_to_intraprocesses(
391397
CacheChange_t* change);
@@ -402,6 +408,10 @@ class StatefulWriter : public BaseWriter
402408
void prepare_datasharing_delivery(
403409
CacheChange_t* change);
404410

411+
void add_gaps_for_removed_irrelevants(
412+
ReaderProxy& remoteReaderProxy,
413+
RTPSMessageGroup& group);
414+
405415
/**
406416
* Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group.
407417
*

0 commit comments

Comments
 (0)