Skip to content

Commit efa40d1

Browse files
richiwareMiguelCompany
authored andcommitted
Reset irrelevant sequence numbers interval in proxy readers (#6241)
* Refs #24038. Regression test Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Fix Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Apply suggestion Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #24038. Apply new uncrustify Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> --------- Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
1 parent f260ca3 commit efa40d1

File tree

2 files changed

+119
-17
lines changed

2 files changed

+119
-17
lines changed

src/cpp/rtps/writer/ReaderProxy.cpp

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ void ReaderProxy::stop()
173173
next_expected_acknack_count_ = 0;
174174
last_nackfrag_count_ = 0;
175175
changes_low_mark_ = SequenceNumber_t();
176+
177+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
178+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
176179
}
177180

178181
void ReaderProxy::disable_timers()
@@ -348,14 +351,17 @@ bool ReaderProxy::change_is_unsent(
348351
SequenceNumber_t::unknown() != gap_seq)
349352
{
350353
// Check if the hole is due to irrelevant changes removed without informing the reader
351-
if (gap_seq == first_irrelevant_removed_)
352-
{
353-
first_irrelevant_removed_ = SequenceNumber_t::unknown();
354-
last_irrelevant_removed_ = SequenceNumber_t::unknown();
355-
}
356-
else if (gap_seq < last_irrelevant_removed_)
354+
if (first_irrelevant_removed_ <= gap_seq )
357355
{
358-
last_irrelevant_removed_ = gap_seq - 1;
356+
if (gap_seq == first_irrelevant_removed_)
357+
{
358+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
359+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
360+
}
361+
else if (gap_seq < last_irrelevant_removed_)
362+
{
363+
last_irrelevant_removed_ = gap_seq - 1;
364+
}
359365
}
360366
}
361367
}
@@ -470,14 +476,17 @@ bool ReaderProxy::requested_changes_set(
470476
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
471477
{
472478
// Check if the hole is due to irrelevant changes removed without informing the reader
473-
if (sit == first_irrelevant_removed_)
474-
{
475-
first_irrelevant_removed_ = SequenceNumber_t::unknown();
476-
last_irrelevant_removed_ = SequenceNumber_t::unknown();
477-
}
478-
else if (sit < last_irrelevant_removed_)
479+
if (first_irrelevant_removed_ <= sit )
479480
{
480-
last_irrelevant_removed_ = sit - 1;
481+
if (sit == first_irrelevant_removed_)
482+
{
483+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
484+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
485+
}
486+
else if (sit < last_irrelevant_removed_)
487+
{
488+
last_irrelevant_removed_ = sit - 1;
489+
}
481490
}
482491
}
483492
}

test/blackbox/common/DDSBlackboxTestsContentFilter.cpp

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ class DDSContentFilter : public testing::TestWithParam<communication_type>
160160
{
161161
case communication_type::INTRAPROCESS:
162162
library_settings.intraprocess_delivery = eprosima::fastdds::IntraprocessDeliveryType::INTRAPROCESS_FULL;
163-
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(library_settings);
163+
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(
164+
library_settings);
164165
break;
165166
case communication_type::DATASHARING:
166167
enable_datasharing = true;
@@ -182,7 +183,8 @@ class DDSContentFilter : public testing::TestWithParam<communication_type>
182183
{
183184
case communication_type::INTRAPROCESS:
184185
library_settings.intraprocess_delivery = eprosima::fastdds::IntraprocessDeliveryType::INTRAPROCESS_OFF;
185-
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(library_settings);
186+
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(
187+
library_settings);
186188
break;
187189
case communication_type::DATASHARING:
188190
break;
@@ -784,7 +786,7 @@ TEST_P(DDSContentFilter, CorrectGAPSendingTwoReader)
784786
{
785787
total_count_2 = status.total_count;
786788
}).init();
787-
ASSERT_TRUE(reader.isInitialized());
789+
ASSERT_TRUE(reader_2.isInitialized());
788790

789791
// Set up the writer
790792
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
@@ -853,6 +855,97 @@ TEST(DDSContentFilter, filter_other_type_name)
853855
ASSERT_EQ(dds::DomainParticipantFactory::get_instance()->delete_participant(participant), RETCODE_OK);
854856
}
855857

858+
859+
/*
860+
* Regression test for https://eprosima.easyredmine.com/issues/24038
861+
*
862+
* This test checks that reusing a ReaderProxy object when a new DataReader is matched does not lead to incorrect
863+
* behaviour due to poorly initialised data.
864+
*/
865+
TEST(DDSContentFilter, reusing_reader_proxy)
866+
{
867+
int32_t total_count {0};
868+
int32_t total_count_2 {0};
869+
870+
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME, "index = 1 OR index = 2 OR index = 6", {}, true, false,
871+
false);
872+
reader
873+
.reliability(RELIABLE_RELIABILITY_QOS)
874+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
875+
.sample_lost_status_functor([&total_count](const SampleLostStatus& status)
876+
{
877+
total_count = status.total_count;
878+
}).init();
879+
ASSERT_TRUE(reader.isInitialized());
880+
881+
auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();
882+
883+
// Set up the writer
884+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
885+
writer
886+
.add_user_transport_to_pparams(udp_transport)
887+
.disable_builtin_transport()
888+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
889+
.history_depth(10)
890+
//.heartbeat_period_seconds(100)
891+
.init();
892+
ASSERT_TRUE(writer.isInitialized());
893+
894+
// Wait for discovery
895+
reader.wait_discovery();
896+
writer.wait_discovery();
897+
898+
// Send 10 samples
899+
auto data = default_helloworld_data_generator();
900+
901+
decltype(data) expected_data;
902+
expected_data.push_back(*data.begin()); // index 1
903+
expected_data.push_back(*std::next(data.begin())); // index 2
904+
expected_data.push_back(*std::next(data.begin(), 5)); // index 6
905+
decltype(data) expected_data_2;
906+
expected_data_2.push_back(*std::next(data.begin(), 8)); // index 9
907+
expected_data_2.push_back(*std::next(data.begin(), 2)); // index 3
908+
expected_data_2.push_back(*std::next(data.begin(), 3)); // index 4
909+
910+
reader.startReception(expected_data);
911+
912+
writer.send(data, 50);
913+
914+
// Wait for reception and check
915+
reader.block_for_all();
916+
ASSERT_EQ(0, total_count);
917+
918+
reader.destroy();
919+
920+
data = default_helloworld_data_generator(4);
921+
922+
writer.send(data, 50);
923+
924+
PubSubReader<HelloWorldPubSubType> reader_2(TEST_TOPIC_NAME, "index = 3 OR index = 4 OR index = 9", {}, true, false,
925+
false);
926+
reader_2
927+
.reliability(RELIABLE_RELIABILITY_QOS)
928+
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
929+
.sample_lost_status_functor([&total_count_2](const SampleLostStatus& status)
930+
{
931+
total_count_2 += status.total_count;
932+
}).init();
933+
ASSERT_TRUE(reader_2.isInitialized());
934+
935+
936+
reader_2.wait_discovery();
937+
writer.wait_discovery();
938+
939+
data = default_helloworld_data_generator(1);
940+
941+
writer.send(data, 50);
942+
943+
reader_2.startReception(expected_data_2);
944+
945+
// Wait for reception and check
946+
reader_2.block_for_all();
947+
}
948+
856949
#ifdef INSTANTIATE_TEST_SUITE_P
857950
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
858951
#else

0 commit comments

Comments
 (0)