Skip to content

Commit 1412e94

Browse files
author
Mauro Passerino
committed
EventsExecutor to use std::function
Signed-off-by: Mauro Passerino <[email protected]>
1 parent 6ef29c1 commit 1412e94

File tree

9 files changed

+34
-186
lines changed

9 files changed

+34
-186
lines changed

rclcpp/include/rclcpp/executors/events_executor.hpp

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -209,31 +209,6 @@ class EventsExecutor : public rclcpp::Executor
209209
private:
210210
RCLCPP_DISABLE_COPY(EventsExecutor)
211211

212-
// Executor callback: Push new events into the queue and trigger cv.
213-
// This function is called by the DDS entities when an event happened,
214-
// like a subscription receiving a message.
215-
static void
216-
push_event(const void * event_data, size_t num_events)
217-
{
218-
if (!event_data) {
219-
throw std::runtime_error("Executor event data not valid.");
220-
}
221-
222-
auto data = static_cast<const executors::EventsExecutorCallbackData *>(event_data);
223-
224-
executors::EventsExecutor * this_executor = data->executor;
225-
ExecutorEvent event = data->event;
226-
event.num_events = num_events;
227-
228-
// Event queue mutex scope
229-
{
230-
std::unique_lock<std::mutex> lock(this_executor->push_mutex_);
231-
this_executor->events_queue_->push(event);
232-
}
233-
// Notify that the event queue has some events in it.
234-
this_executor->events_queue_cv_.notify_one();
235-
}
236-
237212
// Execute a single event
238213
RCLCPP_PUBLIC
239214
void

rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,8 @@ class EventsExecutorEntitiesCollector final
214214
void
215215
unset_guard_condition_callback(const rclcpp::GuardCondition * guard_condition);
216216

217-
void
218-
remove_callback_data(void * entity_id, ExecutorEventType type);
219-
220-
const EventsExecutorCallbackData *
221-
get_callback_data(void * entity_id, ExecutorEventType type);
217+
std::function<void(size_t)>
218+
create_entity_callback(void * entity_id, ExecutorEventType type);
222219

