Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,28 @@ bool DataReaderImpl::wait_for_unread_message(
void DataReaderImpl::set_read_communication_status(
bool trigger_value)
{
if (trigger_value)
{
auto user_reader = user_datareader_;

//First check if we can handle with on_data_on_readers
SubscriberListener* subscriber_listener =
subscriber_->get_listener_for(StatusMask::data_on_readers());
if (subscriber_listener != nullptr)
{
subscriber_listener->on_data_on_readers(subscriber_->user_subscriber_);
}
else
{
// If not, try with on_data_available
DataReaderListener* listener = get_listener_for(StatusMask::data_available());
if (listener != nullptr)
{
listener->on_data_available(user_reader);
}
}
}

StatusMask notify_status = StatusMask::data_on_readers();
subscriber_->user_subscriber_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value);

Expand Down Expand Up @@ -718,11 +740,6 @@ ReturnCode_t DataReaderImpl::read_or_take_next_sample(
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

if (history_.getHistorySize() == 0)
{
return ReturnCode_t::RETCODE_NO_DATA;
}

#if HAVE_STRICT_REALTIME
auto max_blocking_time = std::chrono::steady_clock::now() +
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
Expand Down Expand Up @@ -917,25 +934,6 @@ void DataReaderImpl::InnerDataReaderListener::on_data_available(

if (data_reader_->on_data_available(writer_guid, first_sequence, last_sequence))
{
auto user_reader = data_reader_->user_datareader_;

//First check if we can handle with on_data_on_readers
SubscriberListener* subscriber_listener =
data_reader_->subscriber_->get_listener_for(StatusMask::data_on_readers());
if (subscriber_listener != nullptr)
{
subscriber_listener->on_data_on_readers(data_reader_->subscriber_->user_subscriber_);
}
else
{
// If not, try with on_data_available
DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::data_available());
if (listener != nullptr)
{
listener->on_data_available(user_reader);
}
}

data_reader_->set_read_communication_status(true);
}
}
Expand Down Expand Up @@ -1175,7 +1173,10 @@ void DataReaderImpl::update_subscription_matched_status(

if (count_change < 0)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
if (history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)))
{
set_read_communication_status(true);
}
try_notify_read_conditions();
}
}
Expand Down Expand Up @@ -1463,7 +1464,10 @@ LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
{
if (0 < status.not_alive_count_change)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
if (history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)))
{
set_read_communication_status(true);
}
try_notify_read_conditions();
}

Expand Down
41 changes: 40 additions & 1 deletion src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ struct ReadTakeCommand
++it;
}

// Check if there is a state notification sample available
if (!finished_ && instance_->second->has_state_notification_sample)
{
// Add sample and info to collections
bool deserialization_error = false;
bool added = add_sample(nullptr, deserialization_error);
if (added && take_samples)
{
instance_->second->has_state_notification_sample = false;
}
}

if (current_slot_ > first_slot)
{
history_.instance_viewed_nts(instance_->second);
Expand Down Expand Up @@ -395,7 +407,34 @@ struct ReadTakeCommand
}

SampleInfo& info = sample_infos_[current_slot_];
generate_info(info, *instance_->second, item);
const DataReaderInstance& instance = *instance_->second;
if (item)
{
generate_info(info, instance, item);
}
else
{
fastrtps::rtps::Time_t current_time;
fastrtps::rtps::Time_t::now(current_time);

info.sample_state = NOT_READ_SAMPLE_STATE;
info.instance_state = instance.instance_state;
info.view_state = instance.view_state;
info.disposed_generation_count = instance.disposed_generation_count;
info.no_writers_generation_count = instance.no_writers_generation_count;
info.sample_rank = 0;
info.generation_rank = 0;
info.absolute_generation_rank = 0;
info.source_timestamp = current_time;
info.reception_timestamp = current_time;
info.instance_handle = instance_->first;
info.publication_handle = HANDLE_NIL;

info.sample_identity = fastrtps::rtps::SampleIdentity{};
info.related_sample_identity = fastrtps::rtps::SampleIdentity{};

info.valid_data = false;
}
}

