Skip to content

Commit 87cee28

Browse files
committed
Add support for listener callbacks.
Signed-off-by: Andrea Sorbini <[email protected]>
1 parent 35a37aa commit 87cee28

File tree

7 files changed

+271
-41
lines changed

7 files changed

+271
-41
lines changed

rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ rmw_ret_t
132132
rmw_connextdds_take_samples(
133133
RMW_Connext_Subscriber * const sub);
134134

135+
rmw_ret_t
136+
rmw_connextdds_count_unread_samples(
137+
RMW_Connext_Subscriber * const sub,
138+
size_t & unread_count);
139+
135140
rmw_ret_t
136141
rmw_connextdds_return_samples(
137142
RMW_Connext_Subscriber * const sub);

rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ RMW_CONNEXTDDS_PUBLIC
9090
rmw_ret_t
9191
rmw_api_connextdds_event_set_callback(
9292
rmw_event_t * event,
93-
rmw_event_callback_t callback,
94-
const void * user_data);
93+
const rmw_event_callback_t callback,
94+
const void * const user_data);
9595

9696
/*****************************************************************************
9797
* Info API
@@ -435,15 +435,15 @@ RMW_CONNEXTDDS_PUBLIC
435435
rmw_ret_t
436436
rmw_api_connextdds_service_set_on_new_request_callback(
437437
rmw_service_t * rmw_service,
438-
rmw_event_callback_t callback,
439-
const void * user_data);
438+
const rmw_event_callback_t callback,
439+
const void * const user_data);
440440

441441
RMW_CONNEXTDDS_PUBLIC
442442
rmw_ret_t
443443
rmw_api_connextdds_client_set_on_new_response_callback(
444444
rmw_client_t * rmw_client,
445-
rmw_event_callback_t callback,
446-
const void * user_data);
445+
const rmw_event_callback_t callback,
446+
const void * const user_data);
447447

448448
/*****************************************************************************
449449
* Subscription API
@@ -560,9 +560,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription(
560560
RMW_CONNEXTDDS_PUBLIC
561561
rmw_ret_t
562562
rmw_api_connextdds_subscription_set_on_new_message_callback(
563-
rmw_subscription_t * rmw_subscription,
564-
rmw_event_callback_t callback,
565-
const void * user_data);
563+
rmw_subscription_t * const rmw_subscription,
564+
const rmw_event_callback_t callback,
565+
const void * const user_data);
566566

567567
/*****************************************************************************
568568
* WaitSet API

rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,22 @@ class RMW_Connext_Condition
121121
}
122122
}
123123

124+
template<typename FunctorT, typename FunctorA>
125+
void
126+
perform_action_and_update_state(FunctorT && update_condition, FunctorA && action)
127+
{
128+
std::lock_guard<std::mutex> internal_lock(this->mutex_internal);
129+
130+
action();
131+
132+
if (nullptr != this->waitset_mutex) {
133+
std::lock_guard<std::mutex> lock(*this->waitset_mutex);
134+
update_condition();
135+
} else {
136+
update_condition();
137+
}
138+
}
139+
124140
protected:
125141
std::mutex mutex_internal;
126142
std::mutex * waitset_mutex;
@@ -333,8 +349,44 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
333349
virtual bool
334350
has_status(const rmw_event_type_t event_type) = 0;
335351

352+
void
353+
notify_new_event()
354+
{
355+
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
356+
if (new_event_cb_) {
357+
new_event_cb_(user_data_, 1);
358+
} else {
359+
unread_events_count_++;
360+
}
361+
}
362+
363+
void
364+
set_new_event_callback(
365+
rmw_event_callback_t callback,
366+
const void * user_data)
367+
{
368+
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
369+
370+
if (callback) {
371+
// Push events arrived before setting the executor's callback
372+
if (unread_events_count_ > 0) {
373+
callback(user_data, unread_events_count_);
374+
unread_events_count_ = 0;
375+
}
376+
user_data_ = user_data;
377+
new_event_cb_ = callback;
378+
} else {
379+
user_data_ = nullptr;
380+
new_event_cb_ = nullptr;
381+
}
382+
}
383+
336384
protected:
337385
DDS_StatusCondition * scond;
386+
std::mutex new_event_mutex_;
387+
rmw_event_callback_t new_event_cb_{nullptr};
388+
const void * user_data_{nullptr};
389+
uint64_t unread_events_count_ = 0;
338390
};
339391

340392
void
@@ -712,6 +764,47 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
712764
return RMW_RET_OK;
713765
}
714766

767+
void set_on_new_data_callback(
768+
const rmw_event_callback_t callback,
769+
const void * const user_data)
770+
{
771+
std::unique_lock<std::mutex> lock(new_data_event_mutex_);
772+
if (callback) {
773+
if (unread_data_events_count_ > 0) {
774+
callback(user_data, unread_data_events_count_);
775+
unread_data_events_count_ = 0;
776+
}
777+
new_data_event_cb_ = callback;
778+
data_event_user_data_ = user_data;
779+
} else {
780+
new_data_event_cb_ = nullptr;
781+
data_event_user_data_ = nullptr;
782+
}
783+
}
784+
785+
void notify_new_data()
786+
{
787+
size_t unread_samples = 0;
788+
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
789+
perform_action_and_update_state(
790+
[this, &unread_samples]() {
791+
if (unread_samples == 0) {
792+
return;
793+
}
794+
if (new_data_event_cb_) {
795+
new_data_event_cb_(data_event_user_data_, unread_samples);
796+
} else {
797+
unread_data_events_count_ += unread_samples;
798+
}
799+
},
800+
[this, &unread_samples]() {
801+
const rmw_ret_t rc = rmw_connextdds_count_unread_samples(this->sub, unread_samples);
802+
if (RMW_RET_OK != rc) {
803+
RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
804+
}
805+
});
806+
}
807+
715808
protected:
716809
void update_status_deadline(
717810
const DDS_RequestedDeadlineMissedStatus * const status);
@@ -745,6 +838,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
745838

746839
RMW_Connext_Subscriber * sub;
747840

841+
std::mutex new_data_event_mutex_;
842+
rmw_event_callback_t new_data_event_cb_{nullptr};
843+
const void * data_event_user_data_{nullptr};
844+
uint64_t unread_data_events_count_ = 0;
845+
748846
friend class RMW_Connext_WaitSet;
749847
};
750848

rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ RMW_Connext_DataReaderListener_on_data_available(
121121

122122
UNUSED_ARG(reader);
123123

124+
self->notify_new_data();
124125
self->set_data_available(true);
125126
}
126127

@@ -706,6 +707,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(
706707

707708
this->status_deadline.total_count_change = this->status_deadline.total_count;
708709
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
710+
711+
this->notify_new_event();
709712
}
710713

711714
void
@@ -720,6 +723,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness(
720723
this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count;
721724
this->status_liveliness.not_alive_count_change -=
722725
this->status_liveliness_last.not_alive_count;
726+
727+
this->notify_new_event();
723728
}
724729

725730
void
@@ -852,6 +857,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(
852857

853858
this->status_deadline.total_count_change = this->status_deadline.total_count;
854859
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
860+
861+
this->notify_new_event();
855862
}
856863

857864
void
@@ -863,6 +870,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(
863870

864871
this->status_liveliness.total_count_change = this->status_liveliness.total_count;
865872
this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;
873+
874+
this->notify_new_event();
866875
}
867876

868877
void

rmw_connextdds_common/src/common/rmw_listener.cpp

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,58 +19,86 @@
1919
******************************************************************************/
2020
rmw_ret_t
2121
rmw_api_connextdds_event_set_callback(
22-
rmw_event_t * event,
23-
rmw_event_callback_t callback,
24-
const void * user_data)
22+
rmw_event_t * const event,
23+
const rmw_event_callback_t callback,
24+
const void * const user_data)
2525
{
26-
UNUSED_ARG(event);
27-
UNUSED_ARG(callback);
28-
UNUSED_ARG(user_data);
29-
RMW_CONNEXT_LOG_ERROR_SET("rmw_event_set_callback not implemented")
30-
return RMW_RET_UNSUPPORTED;
26+
RMW_CHECK_ARGUMENT_FOR_NULL(event, RMW_RET_INVALID_ARGUMENT);
27+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
28+
event,
29+
event->implementation_identifier,
30+
RMW_CONNEXTDDS_ID,
31+
return RMW_RET_INVALID_ARGUMENT);
32+
33+
RMW_Connext_StatusCondition * condition = nullptr;
34+
if (RMW_Connext_Event::reader_event(event)) {
35+
condition = RMW_Connext_Event::subscriber(event)->condition();
36+
} else {
37+
condition = RMW_Connext_Event::publisher(event)->condition();
38+
}
39+
condition->set_new_event_callback(callback, user_data);
40+
return RMW_RET_OK;
3141
}
3242

