Skip to content

Commit 89c5c6a

Browse files
richiwareemiliocuestaf
authored andcommitted
Reset irrelevant sequence numbers interval in proxy readers (#6241)
* Refs #24038. Regression test Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Fix Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Apply suggestion Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Apply new uncrustify Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> --------- Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> (cherry picked from commit 61f8677) # Conflicts: # test/blackbox/common/DDSBlackboxTestsContentFilter.cpp
1 parent 9de24a6 commit 89c5c6a

File tree

2 files changed

+115
-15
lines changed

2 files changed

+115
-15
lines changed

src/cpp/rtps/writer/ReaderProxy.cpp

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ void ReaderProxy::stop()
173173
next_expected_acknack_count_ = 0;
174174
last_nackfrag_count_ = 0;
175175
changes_low_mark_ = SequenceNumber_t();
176+
177+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
178+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
176179
}
177180

178181
void ReaderProxy::disable_timers()
@@ -348,14 +351,17 @@ bool ReaderProxy::change_is_unsent(
348351
SequenceNumber_t::unknown() != gap_seq)
349352
{
350353
// Check if the hole is due to irrelevant changes removed without informing the reader
351-
if (gap_seq == first_irrelevant_removed_)
352-
{
353-
first_irrelevant_removed_ = SequenceNumber_t::unknown();
354-
last_irrelevant_removed_ = SequenceNumber_t::unknown();
355-
}
356-
else if (gap_seq < last_irrelevant_removed_)
354+
if (first_irrelevant_removed_ <= gap_seq )
357355
{
358-
last_irrelevant_removed_ = gap_seq - 1;
356+
if (gap_seq == first_irrelevant_removed_)
357+
{
358+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
359+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
360+
}
361+
else if (gap_seq < last_irrelevant_removed_)
362+
{
363+
last_irrelevant_removed_ = gap_seq - 1;
364+
}
359365
}
360366
}
361367
}
@@ -470,14 +476,17 @@ bool ReaderProxy::requested_changes_set(
470476
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
471477
{
472478
// Check if the hole is due to irrelevant changes removed without informing the reader
473-
if (sit == first_irrelevant_removed_)
474-
{
475-
first_irrelevant_removed_ = SequenceNumber_t::unknown();
476-
last_irrelevant_removed_ = SequenceNumber_t::unknown();
477-
}
478-
else if (sit < last_irrelevant_removed_)
479+
if (first_irrelevant_removed_ <= sit )
479480
{
480-
last_irrelevant_removed_ = sit - 1;
481+
if (sit == first_irrelevant_removed_)
482+
{
483+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
484+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
485+
}
486+
else if (sit < last_irrelevant_removed_)
487+
{
488+
last_irrelevant_removed_ = sit - 1;
489+
}
481490
}
482491
}
483492
}

test/blackbox/common/DDSBlackboxTestsContentFilter.cpp

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ TEST_P(DDSContentFilter, CorrectGAPSendingTwoReader)
831831
{
832832
total_count_2 = status.total_count;
833833
}).init();
834-
ASSERT_TRUE(reader.isInitialized());
834+
ASSERT_TRUE(reader_2.isInitialized());
835835

836836
// Set up the writer
837837
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
@@ -903,6 +903,97 @@ TEST(DDSContentFilter, filter_other_type_name)
903903
ASSERT_EQ(dds::DomainParticipantFactory::get_instance()->delete_participant(participant), ReturnCode_t::RETCODE_OK);
904904
}
905905

906+
907+
/*
908+
* Regression test for https://eprosima.easyredmine.com/issues/24038
909+
*
910+
* This test checks that reusing a ReaderProxy object when a new DataReader is matched does not lead to incorrect
911+
* behaviour due to poorly initialised data.
912+
*/
913+
TEST(DDSContentFilter, reusing_reader_proxy)
914+
{
915+
int32_t total_count {0};
916+
int32_t total_count_2 {0};
917+
918+
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME, "index = 1 OR index = 2 OR index = 6", {}, true, false,
919+
false);
920+
reader
921+
.reliability(RELIABLE_RELIABILITY_QOS)
922+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
923+
.sample_lost_status_functor([&total_count](const SampleLostStatus& status)
924+
{
925+
total_count = status.total_count;
926+
}).init();
927+
ASSERT_TRUE(reader.isInitialized());
928+
929+
auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();
930+
931+
// Set up the writer
932+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
933+
writer
934+
.add_user_transport_to_pparams(udp_transport)
935+
.disable_builtin_transport()
936+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
937+
.history_depth(10)
938+
//.heartbeat_period_seconds(100)
939+
.init();
940+
ASSERT_TRUE(writer.isInitialized());
941+
942+
// Wait for discovery
943+
reader.wait_discovery();
944+
writer.wait_discovery();
945+
946+
// Send 10 samples
947+
auto data = default_helloworld_data_generator();
948+
949+
decltype(data) expected_data;
950+
expected_data.push_back(*data.begin()); // index 1
951+
expected_data.push_back(*std::next(data.begin())); // index 2
952+
expected_data.push_back(*std::next(data.begin(), 5)); // index 6
953+
decltype(data) expected_data_2;
954+
expected_data_2.push_back(*std::next(data.begin(), 8)); // index 9
955+
expected_data_2.push_back(*std::next(data.begin(), 2)); // index 3
956+
expected_data_2.push_back(*std::next(data.begin(), 3)); // index 4
957+
958+
reader.startReception(expected_data);
959+
960+
writer.send(data, 50);
961+
962+
// Wait for reception and check
963+
reader.block_for_all();
964+
ASSERT_EQ(0, total_count);
965+
966+
reader.destroy();
967+
968+
data = default_helloworld_data_generator(4);
969+
970+
writer.send(data, 50);
971+
972+
PubSubReader<HelloWorldPubSubType> reader_2(TEST_TOPIC_NAME, "index = 3 OR index = 4 OR index = 9", {}, true, false,
973+
false);
974+
reader_2
975+
.reliability(RELIABLE_RELIABILITY_QOS)
976+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
977+
.sample_lost_status_functor([&total_count_2](const SampleLostStatus& status)
978+
{
979+
total_count_2 += status.total_count;
980+
}).init();
981+
ASSERT_TRUE(reader_2.isInitialized());
982+
983+
984+
reader_2.wait_discovery();
985+
writer.wait_discovery();
986+
987+
data = default_helloworld_data_generator(1);
988+
989+
writer.send(data, 50);
990+
991+
reader_2.startReception(expected_data_2);
992+
993+
// Wait for reception and check
994+
reader_2.block_for_all();
995+
}
996+
906997
#ifdef INSTANTIATE_TEST_SUITE_P
907998
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
908999
#else

0 commit comments

Comments
 (0)