Skip to content

Commit 0b18cfe

Browse files
Return sample notifying changes on instance state (#5943) (#5956)
* Refs #22929. Add regression blackbox test. * Refs #22929. Improve test with listener. * Refs #22929. Add `has_fake_sample` to `DataReaderInstance`. * Refs #22929. Manage state of `has_fake_sample`. * Refs #22929. Handle `has_fake_sample` on `ReadTakeCommand`. * Refs #22929. Avoid removing instances with fake sample. * Refs #22929. Add instances with fake sample to `data_available_instances_`. * Refs #22929. Method `writer_not_alive` returns whether a fake sample was added. * Refs #22929. Add fake sample independently of remaining samples * Refs #22929. Notify data_available when adding a fake sample. * Refs #22929. Remove shortcut for returning NO_DATA. * Refs #22929. Avoid processing fake samples in PubSubReader. * Refs #22929. Avoid processing fake samples in DDSBlackboxTestsMonitorService. * Refs #22929. Fix DataReaderTests. * Refs #22929. Fix Latency tests. * Refs #22929. Rename `fake` into `state notification`. * Refs #22929. Use HANDLE_NIL for publication handle. * Refs #22929. Use NIL publication handle to discard samples in tests. * Refs #22929. Use scoped name for HANDLE_NIL. --------- (cherry picked from commit 16b7477) Signed-off-by: Miguel Company <[email protected]> Co-authored-by: Miguel Company <[email protected]>
1 parent a044f65 commit 0b18cfe

File tree

11 files changed

+524
-229
lines changed

11 files changed

+524
-229
lines changed

src/cpp/fastdds/subscriber/DataReaderImpl.cpp

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,28 @@ bool DataReaderImpl::wait_for_unread_message(
388388
void DataReaderImpl::set_read_communication_status(
389389
bool trigger_value)
390390
{
391+
if (trigger_value)
392+
{
393+
auto user_reader = user_datareader_;
394+
395+
//First check if we can handle with on_data_on_readers
396+
SubscriberListener* subscriber_listener =
397+
subscriber_->get_listener_for(StatusMask::data_on_readers());
398+
if (subscriber_listener != nullptr)
399+
{
400+
subscriber_listener->on_data_on_readers(subscriber_->user_subscriber_);
401+
}
402+
else
403+
{
404+
// If not, try with on_data_available
405+
DataReaderListener* listener = get_listener_for(StatusMask::data_available());
406+
if (listener != nullptr)
407+
{
408+
listener->on_data_available(user_reader);
409+
}
410+
}
411+
}
412+
391413
StatusMask notify_status = StatusMask::data_on_readers();
392414
subscriber_->user_subscriber_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value);
393415

@@ -719,11 +741,6 @@ ReturnCode_t DataReaderImpl::read_or_take_next_sample(
719741
return RETCODE_NOT_ENABLED;
720742
}
721743

722-
if (history_.getHistorySize() == 0)
723-
{
724-
return RETCODE_NO_DATA;
725-
}
726-
727744
#if HAVE_STRICT_REALTIME
728745
auto max_blocking_time = std::chrono::steady_clock::now() +
729746
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
@@ -918,25 +935,6 @@ void DataReaderImpl::InnerDataReaderListener::on_data_available(
918935

919936
if (data_reader_->on_data_available(writer_guid, first_sequence, last_sequence))
920937
{
921-
auto user_reader = data_reader_->user_datareader_;
922-
923-
//First check if we can handle with on_data_on_readers
924-
SubscriberListener* subscriber_listener =
925-
data_reader_->subscriber_->get_listener_for(StatusMask::data_on_readers());
926-
if (subscriber_listener != nullptr)
927-
{
928-
subscriber_listener->on_data_on_readers(data_reader_->subscriber_->user_subscriber_);
929-
}
930-
else
931-
{
932-
// If not, try with on_data_available
933-
DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::data_available());
934-
if (listener != nullptr)
935-
{
936-
listener->on_data_available(user_reader);
937-
}
938-
}
939-
940938
data_reader_->set_read_communication_status(true);
941939
}
942940
}
@@ -1176,7 +1174,10 @@ void DataReaderImpl::update_subscription_matched_status(
11761174

11771175
if (count_change < 0)
11781176
{
1179-
history_.writer_not_alive(status.remoteEndpointGuid);
1177+
if (history_.writer_not_alive(status.remoteEndpointGuid))
1178+
{
1179+
set_read_communication_status(true);
1180+
}
11801181
try_notify_read_conditions();
11811182
}
11821183
}
@@ -1491,7 +1492,10 @@ LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
14911492
{
14921493
if (0 < status.not_alive_count_change)
14931494
{
1494-
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
1495+
if (history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)))
1496+
{
1497+
set_read_communication_status(true);
1498+
}
14951499
try_notify_read_conditions();
14961500
}
14971501

