Skip to content

Commit 831e8b9

Browse files
trubio-rtiasorbini
andauthored
Backport rmw callbacks implementation to Humble [ros2-73] (#157)
* backport: Add support for listener callbacks (#76) (d4330cc) * Add support for listener callbacks. * Fix wrong debug assertion when converting DDS_Duration values * Clarify interactions between count_unread_samples() and take_next() * Notify on changed matched status Signed-off-by: Christopher Wecht <[email protected]> Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Taxo Rubio <[email protected]> * backport: Conditional internal API access to support Connext 7+ (#121) (afa5055) Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Taxo Rubio <[email protected]> * style: Fixed header inclusion order Signed-off-by: Taxo Rubio <[email protected]> --------- Signed-off-by: Christopher Wecht <[email protected]> Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Taxo Rubio <[email protected]> Co-authored-by: Andrea Sorbini <[email protected]>
1 parent 94bde75 commit 831e8b9

File tree

8 files changed

+335
-43
lines changed

8 files changed

+335
-43
lines changed

rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ rmw_ret_t
138138
rmw_connextdds_take_samples(
139139
RMW_Connext_Subscriber * const sub);
140140

141+
rmw_ret_t
142+
rmw_connextdds_count_unread_samples(
143+
RMW_Connext_Subscriber * const sub,
144+
size_t & unread_count);
145+
141146
rmw_ret_t
142147
rmw_connextdds_return_samples(
143148
RMW_Connext_Subscriber * const sub);

rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ RMW_CONNEXTDDS_PUBLIC
9292
rmw_ret_t
9393
rmw_api_connextdds_event_set_callback(
9494
rmw_event_t * event,
95-
rmw_event_callback_t callback,
96-
const void * user_data);
95+
const rmw_event_callback_t callback,
96+
const void * const user_data);
9797

9898
/*****************************************************************************
9999
* Info API
@@ -437,15 +437,15 @@ RMW_CONNEXTDDS_PUBLIC
437437
rmw_ret_t
438438
rmw_api_connextdds_service_set_on_new_request_callback(
439439
rmw_service_t * rmw_service,
440-
rmw_event_callback_t callback,
441-
const void * user_data);
440+
const rmw_event_callback_t callback,
441+
const void * const user_data);
442442

443443
RMW_CONNEXTDDS_PUBLIC
444444
rmw_ret_t
445445
rmw_api_connextdds_client_set_on_new_response_callback(
446446
rmw_client_t * rmw_client,
447-
rmw_event_callback_t callback,
448-
const void * user_data);
447+
const rmw_event_callback_t callback,
448+
const void * const user_data);
449449

450450
/*****************************************************************************
451451
* Subscription API
@@ -574,9 +574,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription(
574574
RMW_CONNEXTDDS_PUBLIC
575575
rmw_ret_t
576576
rmw_api_connextdds_subscription_set_on_new_message_callback(
577-
rmw_subscription_t * rmw_subscription,
578-
rmw_event_callback_t callback,
579-
const void * user_data);
577+
rmw_subscription_t * const rmw_subscription,
578+
const rmw_event_callback_t callback,
579+
const void * const user_data);
580580

581581
/*****************************************************************************
582582
* WaitSet API

rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,30 @@ class RMW_Connext_Subscriber
462462
return has_data;
463463
}
464464

465+
rmw_ret_t
466+
count_unread_samples(size_t & unread_count)
467+
{
468+
// The action of counting unread samples is not currently mutually exclusive
469+
// with the action of taking samples out of the reader cache. Unfortunately
470+
// we cannot use a mutex to synchronize calls between
471+
// count_unread_samples() and and take_next() because we would run the risk
472+
// of a deadlock. This is because count_unread_samples() is supposed to be
473+
// called from within the reader's "exclusive area" (since it is
474+
// executed within a listener callback), while take_next() is usually called
475+
// from an executor/application thread and it must acquire the reader's
476+
// "exclusive area" before taking samples.
477+
// This might mean that an application which relies on data callbacks and the
478+
// count provided by this function, and which at the same time polls data
479+
// on the subscription, might end up missing some samples in the total count
480+
// notified to it via callback because the samples were taken out of the
481+
// cache before they could be notified to the listener.
482+
// Fortunately, in Connext the listener callback is notified right after a
483+
// sample is added to the reader cache and before any mutex is released,
484+
// which should allow this function to always report a correct number, as
485+
// long as it is only called within a (DDS) listener callback.
486+
return rmw_connextdds_count_unread_samples(this, unread_count);
487+
}
488+
465489
DDS_Subscriber * dds_subscriber() const
466490
{
467491
return DDS_DataReader_get_subscriber(this->dds_reader);

rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_
1616
#define RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_
1717

18+
#include <condition_variable>
1819
#include "rmw_connextdds/context.hpp"
1920

2021
/******************************************************************************
@@ -121,6 +122,22 @@ class RMW_Connext_Condition
121122
}
122123
}
123124

125+
template<typename FunctorT, typename FunctorA>
126+
void
127+
perform_action_and_update_state(FunctorA && action, FunctorT && update_condition)
128+
{
129+
std::lock_guard<std::mutex> internal_lock(this->mutex_internal);
130+
131+
action();
132+
133+
if (nullptr != this->waitset_mutex) {
134+
std::lock_guard<std::mutex> lock(*this->waitset_mutex);
135+
update_condition();
136+
} else {
137+
update_condition();
138+
}
139+
}
140+
124141
protected:
125142
std::mutex mutex_internal;
126143
std::mutex * waitset_mutex;
@@ -333,8 +350,55 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
333350
virtual bool
334351
has_status(const rmw_event_type_t event_type) = 0;
335352

353+
void
354+
notify_new_event(rmw_event_type_t event_type)
355+
{
356+
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
357+
if (new_event_cb_[event_type]) {
358+
new_event_cb_[event_type](user_data_[event_type], 1);
359+
} else {
360+
unread_events_count_[event_type]++;
361+
}
362+
}
363+
364+
void
365+
set_new_event_callback(
366+
rmw_event_type_t event_type,
367+
rmw_event_callback_t callback,
368+
const void * user_data)
369+
{
370+
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
371+
372+
if (callback) {
373+
// Push events arrived before setting the executor's callback
374+
if (unread_events_count_[event_type] > 0) {
375+
callback(user_data, unread_events_count_[event_type]);
376+
unread_events_count_[event_type] = 0;
377+
}
378+
user_data_[event_type] = user_data;
379+
new_event_cb_[event_type] = callback;
380+
} else {
381+
user_data_[event_type] = nullptr;
382+
new_event_cb_[event_type] = nullptr;
383+
}
384+
}
385+
386+
void
387+
on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);
388+
389+
void
390+
update_status_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);
391+
336392
protected:
337393
DDS_StatusCondition * scond;
394+
std::mutex new_event_mutex_;
395+
rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {};
396+
const void * user_data_[RMW_EVENT_INVALID] = {};
397+
uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0};
398+
399+
bool triggered_inconsistent_topic{false};
400+
401+
struct DDS_InconsistentTopicStatus status_inconsistent_topic;
338402
};
339403

340404
void
@@ -712,6 +776,26 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
712776
return RMW_RET_OK;
713777
}
714778

779+
void set_on_new_data_callback(
780+
const rmw_event_callback_t callback,
781+
const void * const user_data)
782+
{
783+
std::unique_lock<std::mutex> lock(new_data_event_mutex_);
784+
if (callback) {
785+
if (unread_data_events_count_ > 0) {
786+
callback(user_data, unread_data_events_count_);
787+
unread_data_events_count_ = 0;
788+
}
789+
new_data_event_cb_ = callback;
790+
data_event_user_data_ = user_data;
791+
} else {
792+
new_data_event_cb_ = nullptr;
793+
data_event_user_data_ = nullptr;
794+
}
795+
}
796+
797+
void notify_new_data();
798+
715799
protected:
716800
void update_status_deadline(
717801
const DDS_RequestedDeadlineMissedStatus * const status);
@@ -745,6 +829,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
745829

746830
RMW_Connext_Subscriber * sub;
747831

832+
std::mutex new_data_event_mutex_;
833+
rmw_event_callback_t new_data_event_cb_{nullptr};
834+
const void * data_event_user_data_{nullptr};
835+
uint64_t unread_data_events_count_ = 0;
836+
748837
friend class RMW_Connext_WaitSet;
749838
};
750839

rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ RMW_Connext_DataReaderListener_on_data_available(
121121

122122
UNUSED_ARG(reader);
123123

124+
self->notify_new_data();
124125
self->set_data_available(true);
125126
}
126127

@@ -706,6 +707,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(
706707

707708
this->status_deadline.total_count_change = this->status_deadline.total_count;
708709
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
710+
711+
this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED);
709712
}
710713

711714
void
@@ -720,6 +723,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness(
720723
this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count;
721724
this->status_liveliness.not_alive_count_change -=
722725
this->status_liveliness_last.not_alive_count;
726+
727+
this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
723728
}
724729

725730
void
@@ -731,6 +736,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos(
731736

732737
this->status_qos.total_count_change = this->status_qos.total_count;
733738
this->status_qos.total_count_change -= this->status_qos_last.total_count;
739+
this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE);
734740
}
735741

736742
void
@@ -743,6 +749,31 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost(
743749
this->status_sample_lost.total_count_change = this->status_sample_lost.total_count;
744750
this->status_sample_lost.total_count_change -=
745751
this->status_sample_lost_last.total_count;
752+
this->notify_new_event(RMW_EVENT_MESSAGE_LOST);
753+
}
754+
755+
void
756+
RMW_Connext_SubscriberStatusCondition::notify_new_data()
757+
{
758+
size_t unread_samples = 0;
759+
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
760+
perform_action_and_update_state(
761+
[this, &unread_samples]() {
762+
const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples);
763+
if (RMW_RET_OK != rc) {
764+
RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
765+
}
766+
},
767+
[this, &unread_samples]() {
768+
if (unread_samples == 0) {
769+
return;
770+
}
771+
if (new_data_event_cb_) {
772+
new_data_event_cb_(data_event_user_data_, unread_samples);
773+
} else {
774+
unread_data_events_count_ += unread_samples;
775+
}
776+
});
746777
}
747778

748779
rmw_ret_t
@@ -852,6 +883,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(
852883

853884
this->status_deadline.total_count_change = this->status_deadline.total_count;
854885
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
886+
887+
this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED);
855888
}
856889

857890
void
@@ -863,6 +896,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(
863896

864897
this->status_liveliness.total_count_change = this->status_liveliness.total_count;
865898
this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;
899+
900+
this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
866901
}
867902

868903
void
@@ -874,4 +909,6 @@ RMW_Connext_PublisherStatusCondition::update_status_qos(
874909

875910
this->status_qos.total_count_change = this->status_qos.total_count;
876911
this->status_qos.total_count_change -= this->status_qos_last.total_count;
912+
913+
this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE);
877914
}

0 commit comments

Comments
 (0)