Skip to content

Commit 8fbb699

Browse files
MiguelCompanymergify[bot]
authored andcommitted
Return sample notifying changes on instance state (#5943)
* Refs #22929. Add regression blackbox test. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Improve test with listener. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Add `has_fake_sample` to `DataReaderInstance`. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Manage state of `has_fake_sample`. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Handle `has_fake_sample` on `ReadTakeCommand`. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Avoid removing instances with fake sample. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Add instances with fake sample to `data_available_instances_`. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Method `writer_not_alive` returns whether a fake sample was added. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Add fake sample independently of remaining samples Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Notify data_available when adding a fake sample. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Remove shortcut for returning NO_DATA. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Avoid processing fake samples in PubSubReader. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Avoid processing fake samples in DDSBlackboxTestsMonitorService. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Fix DataReaderTests. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Fix Latency tests. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Rename `fake` into `state notification`. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Use HANDLE_NIL for publication handle. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Use NIL publication handle to discard samples in tests. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #22929. Use scoped name for HANDLE_NIL. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> --------- Signed-off-by: Miguel Company <miguelcompany@eprosima.com> (cherry picked from commit 16b7477) # Conflicts: # src/cpp/fastdds/subscriber/DataReaderImpl.cpp # src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp # test/blackbox/api/dds-pim/PubSubReader.hpp # test/blackbox/common/DDSBlackboxTestsMonitorService.cpp # test/performance/latency/LatencyTestPublisher.cpp # test/performance/latency/LatencyTestSubscriber.cpp
1 parent abf600c commit 8fbb699

File tree

11 files changed

+597
-86
lines changed

11 files changed

+597
-86
lines changed

src/cpp/fastdds/subscriber/DataReaderImpl.cpp

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,28 @@ bool DataReaderImpl::wait_for_unread_message(
387387
void DataReaderImpl::set_read_communication_status(
388388
bool trigger_value)
389389
{
390+
if (trigger_value)
391+
{
392+
auto user_reader = user_datareader_;
393+
394+
//First check if we can handle with on_data_on_readers
395+
SubscriberListener* subscriber_listener =
396+
subscriber_->get_listener_for(StatusMask::data_on_readers());
397+
if (subscriber_listener != nullptr)
398+
{
399+
subscriber_listener->on_data_on_readers(subscriber_->user_subscriber_);
400+
}
401+
else
402+
{
403+
// If not, try with on_data_available
404+
DataReaderListener* listener = get_listener_for(StatusMask::data_available());
405+
if (listener != nullptr)
406+
{
407+
listener->on_data_available(user_reader);
408+
}
409+
}
410+
}
411+
390412
StatusMask notify_status = StatusMask::data_on_readers();
391413
subscriber_->user_subscriber_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value);
392414

@@ -718,11 +740,14 @@ ReturnCode_t DataReaderImpl::read_or_take_next_sample(
718740
return ReturnCode_t::RETCODE_NOT_ENABLED;
719741
}
720742

743+
<<<<<<< HEAD
721744
if (history_.getHistorySize() == 0)
722745
{
723746
return ReturnCode_t::RETCODE_NO_DATA;
724747
}
725748

749+
=======
750+
>>>>>>> 16b7477b (Return sample notifying changes on instance state (#5943))
726751
#if HAVE_STRICT_REALTIME
727752
auto max_blocking_time = std::chrono::steady_clock::now() +
728753
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
@@ -917,25 +942,6 @@ void DataReaderImpl::InnerDataReaderListener::on_data_available(
917942

918943
if (data_reader_->on_data_available(writer_guid, first_sequence, last_sequence))
919944
{
920-
auto user_reader = data_reader_->user_datareader_;
921-
922-
//First check if we can handle with on_data_on_readers
923-
SubscriberListener* subscriber_listener =
924-
data_reader_->subscriber_->get_listener_for(StatusMask::data_on_readers());
925-
if (subscriber_listener != nullptr)
926-
{
927-
subscriber_listener->on_data_on_readers(data_reader_->subscriber_->user_subscriber_);
928-
}
929-
else
930-
{
931-
// If not, try with on_data_available
932-
DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::data_available());
933-
if (listener != nullptr)
934-
{
935-
listener->on_data_available(user_reader);
936-
}
937-
}
938-
939945
data_reader_->set_read_communication_status(true);
940946
}
941947
}
@@ -1175,7 +1181,14 @@ void DataReaderImpl::update_subscription_matched_status(
11751181

11761182
if (count_change < 0)
11771183
{
1184+
<<<<<<< HEAD
11781185
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
1186+
=======
1187+
if (history_.writer_not_alive(status.remoteEndpointGuid))
1188+
{
1189+
set_read_communication_status(true);
1190+
}
1191+
>>>>>>> 16b7477b (Return sample notifying changes on instance state (#5943))
11791192
try_notify_read_conditions();
11801193
}
11811194
}
@@ -1463,7 +1476,10 @@ LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
14631476
{
14641477
if (0 < status.not_alive_count_change)
14651478
{
1466-
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
1479+
if (history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)))
1480+
{
1481+
set_read_communication_status(true);
1482+
}
14671483
try_notify_read_conditions();
14681484
}
14691485

src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,18 @@ struct ReadTakeCommand
181181
++it;
182182
}
183183

184+
// Check if there is a state notification sample available
185+
if (!finished_ && instance_->second->has_state_notification_sample)
186+
{
187+
// Add sample and info to collections
188+
bool deserialization_error = false;
189+
bool added = add_sample(nullptr, deserialization_error);
190+
if (added && take_samples)
191+
{
192+
instance_->second->has_state_notification_sample = false;
193+
}
194+
}
195+
184196
if (current_slot_ > first_slot)
185197
{
186198
history_.instance_viewed_nts(instance_->second);
@@ -395,7 +407,34 @@ struct ReadTakeCommand
395407
}
396408

397409
SampleInfo& info = sample_infos_[current_slot_];
398-
generate_info(info, *instance_->second, item);
410+
const DataReaderInstance& instance = *instance_->second;
411+
if (item)
412+
{
413+
generate_info(info, instance, item);
414+
}
415+
else
416+
{
417+
fastdds::rtps::Time_t current_time;
418+
fastdds::rtps::Time_t::now(current_time);
419+
420+
info.sample_state = NOT_READ_SAMPLE_STATE;
421+
info.instance_state = instance.instance_state;
422+
info.view_state = instance.view_state;
423+
info.disposed_generation_count = instance.disposed_generation_count;
424+
info.no_writers_generation_count = instance.no_writers_generation_count;
425+
info.sample_rank = 0;
426+
info.generation_rank = 0;
427+
info.absolute_generation_rank = 0;
428+
info.source_timestamp = current_time;
429+
info.reception_timestamp = current_time;
430+
info.instance_handle = instance_->first;
431+
info.publication_handle = HANDLE_NIL;
432+
433+
info.sample_identity = rtps::SampleIdentity{};
434+
info.related_sample_identity = rtps::SampleIdentity{};
435+
436+
info.valid_data = false;
437+
}
399438
}
400439

401440
bool check_datasharing_validity(

src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ void DataReaderHistory::check_and_remove_instance(
666666
{
667667
DataReaderInstance* instance = instance_info->second.get();
668668

669-
if (instance->cache_changes.empty())
669+
if (instance->cache_changes.empty() && (false == instance->has_state_notification_sample))
670670
{
671671
if (InstanceStateKind::ALIVE_INSTANCE_STATE != instance->instance_state &&
672672
instance->alive_writers.empty() &&
@@ -892,13 +892,24 @@ bool DataReaderHistory::update_instance_nts(
892892
return ret;
893893
}
894894

895-
void DataReaderHistory::writer_not_alive(
895+
bool DataReaderHistory::writer_not_alive(
896896
const GUID_t& writer_guid)
897897
{
898+
bool ret_val = false;
899+
898900
for (auto& it : instances_)
899901
{
902+
bool had_notification_sample = it.second->has_state_notification_sample;
900903
it.second->writer_removed(counters_, writer_guid);
904+
if (it.second->has_state_notification_sample && !had_notification_sample)
905+
{
906+
// Mark instance as data available
907+
data_available_instances_[it.first] = it.second;
908+
ret_val = true;
909+
}
901910
}
911+
912+
return ret_val;
902913
}
903914

904915
StateFilter DataReaderHistory::get_mask_status() const noexcept

src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,21 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory
348348
bool update_instance_nts(
349349
CacheChange_t* const change);
350350

351+
<<<<<<< HEAD
351352
void writer_not_alive(
352353
const fastrtps::rtps::GUID_t& writer_guid);
354+
=======
355+
/*!
356+
* @brief Inform the history that a writer should be considered as not alive.
357+
*
358+
* @param [in] writer_guid GUID of the writer that should be considered as not alive.
359+
*
360+
* @return true if a state notification sample was added to at least one instance.
361+
* This would mean that DATA_AVAILABLE status (and listener) shall be notified.
362+
*/
363+
bool writer_not_alive(
364+
const fastdds::rtps::GUID_t& writer_guid);
365+
>>>>>>> 16b7477b (Return sample notifying changes on instance state (#5943))
353366

354367
void check_and_remove_instance(
355368
instance_info& instance_info);

src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ struct DataReaderInstance
5858
int32_t disposed_generation_count = 0;
5959
//! Current no_writers generation of the instance
6060
int32_t no_writers_generation_count = 0;
61+
//! Whether the instance has a state notification sample available
62+
bool has_state_notification_sample = false;
6163

6264
DataReaderInstance(
6365
const eprosima::fastrtps::ResourceLimitedContainerConfig& changes_allocation,
@@ -132,6 +134,7 @@ struct DataReaderInstance
132134
break;
133135
}
134136

137+
has_state_notification_sample = false;
135138
return ret_val;
136139
}
137140

@@ -157,6 +160,7 @@ struct DataReaderInstance
157160
if (alive_writers.empty() && (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state))
158161
{
159162
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
163+
has_state_notification_sample = true;
160164
}
161165
if (ALIVE_INSTANCE_STATE == instance_state)
162166
{
@@ -291,6 +295,7 @@ struct DataReaderInstance
291295
{
292296
instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
293297
counters_update(counters.instances_alive, counters.instances_no_writers, counters, false);
298+
has_state_notification_sample = true;
294299
}
295300

296301
ret_val = true;

test/blackbox/api/dds-pim/PubSubReader.hpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@
3636
#if _MSC_VER
3737
#include <Windows.h>
3838
#endif // _MSC_VER
39+
<<<<<<< HEAD
3940

41+
=======
42+
#include <fastdds/dds/builtin/topic/ParticipantBuiltinTopicData.hpp>
43+
#include <fastdds/dds/common/InstanceHandle.hpp>
44+
>>>>>>> 16b7477b (Return sample notifying changes on instance state (#5943))
4045
#include <fastdds/dds/core/condition/GuardCondition.hpp>
4146
#include <fastdds/dds/core/condition/StatusCondition.hpp>
4247
#include <fastdds/dds/core/condition/WaitSet.hpp>
@@ -1737,7 +1742,10 @@ class PubSubReader
17371742

17381743
if (ReturnCode_t::RETCODE_OK == datareader_->take(data_seq, info_seq))
17391744
{
1740-
current_processed_count_++;
1745+
if (info_seq[0].publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
1746+
{
1747+
current_processed_count_++;
1748+
}
17411749
return true;
17421750
}
17431751
return false;
@@ -1749,7 +1757,10 @@ class PubSubReader
17491757
eprosima::fastdds::dds::SampleInfo dds_info;
17501758
if (datareader_->take_next_sample(data, &dds_info) == ReturnCode_t::RETCODE_OK)
17511759
{
1752-
current_processed_count_++;
1760+
if (dds_info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL)
1761+
{
1762+
current_processed_count_++;
1763+
}
17531764
return true;
17541765
}
17551766
return false;
@@ -1957,9 +1968,16 @@ class PubSubReader
19571968
eprosima::fastdds::dds::SampleInfo info;
19581969

19591970
ReturnCode_t success = take_ ?
1971+
<<<<<<< HEAD
19601972
datareader->take_next_sample(data, &info) :
19611973
datareader->read_next_sample(data, &info);
19621974
if (ReturnCode_t::RETCODE_OK == success)
1975+
=======
1976+
datareader->take_next_sample((void*)&data, &info) :
1977+
datareader->read_next_sample((void*)&data, &info);
1978+
if ((eprosima::fastdds::dds::RETCODE_OK == success) &&
1979+
(info.publication_handle != eprosima::fastdds::dds::HANDLE_NIL))
1980+
>>>>>>> 16b7477b (Return sample notifying changes on instance state (#5943))
19631981
{
19641982
returnedValue = true;
19651983

@@ -2021,6 +2039,12 @@ class PubSubReader
20212039
type& data = datas[i];
20222040
eprosima::fastdds::dds::SampleInfo& info = infos[i];
20232041

2042+
// Skip unknown samples
2043+
if (info.publication_handle == eprosima::fastdds::dds::HANDLE_NIL)
2044+
{
2045+
continue;
2046+
}
2047+
20242048
// Check order of changes.
20252049
LastSeqInfo seq_info{ info.instance_handle, info.sample_identity.writer_guid() };
20262050
ASSERT_LT(last_seq[seq_info], info.sample_identity.sequence_number());

0 commit comments

Comments
 (0)