Skip to content

Commit 5259203

Browse files
committed
Fix DataReader history enforcement to respect max_samples_per_instance
Signed-off-by: Raül <raulojeda@eprosima.com>
1 parent 7834a88 commit 5259203

File tree

3 files changed

+129
-4
lines changed

3 files changed

+129
-4
lines changed

src/cpp/fastdds/publisher/DataWriterHistory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ HistoryAttributes DataWriterHistory::to_history_attributes(
4848

4949
if (history_qos.kind != KEEP_ALL_HISTORY_QOS)
5050
{
51-
max_samples = history_qos.depth;
51+
max_samples = std::min(history_qos.depth, resource_limits_qos.max_samples_per_instance);
5252
if (topic_kind != NO_KEY)
5353
{
5454
max_samples *= resource_limits_qos.max_instances;

src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,10 @@ DataReaderHistory::DataReaderHistory(
9595
else
9696
{
9797
resource_limited_qos_.max_instances = 1;
98-
resource_limited_qos_.max_samples_per_instance = resource_limited_qos_.max_samples;
98+
resource_limited_qos_.max_samples_per_instance = std::min(
99+
resource_limited_qos_.max_samples_per_instance,
100+
resource_limited_qos_.max_samples
101+
);
99102
key_changes_allocation_.initial = resource_limited_qos_.allocated_samples;
100103
key_changes_allocation_.maximum = resource_limited_qos_.max_samples;
101104

@@ -274,7 +277,8 @@ bool DataReaderHistory::received_change_keep_last(
274277
if (find_key(a_change->instanceHandle, vit))
275278
{
276279
DataReaderInstance::ChangeCollection& instance_changes = vit->second->cache_changes;
277-
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
280+
auto effective_depth = std::min(history_qos_.depth, resource_limited_qos_.max_samples_per_instance);
281+
if (instance_changes.size() < static_cast<size_t>(effective_depth))
278282
{
279283
ret_value = true;
280284
}
@@ -809,7 +813,8 @@ bool DataReaderHistory::completed_change_keep_last(
809813
{
810814
bool ret_value = false;
811815
DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes;
812-
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
816+
auto effective_depth = std::min(history_qos_.depth, resource_limited_qos_.max_samples_per_instance);
817+
if (instance_changes.size() < static_cast<size_t>(effective_depth))
813818
{
814819
ret_value = true;
815820
}

test/unittest/dds/subscriber/DataReaderTests.cpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3948,6 +3948,126 @@ TEST_F(DataReaderTests, set_related_datawriter)
39483948
DomainParticipantFactory::get_instance()->delete_participant(another_participant);
39493949
}
39503950

3951+
TEST_F(DataReaderTests, history_depth_vs_max_samples_per_instance)
3952+
{
3953+
static constexpr int32_t depth = 10;
3954+
static constexpr int32_t max_samples_per_instance = 5;
3955+
static constexpr int32_t num_samples_to_write = 12;
3956+
static constexpr int32_t expected_samples = 5;
3957+
3958+
DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT;
3959+
writer_qos.history().kind = KEEP_LAST_HISTORY_QOS;
3960+
writer_qos.history().depth = depth;
3961+
writer_qos.resource_limits().max_samples = 1000;
3962+
writer_qos.resource_limits().max_instances = 10;
3963+
writer_qos.resource_limits().max_samples_per_instance = max_samples_per_instance;
3964+
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
3965+
3966+
DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT;
3967+
reader_qos.history().kind = KEEP_LAST_HISTORY_QOS;
3968+
reader_qos.history().depth = depth;
3969+
reader_qos.resource_limits().max_samples = 1000;
3970+
reader_qos.resource_limits().max_instances = 10;
3971+
reader_qos.resource_limits().max_samples_per_instance = max_samples_per_instance;
3972+
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
3973+
3974+
create_instance_handles();
3975+
create_entities(nullptr, reader_qos, SUBSCRIBER_QOS_DEFAULT, writer_qos);
3976+
3977+
FooType data;
3978+
data.index(0);
3979+
3980+
// Write more samples than max_samples_per_instance
3981+
for (int32_t i = 0; i < num_samples_to_write; ++i)
3982+
{
3983+
data.message()[0] = static_cast<char>('0' + i);
3984+
data.message()[1] = '\0';
3985+
ASSERT_EQ(RETCODE_OK, data_writer_->write(&data, handle_ok_));
3986+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
3987+
}
3988+
3989+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
3990+
3991+
FooSeq data_seq;
3992+
SampleInfoSeq info_seq;
3993+
ASSERT_EQ(RETCODE_OK, data_reader_->take(data_seq, info_seq, LENGTH_UNLIMITED));
3994+
3995+
int valid_samples = 0;
3996+
for (LoanableCollection::size_type i = 0; i < info_seq.length(); ++i)
3997+
{
3998+
if (info_seq[i].valid_data)
3999+
{
4000+
valid_samples++;
4001+
}
4002+
}
4003+
4004+
ASSERT_EQ(expected_samples, valid_samples)
4005+
<< "Expected max_samples_per_instance (" << expected_samples
4006+
<< ") samples, not depth (" << depth << ")";
4007+
4008+
ASSERT_EQ(RETCODE_OK, data_reader_->return_loan(data_seq, info_seq));
4009+
}
4010+
4011+
TEST_F(DataReaderTests, history_depth_vs_max_samples_per_instance_no_key)
4012+
{
4013+
static constexpr int32_t depth = 10;
4014+
static constexpr int32_t max_samples_per_instance = 5;
4015+
static constexpr int32_t max_samples = 100;
4016+
static constexpr int32_t num_samples_to_write = 12;
4017+
static constexpr int32_t expected_samples = 5;
4018+
4019+
type_.reset(new FooBoundedTypeSupport());
4020+
4021+
DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT;
4022+
writer_qos.history().kind = KEEP_LAST_HISTORY_QOS;
4023+
writer_qos.history().depth = depth;
4024+
writer_qos.resource_limits().max_samples = max_samples;
4025+
writer_qos.resource_limits().max_instances = 1;
4026+
writer_qos.resource_limits().max_samples_per_instance = max_samples_per_instance;
4027+
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
4028+
4029+
DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT;
4030+
reader_qos.history().kind = KEEP_LAST_HISTORY_QOS;
4031+
reader_qos.history().depth = depth;
4032+
reader_qos.resource_limits().max_samples = max_samples;
4033+
reader_qos.resource_limits().max_instances = 1;
4034+
reader_qos.resource_limits().max_samples_per_instance = max_samples_per_instance;
4035+
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
4036+
4037+
create_entities(nullptr, reader_qos, SUBSCRIBER_QOS_DEFAULT, writer_qos);
4038+
4039+
FooBoundedType data;
4040+
4041+
// Write more samples than max_samples_per_instance
4042+
for (int32_t i = 0; i < num_samples_to_write; ++i)
4043+
{
4044+
data.index(i);
4045+
ASSERT_EQ(RETCODE_OK, data_writer_->write(&data, HANDLE_NIL));
4046+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
4047+
}
4048+
4049+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
4050+
4051+
FooBoundedSeq data_seq;
4052+
SampleInfoSeq info_seq;
4053+
ASSERT_EQ(RETCODE_OK, data_reader_->take(data_seq, info_seq, LENGTH_UNLIMITED));
4054+
4055+
int valid_samples = 0;
4056+
for (LoanableCollection::size_type i = 0; i < info_seq.length(); ++i)
4057+
{
4058+
if (info_seq[i].valid_data)
4059+
{
4060+
valid_samples++;
4061+
}
4062+
}
4063+
4064+
ASSERT_EQ(expected_samples, valid_samples)
4065+
<< "NO_KEY topic should respect max_samples_per_instance (" << expected_samples
4066+
<< ") not max_samples (" << max_samples << ") or depth (" << depth << ")";
4067+
4068+
ASSERT_EQ(RETCODE_OK, data_reader_->return_loan(data_seq, info_seq));
4069+
}
4070+
39514071
int main(
39524072
int argc,
39534073
char** argv)

0 commit comments

Comments
 (0)