Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
dad585b
Refs #22929. Add regression blackbox test.
MiguelCompany Jul 17, 2025
c914d26
Refs #22929. Improve test with listener.
MiguelCompany Jul 18, 2025
aae6e18
Refs #22929. Add `has_fake_sample` to `DataReaderInstance`.
MiguelCompany Mar 12, 2025
3fb4d70
Refs #22929. Manage state of `has_fake_sample`.
MiguelCompany Mar 12, 2025
b9018d6
Refs #22929. Handle `has_fake_sample` on `ReadTakeCommand`.
MiguelCompany Mar 12, 2025
fb81378
Refs #22929. Avoid removing instances with fake sample.
MiguelCompany Mar 12, 2025
9c5cc55
Refs #22929. Add instances with fake sample to `data_available_instan…
MiguelCompany Mar 12, 2025
cd486ea
Refs #22929. Method `writer_not_alive` returns whether a fake sample …
MiguelCompany Mar 12, 2025
dfea4b4
Refs #22929. Add fake sample independently of remaining samples
MiguelCompany Mar 12, 2025
eb61bfe
Refs #22929. Notify data_available when adding a fake sample.
MiguelCompany Jul 18, 2025
de71f19
Refs #22929. Remove shortcut for returning NO_DATA.
MiguelCompany Jul 18, 2025
7335d2d
Refs #22929. Avoid processing fake samples in PubSubReader.
MiguelCompany Jul 21, 2025
ff1bb32
Refs #22929. Avoid processing fake samples in DDSBlackboxTestsMonitor…
MiguelCompany Jul 21, 2025
06ed8b3
Refs #22929. Fix DataReaderTests.
MiguelCompany Jul 21, 2025
57a1faf
Refs #22929. Fix Latency tests.
MiguelCompany Jul 21, 2025
e9e1c04
Refs #22929. Rename `fake` into `state notification`.
MiguelCompany Jul 23, 2025
4c9c7a2
Refs #22929. Use HANDLE_NIL for publication handle.
MiguelCompany Jul 23, 2025
38885e4
Refs #22929. Use NIL publication handle to discard samples in tests.
MiguelCompany Jul 23, 2025
023dad5
Refs #22929. Use scoped name for HANDLE_NIL.
MiguelCompany Jul 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,28 @@ bool DataReaderImpl::wait_for_unread_message(
void DataReaderImpl::set_read_communication_status(
bool trigger_value)
{
if (trigger_value)
{
auto user_reader = user_datareader_;

//First check if we can handle with on_data_on_readers
SubscriberListener* subscriber_listener =
subscriber_->get_listener_for(StatusMask::data_on_readers());
if (subscriber_listener != nullptr)
{
subscriber_listener->on_data_on_readers(subscriber_->user_subscriber_);
}
else
{
// If not, try with on_data_available
DataReaderListener* listener = get_listener_for(StatusMask::data_available());
if (listener != nullptr)
{
listener->on_data_available(user_reader);
}
}
}

StatusMask notify_status = StatusMask::data_on_readers();
subscriber_->user_subscriber_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value);

Expand Down Expand Up @@ -721,11 +743,6 @@ ReturnCode_t DataReaderImpl::read_or_take_next_sample(
return RETCODE_NOT_ENABLED;
}

if (history_.getHistorySize() == 0)
{
return RETCODE_NO_DATA;
}

#if HAVE_STRICT_REALTIME
auto max_blocking_time = std::chrono::steady_clock::now() +
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
Expand Down Expand Up @@ -920,25 +937,6 @@ void DataReaderImpl::InnerDataReaderListener::on_data_available(

if (data_reader_->on_data_available(writer_guid, first_sequence, last_sequence))
{
auto user_reader = data_reader_->user_datareader_;

//First check if we can handle with on_data_on_readers
SubscriberListener* subscriber_listener =
data_reader_->subscriber_->get_listener_for(StatusMask::data_on_readers());
if (subscriber_listener != nullptr)
{
subscriber_listener->on_data_on_readers(data_reader_->subscriber_->user_subscriber_);
}
else
{
// If not, try with on_data_available
DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::data_available());
if (listener != nullptr)
{
listener->on_data_available(user_reader);
}
}

data_reader_->set_read_communication_status(true);
}
}
Expand Down Expand Up @@ -1178,7 +1176,10 @@ void DataReaderImpl::update_subscription_matched_status(

if (count_change < 0)
{
history_.writer_not_alive(status.remoteEndpointGuid);
if (history_.writer_not_alive(status.remoteEndpointGuid))
{
set_read_communication_status(true);
}
try_notify_read_conditions();
}
}
Expand Down Expand Up @@ -1493,7 +1494,10 @@ LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
{
if (0 < status.not_alive_count_change)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
if (history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)))
{
set_read_communication_status(true);
}
try_notify_read_conditions();
}

Expand Down
41 changes: 40 additions & 1 deletion src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ struct ReadTakeCommand
++it;
}

// Check if there is a state notification sample available
if (!finished_ && instance_->second->has_state_notification_sample)
{
// Add sample and info to collections
bool deserialization_error = false;
bool added = add_sample(nullptr, deserialization_error);
if (added && take_samples)
{
instance_->second->has_state_notification_sample = false;
}
}

if (current_slot_ > first_slot)
{
history_.instance_viewed_nts(instance_->second);
Expand Down Expand Up @@ -395,7 +407,34 @@ struct ReadTakeCommand
}

SampleInfo& info = sample_infos_[current_slot_];
generate_info(info, *instance_->second, item);
const DataReaderInstance& instance = *instance_->second;
if (item)
{
generate_info(info, instance, item);
}
else
{
fastdds::rtps::Time_t current_time;
fastdds::rtps::Time_t::now(current_time);

info.sample_state = NOT_READ_SAMPLE_STATE;
info.instance_state = instance.instance_state;
info.view_state = instance.view_state;
info.disposed_generation_count = instance.disposed_generation_count;
info.no_writers_generation_count = instance.no_writers_generation_count;
info.sample_rank = 0;
info.generation_rank = 0;
info.absolute_generation_rank = 0;
info.source_timestamp = current_time;
info.reception_timestamp = current_time;
info.instance_handle = instance_->first;
info.publication_handle = HANDLE_NIL;

info.sample_identity = rtps::SampleIdentity{};
info.related_sample_identity = rtps::SampleIdentity{};

info.valid_data = false;
}
}

bool check_datasharing_validity(
Expand Down
15 changes: 13 additions & 2 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ void DataReaderHistory::check_and_remove_instance(
{
DataReaderInstance* instance = instance_info->second.get();

if (instance->cache_changes.empty())
if (instance->cache_changes.empty() && (false == instance->has_state_notification_sample))
{
if (InstanceStateKind::ALIVE_INSTANCE_STATE != instance->instance_state &&
instance->alive_writers.empty() &&
Expand Down Expand Up @@ -899,13 +899,24 @@ bool DataReaderHistory::update_instance_nts(
return ret;
}

void DataReaderHistory::writer_not_alive(
bool DataReaderHistory::writer_not_alive(
const GUID_t& writer_guid)
{
bool ret_val = false;

for (auto& it : instances_)
{
bool had_notification_sample = it.second->has_state_notification_sample;
it.second->writer_removed(counters_, writer_guid);
if (it.second->has_state_notification_sample && !had_notification_sample)
{
// Mark instance as data available
data_available_instances_[it.first] = it.second;
ret_val = true;
}
}

return ret_val;
}

StateFilter DataReaderHistory::get_mask_status() const noexcept
Expand Down
10 changes: 9 additions & 1 deletion src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,15 @@ class DataReaderHistory : public eprosima::fastdds::rtps::ReaderHistory
bool update_instance_nts(
CacheChange_t* const change);

void writer_not_alive(
/*!
* @brief Inform the history that a writer should be considered as not alive.
*
* @param [in] writer_guid GUID of the writer that should be considered as not alive.
*
* @return true if a state notification sample was added to at least one instance.
* This would mean that DATA_AVAILABLE status (and listener) shall be notified.
*/
bool writer_not_alive(
const fastdds::rtps::GUID_t& writer_guid);

void check_and_remove_instance(
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ struct DataReaderInstance
int32_t disposed_generation_count = 0;
//! Current no_writers generation of the instance
int32_t no_writers_generation_count = 0;
//! Whether the instance has a state notification sample available
bool has_state_notification_sample = false;

DataReaderInstance(
const eprosima::fastdds::ResourceLimitedContainerConfig& changes_allocation,
Expand Down Expand Up @@ -132,6 +134,7 @@ struct DataReaderInstance
break;
}

has_state_notification_sample = false;
return ret_val;
}

Expand All @@ -157,6 +160,7 @@ struct DataReaderInstance
if (alive_writers.empty() && (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state))
{
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
has_state_notification_sample = true;
}
if (ALIVE_INSTANCE_STATE == instance_state)
{
Expand Down Expand Up @@ -291,6 +295,7 @@ struct DataReaderInstance
{
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
counters_update(counters.instances_alive, counters.instances_no_writers, counters, false);
has_state_notification_sample = true;
}

ret_val = true;
Expand Down
20 changes: 17 additions & 3 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <Windows.h>
#endif // _MSC_VER
#include <fastdds/dds/builtin/topic/ParticipantBuiltinTopicData.hpp>
#include <fastdds/dds/common/InstanceHandle.hpp>
#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <fastdds/dds/core/condition/StatusCondition.hpp>
#include <fastdds/dds/core/condition/WaitSet.hpp>
Expand Down Expand Up @@ -1785,7 +1786,10 @@ class PubSubReader

if (eprosima::fastdds::dds::RETCODE_OK == datareader_->take(data_seq, info_seq))
{
current_processed_count_++;
if (info_seq[0].publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
{
current_processed_count_++;
}
return true;
}
return false;
Expand All @@ -1797,7 +1801,10 @@ class PubSubReader
eprosima::fastdds::dds::SampleInfo dds_info;
if (datareader_->take_next_sample(data, &dds_info) == eprosima::fastdds::dds::RETCODE_OK)
{
current_processed_count_++;
if (dds_info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
{
current_processed_count_++;
}
return true;
}
return false;
Expand Down Expand Up @@ -2017,7 +2024,8 @@ class PubSubReader
ReturnCode_t success = take_ ?
datareader->take_next_sample((void*)&data, &info) :
datareader->read_next_sample((void*)&data, &info);
if (eprosima::fastdds::dds::RETCODE_OK == success)
if ((eprosima::fastdds::dds::RETCODE_OK == success) &&
(info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL))
{
returnedValue = true;

Expand Down Expand Up @@ -2071,6 +2079,12 @@ class PubSubReader
type& data = datas[i];
eprosima::fastdds::dds::SampleInfo& info = infos[i];

// Skip unknown samples
if (info.publication_handle == eprosima::fastdds::dds::HANDLE_NIL)
{
continue;
}

// Check order of changes.
LastSeqInfo seq_info{ info.instance_handle, info.sample_identity.writer_guid() };
ASSERT_LT(last_seq[seq_info], info.sample_identity.sequence_number());
Expand Down
Loading
Loading