Skip to content

Commit 2dd4e92

Browse files
committed
Clarify interactions between count_unread_samples() and take_next()
Signed-off-by: Andrea Sorbini <[email protected]>
1 parent 679173b commit 2dd4e92

File tree

3 files changed

+50
-23
lines changed

3 files changed

+50
-23
lines changed

rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,30 @@ class RMW_Connext_Subscriber
453453
return has_data;
454454
}
455455

456+
rmw_ret_t
457+
count_unread_samples(size_t & unread_count)
458+
{
459+
// The action of counting unread samples is not currently mutually exclusive
460+
// with the action of taking samples out of the reader cache. Unfortunately
461+
// we cannot use a mutex to synchronize calls between
462+
// count_unread_samples() and and take_next() because we would run the risk
463+
// of a deadlock. This is because count_unread_samples() is supposed to be
464+
// called from within the reader's "exclusive area" (since it is
465+
// executed within a listener callback), while take_next() is usually called
466+
// from an executor/application thread and it must acquire the reader's
467+
// "exclusive area" before taking samples.
468+
// This might mean that an application which relies on data callbacks and the
469+
// count provided by this function, and which at the same time polls data
470+
// on the subscription, might end up missing some samples in the total count
471+
// notified to it via callback because the samples were taken out of the
472+
// cache before they could be notified to the listener.
473+
// Fortunately, in Connext the listener callback is notified right after a
474+
// sample is added to the reader cache and before any mutex is released,
475+
// which should allow this function to always report a correct number, as
476+
// long as it is only called within a (DDS) listener callback.
477+
return rmw_connextdds_count_unread_samples(this, unread_count);
478+
}
479+
456480
DDS_Subscriber * dds_subscriber() const
457481
{
458482
return DDS_DataReader_get_subscriber(this->dds_reader);

rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class RMW_Connext_Condition
123123

124124
template<typename FunctorT, typename FunctorA>
125125
void
126-
perform_action_and_update_state(FunctorT && update_condition, FunctorA && action)
126+
perform_action_and_update_state(FunctorA && action, FunctorT && update_condition)
127127
{
128128
std::lock_guard<std::mutex> internal_lock(this->mutex_internal);
129129

@@ -782,28 +782,7 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
782782
}
783783
}
784784

785-
void notify_new_data()
786-
{
787-
size_t unread_samples = 0;
788-
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
789-
perform_action_and_update_state(
790-
[this, &unread_samples]() {
791-
if (unread_samples == 0) {
792-
return;
793-
}
794-
if (new_data_event_cb_) {
795-
new_data_event_cb_(data_event_user_data_, unread_samples);
796-
} else {
797-
unread_data_events_count_ += unread_samples;
798-
}
799-
},
800-
[this, &unread_samples]() {
801-
const rmw_ret_t rc = rmw_connextdds_count_unread_samples(this->sub, unread_samples);
802-
if (RMW_RET_OK != rc) {
803-
RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
804-
}
805-
});
806-
}
785+
void notify_new_data();
807786

808787
protected:
809788
void update_status_deadline(

rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,30 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost(
750750
this->status_sample_lost_last.total_count;
751751
}
752752

753+
void
754+
RMW_Connext_SubscriberStatusCondition::notify_new_data()
755+
{
756+
size_t unread_samples = 0;
757+
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
758+
perform_action_and_update_state(
759+
[this, &unread_samples]() {
760+
const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples);
761+
if (RMW_RET_OK != rc) {
762+
RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
763+
}
764+
},
765+
[this, &unread_samples]() {
766+
if (unread_samples == 0) {
767+
return;
768+
}
769+
if (new_data_event_cb_) {
770+
new_data_event_cb_(data_event_user_data_, unread_samples);
771+
} else {
772+
unread_data_events_count_ += unread_samples;
773+
}
774+
});
775+
}
776+
753777
rmw_ret_t
754778
RMW_Connext_PublisherStatusCondition::install(
755779
RMW_Connext_Publisher * const pub)

0 commit comments

Comments
 (0)