bool check_datasharing_validity(
Expand Down
15 changes: 13 additions & 2 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ void DataReaderHistory::check_and_remove_instance(
{
DataReaderInstance* instance = instance_info->second.get();

if (instance->cache_changes.empty())
if (instance->cache_changes.empty() && (false == instance->has_state_notification_sample))
{
if (InstanceStateKind::ALIVE_INSTANCE_STATE != instance->instance_state &&
instance->alive_writers.empty() &&
Expand Down Expand Up @@ -892,13 +892,24 @@ bool DataReaderHistory::update_instance_nts(
return ret;
}

void DataReaderHistory::writer_not_alive(
bool DataReaderHistory::writer_not_alive(
const GUID_t& writer_guid)
{
bool ret_val = false;

for (auto& it : instances_)
{
bool had_notification_sample = it.second->has_state_notification_sample;
it.second->writer_removed(counters_, writer_guid);
if (it.second->has_state_notification_sample && !had_notification_sample)
{
// Mark instance as data available
data_available_instances_[it.first] = it.second;
ret_val = true;
}
}

return ret_val;
}

StateFilter DataReaderHistory::get_mask_status() const noexcept
Expand Down
10 changes: 9 additions & 1 deletion src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,15 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory
bool update_instance_nts(
CacheChange_t* const change);

void writer_not_alive(
/*!
* @brief Inform the history that a writer should be considered as not alive.
*
* @param [in] writer_guid GUID of the writer that should be considered as not alive.
*
* @return true if a state notification sample was added to at least one instance.
* This would mean that DATA_AVAILABLE status (and listener) shall be notified.
*/
bool writer_not_alive(
const fastrtps::rtps::GUID_t& writer_guid);

void check_and_remove_instance(
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ struct DataReaderInstance
int32_t disposed_generation_count = 0;
//! Current no_writers generation of the instance
int32_t no_writers_generation_count = 0;
//! Whether the instance has a state notification sample available
bool has_state_notification_sample = false;

DataReaderInstance(
const eprosima::fastrtps::ResourceLimitedContainerConfig& changes_allocation,
Expand Down Expand Up @@ -132,6 +134,7 @@ struct DataReaderInstance
break;
}

has_state_notification_sample = false;
return ret_val;
}

Expand All @@ -157,6 +160,7 @@ struct DataReaderInstance
if (alive_writers.empty() && (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state))
{
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
has_state_notification_sample = true;
}
if (ALIVE_INSTANCE_STATE == instance_state)
{
Expand Down Expand Up @@ -291,6 +295,7 @@ struct DataReaderInstance
{
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
counters_update(counters.instances_alive, counters.instances_no_writers, counters, false);
has_state_notification_sample = true;
}

ret_val = true;
Expand Down
20 changes: 17 additions & 3 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <Windows.h>
#endif // _MSC_VER

#include <fastdds/dds/common/InstanceHandle.hpp>
#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <fastdds/dds/core/condition/StatusCondition.hpp>
#include <fastdds/dds/core/condition/WaitSet.hpp>
Expand Down Expand Up @@ -1737,7 +1738,10 @@ class PubSubReader

if (ReturnCode_t::RETCODE_OK == datareader_->take(data_seq, info_seq))
{
current_processed_count_++;
if (info_seq[0].publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
{
current_processed_count_++;
}
return true;
}
return false;
Expand All @@ -1749,7 +1753,10 @@ class PubSubReader
eprosima::fastdds::dds::SampleInfo dds_info;
if (datareader_->take_next_sample(data, &dds_info) == ReturnCode_t::RETCODE_OK)
{
current_processed_count_++;
if (dds_info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
{
current_processed_count_++;
}
return true;
}
return false;
Expand Down Expand Up @@ -1959,7 +1966,8 @@ class PubSubReader
ReturnCode_t success = take_ ?
datareader->take_next_sample(data, &info) :
datareader->read_next_sample(data, &info);
if (ReturnCode_t::RETCODE_OK == success)
if ((ReturnCode_t::RETCODE_OK == success) &&
(info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL))
{
returnedValue = true;

Expand Down Expand Up @@ -2021,6 +2029,12 @@ class PubSubReader
type& data = datas[i];
eprosima::fastdds::dds::SampleInfo& info = infos[i];

// Skip unknown samples
if (info.publication_handle == eprosima::fastdds::dds::HANDLE_NIL)
{
continue;
}

// Check order of changes.
LastSeqInfo seq_info{ info.instance_handle, info.sample_identity.writer_guid() };
ASSERT_LT(last_seq[seq_info], info.sample_identity.sequence_number());
Expand Down
Loading
Loading