Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ void ReaderProxy::stop()
next_expected_acknack_count_ = 0;
last_nackfrag_count_ = 0;
changes_low_mark_ = SequenceNumber_t();

first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}

void ReaderProxy::disable_timers()
Expand Down Expand Up @@ -352,14 +355,17 @@ bool ReaderProxy::change_is_unsent(
SequenceNumber_t::unknown() != gap_seq)
{
// Check if the hole is due to irrelevant changes removed without informing the reader
if (gap_seq == first_irrelevant_removed_)
{
first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}
else if (gap_seq < last_irrelevant_removed_)
if (first_irrelevant_removed_ <= gap_seq )
{
last_irrelevant_removed_ = gap_seq - 1;
if (gap_seq == first_irrelevant_removed_)
{
first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}
else if (gap_seq < last_irrelevant_removed_)
{
last_irrelevant_removed_ = gap_seq - 1;
}
}
}
}
Expand Down Expand Up @@ -481,14 +487,17 @@ bool ReaderProxy::requested_changes_set(
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
{
// Check if the hole is due to irrelevant changes removed without informing the reader
if (sit == first_irrelevant_removed_)
{
first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}
else if (sit < last_irrelevant_removed_)
if (first_irrelevant_removed_ <= sit )
{
last_irrelevant_removed_ = sit - 1;
if (sit == first_irrelevant_removed_)
{
first_irrelevant_removed_ = SequenceNumber_t::unknown();
last_irrelevant_removed_ = SequenceNumber_t::unknown();
}
else if (sit < last_irrelevant_removed_)
{
last_irrelevant_removed_ = sit - 1;
}
}
}
}
Expand Down
99 changes: 96 additions & 3 deletions test/blackbox/common/DDSBlackboxTestsContentFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ class DDSContentFilter : public testing::TestWithParam<communication_type>
{
case communication_type::INTRAPROCESS:
library_settings.intraprocess_delivery = eprosima::fastdds::IntraprocessDeliveryType::INTRAPROCESS_FULL;
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(library_settings);
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(
library_settings);
break;
case communication_type::DATASHARING:
enable_datasharing = true;
Expand All @@ -182,7 +183,8 @@ class DDSContentFilter : public testing::TestWithParam<communication_type>
{
case communication_type::INTRAPROCESS:
library_settings.intraprocess_delivery = eprosima::fastdds::IntraprocessDeliveryType::INTRAPROCESS_OFF;
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(library_settings);
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_library_settings(
library_settings);
break;
case communication_type::DATASHARING:
break;
Expand Down Expand Up @@ -877,7 +879,7 @@ TEST_P(DDSContentFilter, CorrectGAPSendingTwoReader)
{
total_count_2 = status.total_count;
}).init();
ASSERT_TRUE(reader.isInitialized());
ASSERT_TRUE(reader_2.isInitialized());

// Set up the writer
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -946,6 +948,97 @@ TEST(DDSContentFilter, filter_other_type_name)
ASSERT_EQ(dds::DomainParticipantFactory::get_instance()->delete_participant(participant), RETCODE_OK);
}


/*
* Regression test for https://eprosima.easyredmine.com/issues/24038
*
* This test checks that reusing a ReaderProxy object when a new DataReader is matched does not lead to incorrect
* behaviour due to poorly initialised data.
*/
TEST(DDSContentFilter, reusing_reader_proxy)
{
int32_t total_count {0};
int32_t total_count_2 {0};

PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME, "index = 1 OR index = 2 OR index = 6", {}, true, false,
false);
reader
.reliability(RELIABLE_RELIABILITY_QOS)
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
.sample_lost_status_functor([&total_count](const SampleLostStatus& status)
{
total_count = status.total_count;
}).init();
ASSERT_TRUE(reader.isInitialized());

auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();

// Set up the writer
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
writer
.add_user_transport_to_pparams(udp_transport)
.disable_builtin_transport()
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
.history_depth(10)
//.heartbeat_period_seconds(100)
.init();
ASSERT_TRUE(writer.isInitialized());

// Wait for discovery
reader.wait_discovery();
writer.wait_discovery();

// Send 10 samples
auto data = default_helloworld_data_generator();

decltype(data) expected_data;
expected_data.push_back(*data.begin()); // index 1
expected_data.push_back(*std::next(data.begin())); // index 2
expected_data.push_back(*std::next(data.begin(), 5)); // index 6
decltype(data) expected_data_2;
expected_data_2.push_back(*std::next(data.begin(), 8)); // index 9
expected_data_2.push_back(*std::next(data.begin(), 2)); // index 3
expected_data_2.push_back(*std::next(data.begin(), 3)); // index 4

reader.startReception(expected_data);

writer.send(data, 50);

// Wait for reception and check
reader.block_for_all();
ASSERT_EQ(0, total_count);

reader.destroy();

data = default_helloworld_data_generator(4);

writer.send(data, 50);

PubSubReader<HelloWorldPubSubType> reader_2(TEST_TOPIC_NAME, "index = 3 OR index = 4 OR index = 9", {}, true, false,
false);
reader_2
.reliability(RELIABLE_RELIABILITY_QOS)
.durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS)
.sample_lost_status_functor([&total_count_2](const SampleLostStatus& status)
{
total_count_2 += status.total_count;
}).init();
ASSERT_TRUE(reader_2.isInitialized());


reader_2.wait_discovery();
writer.wait_discovery();

data = default_helloworld_data_generator(1);

writer.send(data, 50);

reader_2.startReception(expected_data_2);

// Wait for reception and check
reader_2.block_for_all();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading