Skip to content

Commit d4330cc

Browse files
authored
Add support for listener callbacks (#76)
* Add support for listener callbacks. * Fix wrong debug assertion when converting DDS_Duration values * Clarify interactions between count_unread_samples() and take_next() * Notify on changed matched status Signed-off-by: Christopher Wecht <[email protected]> Signed-off-by: Andrea Sorbini <[email protected]>
1 parent 9e67b64 commit d4330cc

File tree

8 files changed

+307
-41
lines changed

8 files changed

+307
-41
lines changed

rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ rmw_ret_t
138138
rmw_connextdds_take_samples(
139139
RMW_Connext_Subscriber * const sub);
140140

141+
rmw_ret_t
142+
rmw_connextdds_count_unread_samples(
143+
RMW_Connext_Subscriber * const sub,
144+
size_t & unread_count);
145+
141146
rmw_ret_t
142147
rmw_connextdds_return_samples(
143148
RMW_Connext_Subscriber * const sub);

rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ RMW_CONNEXTDDS_PUBLIC
9292
rmw_ret_t
9393
rmw_api_connextdds_event_set_callback(
9494
rmw_event_t * event,
95-
rmw_event_callback_t callback,
96-
const void * user_data);
95+
const rmw_event_callback_t callback,
96+
const void * const user_data);
9797

9898
/*****************************************************************************
9999
* Info API
@@ -443,15 +443,15 @@ RMW_CONNEXTDDS_PUBLIC
443443
rmw_ret_t
444444
rmw_api_connextdds_service_set_on_new_request_callback(
445445
rmw_service_t * rmw_service,
446-
rmw_event_callback_t callback,
447-
const void * user_data);
446+
const rmw_event_callback_t callback,
447+
const void * const user_data);
448448

449449
RMW_CONNEXTDDS_PUBLIC
450450
rmw_ret_t
451451
rmw_api_connextdds_client_set_on_new_response_callback(
452452
rmw_client_t * rmw_client,
453-
rmw_event_callback_t callback,
454-
const void * user_data);
453+
const rmw_event_callback_t callback,
454+
const void * const user_data);
455455

456456
/*****************************************************************************
457457
* Subscription API
@@ -580,9 +580,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription(
580580
RMW_CONNEXTDDS_PUBLIC
581581
rmw_ret_t
582582
rmw_api_connextdds_subscription_set_on_new_message_callback(
583-
rmw_subscription_t * rmw_subscription,
584-
rmw_event_callback_t callback,
585-
const void * user_data);
583+
rmw_subscription_t * const rmw_subscription,
584+
const rmw_event_callback_t callback,
585+
const void * const user_data);
586586

587587
/*****************************************************************************
588588
* WaitSet API

rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,30 @@ class RMW_Connext_Subscriber
462462
return has_data;
463463
}
464464

465+
rmw_ret_t
466+
count_unread_samples(size_t & unread_count)
467+
{
468+
// The action of counting unread samples is not currently mutually exclusive
469+
// with the action of taking samples out of the reader cache. Unfortunately
470+
// we cannot use a mutex to synchronize calls between
471+
// count_unread_samples() and and take_next() because we would run the risk
472+
// of a deadlock. This is because count_unread_samples() is supposed to be
473+
// called from within the reader's "exclusive area" (since it is
474+
// executed within a listener callback), while take_next() is usually called
475+
// from an executor/application thread and it must acquire the reader's
476+
// "exclusive area" before taking samples.
477+
// This might mean that an application which relies on data callbacks and the
478+
// count provided by this function, and which at the same time polls data
479+
// on the subscription, might end up missing some samples in the total count
480+
// notified to it via callback because the samples were taken out of the
481+
// cache before they could be notified to the listener.
482+
// Fortunately, in Connext the listener callback is notified right after a
483+
// sample is added to the reader cache and before any mutex is released,
484+
// which should allow this function to always report a correct number, as
485+
// long as it is only called within a (DDS) listener callback.
486+
return rmw_connextdds_count_unread_samples(this, unread_count);
487+
}
488+
465489
DDS_Subscriber * dds_subscriber() const
466490
{
467491
return DDS_DataReader_get_subscriber(this->dds_reader);

rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,22 @@ class RMW_Connext_Condition
121121
}
122122
}
123123

124+
template<typename FunctorT, typename FunctorA>
125+
void
126+
perform_action_and_update_state(FunctorA && action, FunctorT && update_condition)
127+
{
128+
std::lock_guard<std::mutex> internal_lock(this->mutex_internal);
129+
130+
action();
131+
132+
if (nullptr != this->waitset_mutex) {
133+
std::lock_guard<std::mutex> lock(*this->waitset_mutex);
134+
update_condition();
135+
} else {
136+
update_condition();
137+
}
138+
}
139+
124140
protected:
125141
std::mutex mutex_internal;
126142
std::mutex * waitset_mutex;
@@ -334,6 +350,39 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
334350
virtual bool
335351
has_status(const rmw_event_type_t event_type) = 0;
336352

353+
void
354+
notify_new_event(rmw_event_type_t event_type)
355+
{
356+
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
357+
if (new_event_cb_[event_type]) {
358+
new_event_cb_[event_type](user_data_[event_type], 1);
359+
} else {
360+
unread_events_count_[event_type]++;
361+
}
362+
}
363+
364+
void
365+
set_new_event_callback(
366+
rmw_event_type_t event_type,
367+
rmw_event_callback_t callback,
368+
const void * user_data)
369+
{
370+
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
371+
372+
if (callback) {
373+
// Push events arrived before setting the executor's callback
374+
if (unread_events_count_[event_type] > 0) {
375+
callback(user_data, unread_events_count_[event_type]);
376+
unread_events_count_[event_type] = 0;
377+
}
378+
user_data_[event_type] = user_data;
379+
new_event_cb_[event_type] = callback;
380+
} else {
381+
user_data_[event_type] = nullptr;
382+
new_event_cb_[event_type] = nullptr;
383+
}
384+
}
385+
337386
void
338387
on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);
339388

@@ -358,6 +407,10 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
358407

359408
protected:
360409
DDS_StatusCondition * scond;
410+
std::mutex new_event_mutex_;
411+
rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {};
412+
const void * user_data_[RMW_EVENT_INVALID] = {};
413+
uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0};
361414

362415
bool triggered_inconsistent_topic{false};
363416

@@ -779,6 +832,26 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
779832
return RMW_RET_OK;
780833
}
781834

835+
void set_on_new_data_callback(
836+
const rmw_event_callback_t callback,
837+
const void * const user_data)
838+
{
839+
std::unique_lock<std::mutex> lock(new_data_event_mutex_);
840+
if (callback) {
841+
if (unread_data_events_count_ > 0) {
842+
callback(user_data, unread_data_events_count_);
843+
unread_data_events_count_ = 0;
844+
}
845+
new_data_event_cb_ = callback;
846+
data_event_user_data_ = user_data;
847+
} else {
848+
new_data_event_cb_ = nullptr;
849+
data_event_user_data_ = nullptr;
850+
}
851+
}
852+
853+
void notify_new_data();
854+
782855
inline rmw_ret_t
783856
get_matched_status(rmw_matched_status_t * const status)
784857
{
@@ -838,6 +911,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
838911

839912
RMW_Connext_Subscriber * sub;
840913

914+
std::mutex new_data_event_mutex_;
915+
rmw_event_callback_t new_data_event_cb_{nullptr};
916+
const void * data_event_user_data_{nullptr};
917+
uint64_t unread_data_events_count_ = 0;
918+
841919
friend class RMW_Connext_WaitSet;
842920
};
843921

rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ RMW_Connext_DataReaderListener_on_data_available(
135135

136136
UNUSED_ARG(reader);
137137

138+
self->notify_new_data();
138139
self->set_data_available(true);
139140
}
140141

@@ -799,6 +800,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(
799800

800801
this->status_deadline.total_count_change = this->status_deadline.total_count;
801802
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
803+
804+
this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED);
802805
}
803806

804807
void
@@ -813,6 +816,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness(
813816
this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count;
814817
this->status_liveliness.not_alive_count_change -=
815818
this->status_liveliness_last.not_alive_count;
819+
820+
this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
816821
}
817822

818823
void
@@ -824,6 +829,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos(
824829

825830
this->status_qos.total_count_change = this->status_qos.total_count;
826831
this->status_qos.total_count_change -= this->status_qos_last.total_count;
832+
this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE);
827833
}
828834

829835
void
@@ -836,6 +842,31 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost(
836842
this->status_sample_lost.total_count_change = this->status_sample_lost.total_count;
837843
this->status_sample_lost.total_count_change -=
838844
this->status_sample_lost_last.total_count;
845+
this->notify_new_event(RMW_EVENT_MESSAGE_LOST);
846+
}
847+
848+
void
849+
RMW_Connext_SubscriberStatusCondition::notify_new_data()
850+
{
851+
size_t unread_samples = 0;
852+
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
853+
perform_action_and_update_state(
854+
[this, &unread_samples]() {
855+
const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples);
856+
if (RMW_RET_OK != rc) {
857+
RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
858+
}
859+
},
860+
[this, &unread_samples]() {
861+
if (unread_samples == 0) {
862+
return;
863+
}
864+
if (new_data_event_cb_) {
865+
new_data_event_cb_(data_event_user_data_, unread_samples);
866+
} else {
867+
unread_data_events_count_ += unread_samples;
868+
}
869+
});
839870
}
840871

841872
void
@@ -849,6 +880,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_matched(
849880
this->status_matched.total_count - this->status_matched_last.total_count;
850881
this->status_matched.current_count_change =
851882
this->status_matched.current_count - this->status_matched_last.current_count;
883+
884+
this->notify_new_event(RMW_EVENT_SUBSCRIPTION_MATCHED);
852885
}
853886

854887
rmw_ret_t
@@ -998,6 +1031,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(
9981031

9991032
this->status_deadline.total_count_change = this->status_deadline.total_count;
10001033
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
1034+
1035+
this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED);
10011036
}
10021037

10031038
void
@@ -1009,6 +1044,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(
10091044

10101045
this->status_liveliness.total_count_change = this->status_liveliness.total_count;
10111046
this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;
1047+
1048+
this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
10121049
}
10131050

10141051
void
@@ -1020,6 +1057,8 @@ RMW_Connext_PublisherStatusCondition::update_status_qos(
10201057

10211058
this->status_qos.total_count_change = this->status_qos.total_count;
10221059
this->status_qos.total_count_change -= this->status_qos_last.total_count;
1060+
1061+
this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE);
10231062
}
10241063

10251064
void
@@ -1033,4 +1072,6 @@ RMW_Connext_PublisherStatusCondition::update_status_matched(
10331072
this->status_matched.total_count - this->status_matched_last.total_count;
10341073
this->status_matched.current_count_change =
10351074
this->status_matched.current_count - this->status_matched_last.current_count;
1075+
1076+
this->notify_new_event(RMW_EVENT_PUBLICATION_MATCHED);
10361077
}

0 commit comments

Comments
 (0)