diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index 375493fa7f2..db9970323bb 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -143,7 +143,7 @@ class ReaderProxy FragmentNumber_t& next_unsent_frag, SequenceNumber_t& gap_seq, const SequenceNumber_t& min_seq, - bool& need_reactivate_periodic_heartbeat) const; + bool& need_reactivate_periodic_heartbeat); /** * Mark all changes up to the one indicated by seq_num as Acknowledged. @@ -345,11 +345,38 @@ class ReaderProxy * Get the highest fully acknowledged sequence number. * @return the highest fully acknowledged sequence number. */ - SequenceNumber_t changes_low_mark() const + inline SequenceNumber_t changes_low_mark() const { return changes_low_mark_; } + /*! + * Get the first sequence number not relevant that was removed without reader being informed. + * @return First sequence number. + */ + inline SequenceNumber_t first_irrelevant_removed() const + { + return first_irrelevant_removed_; + } + + /*! + * Get the last sequence number not relevant that was removed without reader being informed. + * @return last sequence number. + */ + inline SequenceNumber_t last_irrelevant_removed() const + { + return last_irrelevant_removed_; + } + + /*! + * Reset the interval of sequence numbers not relevant that were removed without reader being informed. + */ + inline void reset_irrelevant_removed() + { + first_irrelevant_removed_ = SequenceNumber_t::unknown(); + last_irrelevant_removed_ = SequenceNumber_t::unknown(); + } + /** * Change the interval of nack-supression event. * @param interval Time from data sending to acknack processing. @@ -450,8 +477,14 @@ class ReaderProxy //! Last NACKFRAG count. uint32_t last_nackfrag_count_; + //! Sequence number of the lowest change not fully acknowledged. SequenceNumber_t changes_low_mark_; + //! First sequence number not relevant that was removed without reader being informed. + SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()}; + //! Last sequence number not relevant that was removed without reader being informed. + SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()}; + bool active_ = false; using ChangeIterator = ResourceLimitedVector::iterator; diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index e1785389252..0b45a7be636 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -486,7 +486,13 @@ class StatefulWriter : public RTPSWriter */ bool ack_timer_expired(); - void send_heartbeat_to_all_readers(); + /*! + * Send heartbeat to all the remote readers. + * @param force_separating True to send the heartbeat separately for each reader. + * False to send a unique heartbeat to all the readers. + */ + void send_heartbeat_to_all_readers( + bool force_separating); void deliver_sample_to_intraprocesses( CacheChange_t* change); @@ -503,6 +509,10 @@ class StatefulWriter : public RTPSWriter void prepare_datasharing_delivery( CacheChange_t* change); + void add_gaps_for_removed_irrelevants( + ReaderProxy& remoteReaderProxy, + RTPSMessageGroup& group); + /** * Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group. * diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index e215280c206..f2d7ae09ad6 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -227,17 +227,32 @@ void ReaderProxy::add_change( const ChangeForReader_t& change, bool is_relevant) { - assert(change.getSequenceNumber() > changes_low_mark_); + SequenceNumber_t seq_num {change.getSequenceNumber()}; + assert(seq_num > changes_low_mark_); assert(changes_for_reader_.empty() ? true : - change.getSequenceNumber() > changes_for_reader_.back().getSequenceNumber()); + seq_num > changes_for_reader_.back().getSequenceNumber()); // Irrelevant changes are not added to the collection if (!is_relevant) { - if ( !is_reliable_ && - changes_low_mark_ + 1 == change.getSequenceNumber()) + if (is_reliable_) + { + if (!is_local_reader()) + { + if (SequenceNumber_t::unknown() == first_irrelevant_removed_) + { + first_irrelevant_removed_ = seq_num; + last_irrelevant_removed_ = seq_num; + } + else if (seq_num == last_irrelevant_removed_ + 1) + { + last_irrelevant_removed_ = seq_num; + } + } + } + else if (changes_low_mark_ + 1 == seq_num) { - changes_low_mark_ = change.getSequenceNumber(); + changes_low_mark_ = seq_num; } return; } @@ -245,7 +260,7 @@ void ReaderProxy::add_change( if (changes_for_reader_.push_back(change) == nullptr) { // This should never happen - EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << change.getSequenceNumber() + EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << seq_num << " to reader proxy " << guid()); eprosima::fastdds::dds::Log::Flush(); assert(false); @@ -281,7 +296,7 @@ bool ReaderProxy::change_is_unsent( FragmentNumber_t& next_unsent_frag, SequenceNumber_t& gap_seq, const SequenceNumber_t& min_seq, - bool& need_reactivate_periodic_heartbeat) const + bool& need_reactivate_periodic_heartbeat) { if (seq_num <= changes_low_mark_ || changes_for_reader_.empty()) { @@ -328,6 +343,21 @@ bool ReaderProxy::change_is_unsent( gap_seq = SequenceNumber_t::unknown(); } } + + if (SequenceNumber_t::unknown() != first_irrelevant_removed_ && + SequenceNumber_t::unknown() != gap_seq) + { + // Check if the hole is due to irrelevant changes removed without informing the reader + if (gap_seq == first_irrelevant_removed_) + { + first_irrelevant_removed_ = SequenceNumber_t::unknown(); + last_irrelevant_removed_ = SequenceNumber_t::unknown(); + } + else if (gap_seq < last_irrelevant_removed_) + { + last_irrelevant_removed_ = gap_seq - 1; + } + } } } } @@ -436,6 +466,20 @@ bool ReaderProxy::requested_changes_set( else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_)) { gap_builder.add(sit); + + if (SequenceNumber_t::unknown() != first_irrelevant_removed_) + { + // Check if the hole is due to irrelevant changes removed without informing the reader + if (sit == first_irrelevant_removed_) + { + first_irrelevant_removed_ = SequenceNumber_t::unknown(); + last_irrelevant_removed_ = SequenceNumber_t::unknown(); + } + else if (sit < last_irrelevant_removed_) + { + last_irrelevant_removed_ = sit - 1; + } + } } }); } diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index a8d7a7a2303..596a8a26d3c 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -577,11 +577,12 @@ bool StatefulWriter::change_removed_by_history( return ret_value; } -void StatefulWriter::send_heartbeat_to_all_readers() +void StatefulWriter::send_heartbeat_to_all_readers( + bool force_separating) { // This method is only called from send_periodic_heartbeat - if (m_separateSendingEnabled) + if (m_separateSendingEnabled || force_separating) { for (ReaderProxy* reader : matched_remote_readers_) { @@ -705,6 +706,15 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network( { SequenceNumber_t gap_seq; FragmentNumber_t next_unsent_frag = 0; + + if (SequenceNumber_t::unknown() != (*remote_reader)->first_irrelevant_removed()) + { + // Send GAP with irrelevant changes that are not in history. + group.sender(this, (*remote_reader)->message_sender()); + add_gaps_for_removed_irrelevants(**remote_reader, group); + group.sender(this, &locator_selector); // This makes the flush_and_reset(). + } + if ((*remote_reader)->change_is_unsent(change->sequenceNumber, next_unsent_frag, gap_seq, get_seq_num_min(), need_reactivate_periodic_heartbeat) && (0 == n_fragments || min_unsent_fragment >= next_unsent_frag)) @@ -1730,7 +1740,8 @@ bool StatefulWriter::send_periodic_heartbeat( std::lock_guard guardW(mp_mutex); std::lock_guard guard_locator_selector_general(locator_selector_general_); - bool unacked_changes = false; + bool unacked_changes {false}; + bool irrelevants_removed {false}; if (!liveliness) { SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min(); @@ -1739,20 +1750,30 @@ bool StatefulWriter::send_periodic_heartbeat( first_seq_to_check_acknowledge = mp_history->next_sequence_number() - 1; } - unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_, - matched_remote_readers_, - [first_seq_to_check_acknowledge](ReaderProxy* reader) - { - return reader->has_unacknowledged(first_seq_to_check_acknowledge); - } - ); + for_matched_readers(matched_local_readers_, matched_datasharing_readers_, + matched_remote_readers_, + [first_seq_to_check_acknowledge, &unacked_changes, &irrelevants_removed](ReaderProxy* reader) + { + if (!unacked_changes) + { + unacked_changes = reader->has_unacknowledged(first_seq_to_check_acknowledge); + } + + if (!irrelevants_removed) + { + irrelevants_removed = SequenceNumber_t::unknown() != reader->first_irrelevant_removed(); + } + + return unacked_changes && irrelevants_removed; + } + ); if (unacked_changes) { try { //TODO if separating, here sends periodic for all readers, instead of ones needed it. - send_heartbeat_to_all_readers(); + send_heartbeat_to_all_readers(irrelevants_removed); } catch (const RTPSMessageGroup::timeout&) { @@ -1842,6 +1863,7 @@ void StatefulWriter::send_heartbeat_to_nts( assert(firstSeq <= lastSeq); if (!liveliness) { + add_gaps_for_removed_irrelevants(remoteReaderProxy, group); add_gaps_for_holes_in_history_(group); } } @@ -2293,6 +2315,19 @@ bool StatefulWriter::get_connections( #endif // ifdef FASTDDS_STATISTICS +void StatefulWriter::add_gaps_for_removed_irrelevants( + ReaderProxy& remoteReaderProxy, + RTPSMessageGroup& group) +{ + if (SequenceNumber_t::unknown() != remoteReaderProxy.first_irrelevant_removed()) + { + group.add_gap(remoteReaderProxy.first_irrelevant_removed(), + SequenceNumberSet_t(remoteReaderProxy.last_irrelevant_removed() + 1), + remoteReaderProxy.guid()); + remoteReaderProxy.reset_irrelevant_removed(); + } +} + void StatefulWriter::add_gaps_for_holes_in_history_( RTPSMessageGroup& group) { diff --git a/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp b/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp index 529d38eb608..cac30ded8f5 100644 --- a/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp +++ b/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp @@ -756,6 +756,120 @@ TEST(DDSContentFilter, OnlyFilterAliveChanges) ASSERT_EQ(reader.get_sample_lost_status().total_count, 0); } +/*! + * @test Regression test for https://eprosima.easyredmine.com/issues/23919 + * This test checks GAP messages are sent correctly when there is one reader with a content filter. + * The idea is, in the middle of a GAP sequence, a heartbet period message is sent. + */ +TEST_P(DDSContentFilter, CorrectGAPSendingOneReader) +{ + int32_t total_count {0}; + // Set up the reader with a content filter for index 1, 2, and 6 + PubSubReader reader(TEST_TOPIC_NAME, "index = 1 OR index = 2 OR index = 6", {}, true, false, + false); + reader + .reliability(RELIABLE_RELIABILITY_QOS) + .sample_lost_status_functor([&total_count](const SampleLostStatus& status) + { + total_count = status.total_count; + }).init(); + ASSERT_TRUE(reader.isInitialized()); + + // Set up the writer + PubSubWriter writer(TEST_TOPIC_NAME); + writer + .heartbeat_period_seconds(0) + .heartbeat_period_nanosec(100000000) + .init(); + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery + reader.wait_discovery(); + writer.wait_discovery(); + + // Send 10 samples + auto data = default_helloworld_data_generator(); + + decltype(data) expected_data; + expected_data.push_back(*data.begin()); // index 1 + expected_data.push_back(*std::next(data.begin())); // index 2 + expected_data.push_back(*std::next(data.begin(), 5)); // index 6 + + reader.startReception(expected_data); + + writer.send(data, 50); + + // Wait for reception and check + reader.block_for_all(); + ASSERT_EQ(0, total_count); +} + +/*! + * @test Regression test for https://eprosima.easyredmine.com/issues/23919 + * This test checks GAP messages are sent correctly when there is two readers with a content filter. + */ +TEST_P(DDSContentFilter, CorrectGAPSendingTwoReader) +{ + int32_t total_count {0}; + int32_t total_count_2 {0}; + // Set up the reader with a content filter for index 1, 2, and 6 + PubSubReader reader(TEST_TOPIC_NAME, "index = 1 OR index = 2 OR index = 6", {}, true, false, + false); + reader + .reliability(RELIABLE_RELIABILITY_QOS) + .sample_lost_status_functor([&total_count](const SampleLostStatus& status) + { + total_count = status.total_count; + }).init(); + ASSERT_TRUE(reader.isInitialized()); + + PubSubReader reader_2(TEST_TOPIC_NAME, "index = 3 OR index = 10", {}, true, false, + false); + reader_2 + .reliability(RELIABLE_RELIABILITY_QOS) + .sample_lost_status_functor([&total_count_2](const SampleLostStatus& status) + { + total_count_2 = status.total_count; + }).init(); + ASSERT_TRUE(reader.isInitialized()); + + // Set up the writer + PubSubWriter writer(TEST_TOPIC_NAME); + writer + .heartbeat_period_seconds(0) + .heartbeat_period_nanosec(100000000) + .init(); + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery + reader.wait_discovery(); + reader_2.wait_discovery(); + writer.wait_discovery(2); + + // Send 10 samples + auto data = default_helloworld_data_generator(); + + decltype(data) expected_data; + expected_data.push_back(*data.begin()); // index 1 + expected_data.push_back(*std::next(data.begin())); // index 2 + expected_data.push_back(*std::next(data.begin(), 5)); // index 6 + + decltype(data) expected_data_2; + expected_data_2.push_back(*std::next(data.begin(), 2)); // index 3 + expected_data_2.push_back(*std::next(data.begin(), 9)); // index 9 + + reader.startReception(expected_data); + reader_2.startReception(expected_data_2); + + writer.send(data, 50); + + // Wait for reception and check + reader.block_for_all(); + reader_2.block_for_all(); + ASSERT_EQ(0, total_count); + ASSERT_EQ(0, total_count_2); +} + /* * Regression test for https://eprosima.easyredmine.com/issues/23265 *