3343
/******************************************************************************
3444
* Service Listener API
3545
******************************************************************************/
3646
rmw_ret_t
3747
rmw_api_connextdds_service_set_on_new_request_callback(
38-
rmw_service_t * rmw_service,
39-
rmw_event_callback_t callback,
40-
const void * user_data)
48+
rmw_service_t * const service,
49+
const rmw_event_callback_t callback,
50+
const void * const user_data)
4151
{
42-
UNUSED_ARG(rmw_service);
43-
UNUSED_ARG(callback);
44-
UNUSED_ARG(user_data);
45-
RMW_CONNEXT_LOG_ERROR_SET("rmw_service_set_on_new_request_callback not implemented")
46-
return RMW_RET_UNSUPPORTED;
52+
RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
53+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
54+
service,
55+
service->implementation_identifier,
56+
RMW_CONNEXTDDS_ID,
57+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
58+
59+
RMW_Connext_Service * const svc_impl =
60+
reinterpret_cast<RMW_Connext_Service *>(service->data);
61+
svc_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data);
62+
return RMW_RET_OK;
4763
}
4864

4965
rmw_ret_t
5066
rmw_api_connextdds_client_set_on_new_response_callback(
51-
rmw_client_t * rmw_client,
52-
rmw_event_callback_t callback,
53-
const void * user_data)
67+
rmw_client_t * const client,
68+
const rmw_event_callback_t callback,
69+
const void * const user_data)
5470
{
55-
UNUSED_ARG(rmw_client);
56-
UNUSED_ARG(callback);
57-
UNUSED_ARG(user_data);
58-
RMW_CONNEXT_LOG_ERROR_SET("rmw_client_set_on_new_response_callback not implemented")
59-
return RMW_RET_UNSUPPORTED;
71+
RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
72+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
73+
client,
74+
client->implementation_identifier,
75+
RMW_CONNEXTDDS_ID,
76+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
77+
78+
RMW_Connext_Client * const client_impl =
79+
reinterpret_cast<RMW_Connext_Client *>(client->data);
80+
client_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data);
81+
return RMW_RET_OK;
6082
}
6183

6284
/******************************************************************************
6385
* Subscription Listener API
6486
******************************************************************************/
6587
rmw_ret_t
6688
rmw_api_connextdds_subscription_set_on_new_message_callback(
67-
rmw_subscription_t * rmw_subscription,
68-
rmw_event_callback_t callback,
69-
const void * user_data)
89+
rmw_subscription_t * const subscription,
90+
const rmw_event_callback_t callback,
91+
const void * const user_data)
7092
{
71-
UNUSED_ARG(rmw_subscription);
72-
UNUSED_ARG(callback);
73-
UNUSED_ARG(user_data);
74-
RMW_CONNEXT_LOG_ERROR_SET("rmw_subscription_set_on_new_message_callback not implemented")
75-
return RMW_RET_UNSUPPORTED;
93+
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
94+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
95+
subscription,
96+
subscription->implementation_identifier,
97+
RMW_CONNEXTDDS_ID,
98+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
99+
100+
RMW_Connext_Subscriber * const sub_impl =
101+
reinterpret_cast<RMW_Connext_Subscriber *>(subscription->data);
102+
sub_impl->condition()->set_on_new_data_callback(callback, user_data);
103+
return RMW_RET_OK;
76104
}

0 commit comments

Comments
 (0)