src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ struct ReadTakeCommand
180180
++it;
181181
}
182182

183+
// Check if there is a state notification sample available
184+
if (!finished_ && instance_->second->has_state_notification_sample)
185+
{
186+
// Add sample and info to collections
187+
bool deserialization_error = false;
188+
bool added = add_sample(nullptr, deserialization_error);
189+
if (added && take_samples)
190+
{
191+
instance_->second->has_state_notification_sample = false;
192+
}
193+
}
194+
183195
if (current_slot_ > first_slot)
184196
{
185197
history_.instance_viewed_nts(instance_->second);
@@ -394,7 +406,34 @@ struct ReadTakeCommand
394406
}
395407

396408
SampleInfo& info = sample_infos_[current_slot_];
397-
generate_info(info, *instance_->second, item);
409+
const DataReaderInstance& instance = *instance_->second;
410+
if (item)
411+
{
412+
generate_info(info, instance, item);
413+
}
414+
else
415+
{
416+
fastdds::rtps::Time_t current_time;
417+
fastdds::rtps::Time_t::now(current_time);
418+
419+
info.sample_state = NOT_READ_SAMPLE_STATE;
420+
info.instance_state = instance.instance_state;
421+
info.view_state = instance.view_state;
422+
info.disposed_generation_count = instance.disposed_generation_count;
423+
info.no_writers_generation_count = instance.no_writers_generation_count;
424+
info.sample_rank = 0;
425+
info.generation_rank = 0;
426+
info.absolute_generation_rank = 0;
427+
info.source_timestamp = current_time;
428+
info.reception_timestamp = current_time;
429+
info.instance_handle = instance_->first;
430+
info.publication_handle = HANDLE_NIL;
431+
432+
info.sample_identity = rtps::SampleIdentity{};
433+
info.related_sample_identity = rtps::SampleIdentity{};
434+
435+
info.valid_data = false;
436+
}
398437
}
399438