223220
/// Return true if the node belongs to the collector
224221
/**
@@ -269,17 +266,6 @@ class EventsExecutorEntitiesCollector final
269266
EventsExecutor * associated_executor_ = nullptr;
270267
/// Instance of the timers manager used by the associated executor
271268
TimersManager::SharedPtr timers_manager_;
272-
273-
/// Callback data objects mapped to the number of listeners sharing the same object.
274-
/// When no more listeners use the object, it can be removed from the map.
275-
/// For example, the entities collector holds every node's guard condition, which
276-
/// share the same EventsExecutorCallbackData object ptr to use as their callback arg:
277-
/// cb_data_object = {executor_ptr, entities_collector_ptr, WAITABLE_EVENT};
278-
/// Node1->gc(&cb_data_object)
279-
/// Node2->gc(&cb_data_object)
280-
/// So the maps has: (cb_data_object, 2)
281-
/// When both nodes are removed (counter = 0), the cb_data_object can be destroyed.
282-
std::unordered_map<EventsExecutorCallbackData, size_t, KeyHasher> callback_data_map_;
283269
};
284270

285271
} // namespace executors

rclcpp/include/rclcpp/executors/events_executor_event_types.hpp

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ namespace rclcpp
2020
namespace executors
2121
{
2222

23-
// forward declaration of EventsExecutor to avoid circular dependency
24-
class EventsExecutor;
25-
2623
enum ExecutorEventType
2724
{
2825
SUBSCRIPTION_EVENT,
@@ -38,42 +35,6 @@ struct ExecutorEvent
3835
size_t num_events;
3936
};
4037

41-
// The EventsExecutorCallbackData struct is what the listeners
42-
// will use as argument when calling their callbacks from the
43-
// RMW implementation. The listeners get a (void *) of this struct,
44-
// and the executor is in charge to cast it back and use the data.
45-
struct EventsExecutorCallbackData
46-
{
47-
EventsExecutorCallbackData(
48-
EventsExecutor * _executor,
49-
ExecutorEvent _event)
50-
{
51-
executor = _executor;
52-
event = _event;
53-
}
54-
55-
// Equal operator
56-
bool operator==(const EventsExecutorCallbackData & other) const
57-
{
58-
return event.entity_id == other.event.entity_id;
59-
}
60-
61-
// Struct members
62-
EventsExecutor * executor;
63-
ExecutorEvent event;
64-
};
65-
66-
// To be able to use std::unordered_map with an EventsExecutorCallbackData
67-
// as key, we need a hasher. We use the entity ID as hash, since it is
68-
// unique for each EventsExecutorCallbackData object.
69-
struct KeyHasher
70-
{
71-
size_t operator()(const EventsExecutorCallbackData & key) const
72-
{
73-
return std::hash<const void *>()(key.event.entity_id);
74-
}
75-
};
76-
7738
} // namespace executors
7839
} // namespace rclcpp
7940

rclcpp/include/rclcpp/waitable.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define RCLCPP__WAITABLE_HPP_
1717

1818
#include <atomic>
19+
#include <functional>
1920
#include <memory>
2021

2122
#include "rclcpp/macros.hpp"

rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp

Lines changed: 26 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include "rclcpp/executors/events_executor.hpp"
2121
#include "rclcpp/executors/events_executor_entities_collector.hpp"
2222

23-
using rclcpp::executors::EventsExecutorCallbackData;
23+
using rclcpp::executors::ExecutorEvent;
2424
using rclcpp::executors::EventsExecutorEntitiesCollector;
2525

2626
EventsExecutorEntitiesCollector::EventsExecutorEntitiesCollector(
@@ -76,7 +76,6 @@ EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector()
7676
weak_nodes_.clear();
7777
weak_clients_map_.clear();
7878
weak_services_map_.clear();
79-
callback_data_map_.clear();
8079
weak_waitables_map_.clear();
8180
weak_subscriptions_map_.clear();
8281
weak_nodes_to_guard_conditions_.clear();
@@ -238,9 +237,8 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks(
238237
if (subscription) {
239238
weak_subscriptions_map_.emplace(subscription.get(), subscription);
240239

241-
subscription->set_listener_callback(
242-
&EventsExecutor::push_event,
243-
get_callback_data(subscription.get(), SUBSCRIPTION_EVENT));
240+
subscription->set_on_new_message_callback(
241+
create_entity_callback(subscription.get(), SUBSCRIPTION_EVENT));
244242
}
245243
return false;
246244
});
@@ -249,9 +247,8 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks(
249247
if (service) {
250248
weak_services_map_.emplace(service.get(), service);
251249

252-
service->set_listener_callback(
253-
&EventsExecutor::push_event,
254-
get_callback_data(service.get(), SERVICE_EVENT));
250+
service->set_on_new_request_callback(
251+
create_entity_callback(service.get(), SERVICE_EVENT));
255252
}
256253
return false;
257254
});
@@ -260,9 +257,8 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks(
260257
if (client) {
261258
weak_clients_map_.emplace(client.get(), client);
262259

263-
client->set_listener_callback(
264-
&EventsExecutor::push_event,
265-
get_callback_data(client.get(), CLIENT_EVENT));
260+
client->set_on_new_response_callback(
261+
create_entity_callback(client.get(), CLIENT_EVENT));
266262
}
267263
return false;
268264
});
@@ -271,9 +267,8 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks(
271267
if (waitable) {
272268
weak_waitables_map_.emplace(waitable.get(), waitable);
273269

274-
waitable->set_listener_callback(
275-
&EventsExecutor::push_event,
276-
get_callback_data(waitable.get(), WAITABLE_EVENT));
270+
// waitable->set_listener_callback(
271+
// create_entity_callback(waitable.get(), WAITABLE_EVENT));
277272
}
278273
return false;
279274
});
@@ -296,36 +291,32 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks(
296291
group->find_subscription_ptrs_if(
297292
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
298293
if (subscription) {
299-
subscription->set_listener_callback(nullptr, nullptr);
294+
subscription->set_on_new_message_callback(nullptr);
300295
weak_subscriptions_map_.erase(subscription.get());
301-
remove_callback_data(subscription.get(), SUBSCRIPTION_EVENT);
302296
}
303297
return false;
304298
});
305299
group->find_service_ptrs_if(
306300
[this](const rclcpp::ServiceBase::SharedPtr & service) {
307301
if (service) {
308-
service->set_listener_callback(nullptr, nullptr);
302+
service->set_on_new_request_callback(nullptr);
309303
weak_services_map_.erase(service.get());
310-
remove_callback_data(service.get(), SERVICE_EVENT);
311304
}
312305
return false;
313306
});
314307
group->find_client_ptrs_if(
315308
[this](const rclcpp::ClientBase::SharedPtr & client) {
316309
if (client) {
317-
client->set_listener_callback(nullptr, nullptr);
310+
client->set_on_new_response_callback(nullptr);
318311
weak_clients_map_.erase(client.get());
319-
remove_callback_data(client.get(), CLIENT_EVENT);
320312
}
321313
return false;
322314
});
323315
group->find_waitable_ptrs_if(
324316
[this](const rclcpp::Waitable::SharedPtr & waitable) {
325317
if (waitable) {
326-
waitable->set_listener_callback(nullptr, nullptr);
318+
waitable->set_listener_callback(nullptr);
327319
weak_waitables_map_.erase(waitable.get());
328-
remove_callback_data(waitable.get(), WAITABLE_EVENT);
329320
}
330321
return false;
331322
});
@@ -487,28 +478,13 @@ void
487478
EventsExecutorEntitiesCollector::set_guard_condition_callback(
488479
const rclcpp::GuardCondition * guard_condition)
489480
{
490-
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
491-
guard_condition,
492-
&EventsExecutor::push_event,
493-
get_callback_data(this, WAITABLE_EVENT));
494-
495-
if (ret != RCL_RET_OK) {
496-
throw std::runtime_error("Couldn't set guard condition event callback");
497-
}
481+
// create_entity_callback(this, WAITABLE_EVENT);
498482
}
499483

500484
void
501485
EventsExecutorEntitiesCollector::unset_guard_condition_callback(
502486
const rclcpp::GuardCondition * guard_condition)
503487
{
504-
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
505-
guard_condition, nullptr, nullptr);
506-
507-
if (ret != RCL_RET_OK) {
508-
throw std::runtime_error("Couldn't unset guard condition event callback");
509-
}
510-
511-
remove_callback_data(this, WAITABLE_EVENT);
512488
}
513489

514490
rclcpp::SubscriptionBase::SharedPtr
@@ -592,56 +568,21 @@ EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitab
592568
{
593569
weak_waitables_map_.emplace(waitable.get(), waitable);
594570

595-
waitable->set_listener_callback(
596-
&EventsExecutor::push_event,
597-
get_callback_data(waitable.get(), WAITABLE_EVENT));
571+
waitable->set_listener_callback([] (size_t, int){});
598572
}
599573

600-
const EventsExecutorCallbackData *
601-
EventsExecutorEntitiesCollector::get_callback_data(
574+
std::function<void(size_t)>
575+
EventsExecutorEntitiesCollector::create_entity_callback(
602576
void * entity_id, ExecutorEventType event_type)
603577
{
604-
// Create an entity callback data object and check if
605-
// we already have stored one like it
606-
ExecutorEvent event = {entity_id, event_type, 0};
607-
EventsExecutorCallbackData data(associated_executor_, event);
608-
609-
auto it = callback_data_map_.find(data);
610-
611-
if (it != callback_data_map_.end()) {
612-
// We found a callback data matching entity ID and type.
613-
// Increment callback data counter and return pointer to data
614-
it->second++;
615-
return &it->first;
616-
}
617-
618-
// There was no callback data object matching ID and type,
619-
// create one and set counter to 1.
620-
callback_data_map_.emplace(data, 1);
621-
622-
// Return a pointer to the just added entity callback data.
623-
it = callback_data_map_.find(data);
624-
return &it->first;
625-
}
626-
627-
void
628-
EventsExecutorEntitiesCollector::remove_callback_data(
629-
void * entity_id, ExecutorEventType event_type)
630-
{
631-
// Create an entity callback data object and check if
632-
// we already have stored one like it
633-
ExecutorEvent event = {entity_id, event_type, 0};
634-
EventsExecutorCallbackData data(associated_executor_, event);
635-
636-
auto it = callback_data_map_.find(data);
637-
638-
if (it != callback_data_map_.end()) {
639-
// We found a callback data matching entity ID and type.
640-
// If we have more than 1 decrement counter, otherwise remove it.
641-
if (it->second > 1) {
642-
it->second--;
643-
} else {
644-
callback_data_map_.erase(it);
578+
return [this, entity_id, event_type](size_t num_events) {
579+
ExecutorEvent event = {entity_id, event_type, num_events};
580+
// Event queue mutex scope
581+
{
582+
std::unique_lock<std::mutex> lock(associated_executor_->push_mutex_);
583+
associated_executor_->events_queue_->push(event);
645584
}
646-
}
585+
// Notify that the event queue has some events in it.
586+
associated_executor_->events_queue_cv_.notify_one();
587+
};
647588
}

rclcpp/src/rclcpp/qos_event.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,9 @@ QOSEventHandlerBase::set_on_new_event_callback(
8585
}
8686

8787
void
88-
QOSEventHandlerBase::set_listener_callback(
89-
rmw_listener_callback_t callback,
90-
const void * user_data)
88+
QOSEventHandlerBase::set_listener_callback(std::function<void(size_t, int)> callback)
9189
{
92-
set_on_new_event_callback(callback, user_data);
90+
// set_on_new_event_callback(callback);
9391
}
9492

9593

rclcpp/src/rclcpp/subscription_intra_process_base.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ SubscriptionIntraProcessBase::get_actual_qos() const
3939
}
4040

4141
void
42-
SubscriptionIntraProcessBase::set_listener_callback(
43-
rmw_listener_callback_t callback,
44-
const void * user_data)
42+
SubscriptionIntraProcessBase::set_listener_callback(std::function<void(size_t, int)> callback)
4543
{
4644
(void)callback;
47-
(void)user_data;
4845
}

rclcpp/src/rclcpp/waitable.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,9 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state)
6161
}
6262

6363
void
64-
Waitable::set_listener_callback(
65-
rmw_listener_callback_t callback,
66-
const void * user_data)
64+
Waitable::set_listener_callback(std::function<void(size_t, int)> callback)
6765
{
6866
(void)callback;
69-
(void)user_data;
7067

7168
throw std::runtime_error(
7269
"Custom waitables should override set_listener_callback() "

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -447,16 +447,8 @@ class TestWaitable : public rclcpp::Waitable
447447
}
448448

449449
void
450-
set_listener_callback(
451-
rmw_listener_callback_t callback,
452-
const void * user_data) override
450+
set_listener_callback(std::function<void(size_t, int)> callback) override
453451
{
454-
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
455-
&gc_, callback, user_data);
456-
457-
if (RCL_RET_OK != ret) {
458-
throw std::runtime_error(std::string("Couldn't set guard condition callback"));
459-
}
460452
}
461453

462454
private:

0 commit comments

Comments
 (0)