Skip to content

Commit 61f8677

Browse files
authored
Reset irrelevant sequence numbers interval in proxy readers (#6241)
* Refs #24038. Regression test Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Fix Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Apply suggestion Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Apply new uncrustify Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> --------- Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
1 parent 3e3f261 commit 61f8677

File tree

2 files changed

+119
-17
lines changed

2 files changed

+119
-17
lines changed

src/cpp/rtps/writer/ReaderProxy.cpp

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,9 @@ void ReaderProxy::stop()
177177
next_expected_acknack_count_ = 0;
178178
last_nackfrag_count_ = 0;
179179
changes_low_mark_ = SequenceNumber_t();
180+
181+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
182+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
180183
}
181184

182185
void ReaderProxy::disable_timers()
@@ -352,14 +355,17 @@ bool ReaderProxy::change_is_unsent(
352355
SequenceNumber_t::unknown() != gap_seq)
353356
{
354357
// Check if the hole is due to irrelevant changes removed without informing the reader
355-
if (gap_seq == first_irrelevant_removed_)
356-
{
357-
first_irrelevant_removed_ = SequenceNumber_t::unknown();
358-
last_irrelevant_removed_ = SequenceNumber_t::unknown();
359-
}
360-
else if (gap_seq < last_irrelevant_removed_)
358+
if (first_irrelevant_removed_ <= gap_seq )
361359
{
362-
last_irrelevant_removed_ = gap_seq - 1;
360+
if (gap_seq == first_irrelevant_removed_)
361+
{
362+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
363+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
364+
}
365+
else if (gap_seq < last_irrelevant_removed_)
366+
{
367+
last_irrelevant_removed_ = gap_seq - 1;
368+
}
363369
}
364370
}
365371
}
@@ -481,14 +487,17 @@ bool ReaderProxy::requested_changes_set(
481487
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
482488
{
483489
// Check if the hole is due to irrelevant changes removed without informing the reader
484-
if (sit == first_irrelevant_removed_)
485-
{
486-
first_irrelevant_removed_ = SequenceNumber_t::unknown();
487-
last_irrelevant_removed_ = SequenceNumber_t::unknown();
488-
}
489-
else if (sit < last_irrelevant_removed_)
490+
if (first_irrelevant_removed_ <= sit )
490491
{
491-
last_irrelevant_removed_ = sit - 1;
492+
if (sit == first_irrelevant_removed_)
493+
{
494+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
495+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
496+
}
497+
else if (sit < last_irrelevant_removed_)
498+
{
499+
last_irrelevant_removed_ = sit - 1;
500+
}
492501
}
493502
}
494503
}

test/blackbox/common/DDSBlackboxTestsContentFilter.cpp

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ class DDSContentFilter : public testing::TestWithParam<communication_type>
160160
{
161161
case communication_type::INTRAPROCESS:
162162
library_settings.intraprocess_delivery = eprosima::fastdds::IntraprocessDeliveryType::INTRAPROCESS_FULL;
163-
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(library_settings);
163+
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(
164+
library_settings);
164165
break;
165166
case communication_type::DATASHARING:
166167
enable_datasharing = true;
@@ -182,7 +183,8 @@ class DDSContentFilter : public testing::TestWithParam<communication_type>
182183
{
183184
case communication_type::INTRAPROCESS:
184185
library_settings.intraprocess_delivery = eprosima::fastdds::IntraprocessDeliveryType::INTRAPROCESS_OFF;
185-
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(library_settings);
186+
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(
187+
library_settings);
186188
break;
187189
case communication_type::DATASHARING:
188190
break;
@@ -877,7 +879,7 @@ TEST_P(DDSContentFilter, CorrectGAPSendingTwoReader)
877879
{
878880
total_count_2 = status.total_count;
879881
}).init();
880-
ASSERT_TRUE(reader.isInitialized());
882+
ASSERT_TRUE(reader_2.isInitialized());
881883

882884
// Set up the writer
883885
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
@@ -946,6 +948,97 @@ TEST(DDSContentFilter, filter_other_type_name)
946948
ASSERT_EQ(dds::DomainParticipantFactory::get_instance()->delete_participant(participant), RETCODE_OK);
947949
}
948950

951+
952+
/*
953+
* Regression test for https://eprosima.easyredmine.com/issues/24038
954+
*
955+
* This test checks that reusing a ReaderProxy object when a new DataReader is matched does not lead to incorrect
956+
* behaviour due to poorly initialised data.
957+
*/
958+
TEST(DDSContentFilter, reusing_reader_proxy)
959+
{
960+
int32_t total_count {0};
961+
int32_t total_count_2 {0};
962+
963+
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME, "index = 1 OR index = 2 OR index = 6", {}, true, false,
964+
false);
965+
reader
966+
.reliability(RELIABLE_RELIABILITY_QOS)
967+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
968+
.sample_lost_status_functor([&total_count](const SampleLostStatus& status)
969+
{
970+
total_count = status.total_count;
971+
}).init();
972+
ASSERT_TRUE(reader.isInitialized());
973+
974+
auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();
975+
976+
// Set up the writer
977+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
978+
writer
979+
.add_user_transport_to_pparams(udp_transport)
980+
.disable_builtin_transport()
981+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
982+
.history_depth(10)
983+
//.heartbeat_period_seconds(100)
984+
.init();
985+
ASSERT_TRUE(writer.isInitialized());
986+
987+
// Wait for discovery
988+
reader.wait_discovery();
989+
writer.wait_discovery();
990+
991+
// Send 10 samples
992+
auto data = default_helloworld_data_generator();
993+
994+
decltype(data) expected_data;
995+
expected_data.push_back(*data.begin()); // index 1
996+
expected_data.push_back(*std::next(data.begin())); // index 2
997+
expected_data.push_back(*std::next(data.begin(), 5)); // index 6
998+
decltype(data) expected_data_2;
999+
expected_data_2.push_back(*std::next(data.begin(), 8)); // index 9
1000+
expected_data_2.push_back(*std::next(data.begin(), 2)); // index 3
1001+
expected_data_2.push_back(*std::next(data.begin(), 3)); // index 4
1002+
1003+
reader.startReception(expected_data);
1004+
1005+
writer.send(data, 50);
1006+
1007+
// Wait for reception and check
1008+
reader.block_for_all();
1009+
ASSERT_EQ(0, total_count);
1010+
1011+
reader.destroy();
1012+
1013+
data = default_helloworld_data_generator(4);
1014+
1015+
writer.send(data, 50);
1016+
1017+
PubSubReader<HelloWorldPubSubType> reader_2(TEST_TOPIC_NAME, "index = 3 OR index = 4 OR index = 9", {}, true, false,
1018+
false);
1019+
reader_2
1020+
.reliability(RELIABLE_RELIABILITY_QOS)
1021+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
1022+
.sample_lost_status_functor([&total_count_2](const SampleLostStatus& status)
1023+
{
1024+
total_count_2 += status.total_count;
1025+
}).init();
1026+
ASSERT_TRUE(reader_2.isInitialized());
1027+
1028+
1029+
reader_2.wait_discovery();
1030+
writer.wait_discovery();
1031+
1032+
data = default_helloworld_data_generator(1);
1033+
1034+
writer.send(data, 50);
1035+
1036+
reader_2.startReception(expected_data_2);
1037+
1038+
// Wait for reception and check
1039+
reader_2.block_for_all();
1040+
}
1041+
9491042
#ifdef INSTANTIATE_TEST_SUITE_P
9501043
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
9511044
#else

0 commit comments

Comments
 (0)