400439
bool check_datasharing_validity(

src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ void DataReaderHistory::check_and_remove_instance(
661661
{
662662
DataReaderInstance* instance = instance_info->second.get();
663663

664-
if (instance->cache_changes.empty())
664+
if (instance->cache_changes.empty() && (false == instance->has_state_notification_sample))
665665
{
666666
if (InstanceStateKind::ALIVE_INSTANCE_STATE != instance->instance_state &&
667667
instance->alive_writers.empty() &&
@@ -887,13 +887,24 @@ bool DataReaderHistory::update_instance_nts(
887887
return ret;
888888
}
889889

890-
void DataReaderHistory::writer_not_alive(
890+
bool DataReaderHistory::writer_not_alive(
891891
const GUID_t& writer_guid)
892892
{
893+
bool ret_val = false;
894+
893895
for (auto& it : instances_)
894896
{
897+
bool had_notification_sample = it.second->has_state_notification_sample;
895898
it.second->writer_removed(counters_, writer_guid);
899+
if (it.second->has_state_notification_sample && !had_notification_sample)
900+
{
901+
// Mark instance as data available
902+
data_available_instances_[it.first] = it.second;
903+
ret_val = true;
904+
}
896905
}
906+
907+
return ret_val;
897908
}
898909

899910
StateFilter DataReaderHistory::get_mask_status() const noexcept

src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,15 @@ class DataReaderHistory : public eprosima::fastdds::rtps::ReaderHistory
349349
bool update_instance_nts(
350350
CacheChange_t* const change);
351351

352-
void writer_not_alive(
352+
/*!
353+
* @brief Inform the history that a writer should be considered as not alive.
354+
*
355+
* @param [in] writer_guid GUID of the writer that should be considered as not alive.
356+
*
357+
* @return true if a state notification sample was added to at least one instance.
358+
* This would mean that DATA_AVAILABLE status (and listener) shall be notified.
359+
*/
360+
bool writer_not_alive(
353361
const fastdds::rtps::GUID_t& writer_guid);
354362

355363
void check_and_remove_instance(

src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ struct DataReaderInstance
5858
int32_t disposed_generation_count = 0;
5959
//! Current no_writers generation of the instance
6060
int32_t no_writers_generation_count = 0;
61+
//! Whether the instance has a state notification sample available
62+
bool has_state_notification_sample = false;
6163

6264
DataReaderInstance(
6365
const eprosima::fastdds::ResourceLimitedContainerConfig& changes_allocation,
@@ -132,6 +134,7 @@ struct DataReaderInstance
132134
break;
133135
}
134136

137+
has_state_notification_sample = false;
135138
return ret_val;
136139
}
137140

@@ -157,6 +160,7 @@ struct DataReaderInstance
157160
if (alive_writers.empty() && (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state))
158161
{
159162
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
163+
has_state_notification_sample = true;
160164
}
161165
if (ALIVE_INSTANCE_STATE == instance_state)
162166
{
@@ -291,6 +295,7 @@ struct DataReaderInstance
291295
{
292296
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
293297
counters_update(counters.instances_alive, counters.instances_no_writers, counters, false);
298+
has_state_notification_sample = true;
294299
}
295300

296301
ret_val = true;

test/blackbox/api/dds-pim/PubSubReader.hpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <Windows.h>
3636
#endif // _MSC_VER
3737
#include <fastdds/dds/builtin/topic/ParticipantBuiltinTopicData.hpp>
38+
#include <fastdds/dds/common/InstanceHandle.hpp>
3839
#include <fastdds/dds/core/condition/GuardCondition.hpp>
3940
#include <fastdds/dds/core/condition/StatusCondition.hpp>
4041
#include <fastdds/dds/core/condition/WaitSet.hpp>
@@ -1785,7 +1786,10 @@ class PubSubReader
17851786

17861787
if (eprosima::fastdds::dds::RETCODE_OK == datareader_->take(data_seq, info_seq))
17871788
{
1788-
current_processed_count_++;
1789+
if (info_seq[0].publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
1790+
{
1791+
current_processed_count_++;
1792+
}
17891793
return true;
17901794
}
17911795
return false;
@@ -1797,7 +1801,10 @@ class PubSubReader
17971801
eprosima::fastdds::dds::SampleInfo dds_info;
17981802
if (datareader_->take_next_sample(data, &dds_info) == eprosima::fastdds::dds::RETCODE_OK)
17991803
{
1800-
current_processed_count_++;
1804+
if (dds_info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
1805+
{
1806+
current_processed_count_++;
1807+
}
18011808
return true;
18021809
}
18031810
return false;
@@ -2017,7 +2024,8 @@ class PubSubReader
20172024
ReturnCode_t success = take_ ?
20182025
datareader->take_next_sample((void*)&data, &info) :
20192026
datareader->read_next_sample((void*)&data, &info);
2020-
if (eprosima::fastdds::dds::RETCODE_OK == success)
2027+
if ((eprosima::fastdds::dds::RETCODE_OK == success) &&
2028+
(info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL))
20212029
{
20222030
returnedValue = true;
20232031

@@ -2071,6 +2079,12 @@ class PubSubReader
20712079
type& data = datas[i];
20722080
eprosima::fastdds::dds::SampleInfo& info = infos[i];
20732081

2082+
// Skip unknown samples
2083+
if (info.publication_handle == eprosima::fastdds::dds::HANDLE_NIL)
2084+
{
2085+
continue;
2086+
}
2087+
20742088
// Check order of changes.
20752089
LastSeqInfo seq_info{ info.instance_handle, info.sample_identity.writer_guid() };
20762090
ASSERT_LT(last_seq[seq_info], info.sample_identity.sequence_number());

0 commit comments

Comments
 (0)