Skip to content

Commit c17094e

Browse files
author
iRobot ROS
authored
Merge pull request #33 from mauropasse/mauro/irobot-add-events-executor
Change executor ownership model
2 parents 6c55f0b + 51b5450 commit c17094e

24 files changed

+501
-331
lines changed

rclcpp/include/rclcpp/client.hpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,6 @@ class ClientBase
162162
const rclcpp::executors::EventsExecutor * executor,
163163
rmw_listener_cb_t executor_callback) const;
164164

165-
RCLCPP_PUBLIC
166-
void
167-
set_on_destruction_callback(std::function<void(ClientBase *)> on_destruction_callback);
168-
169165
protected:
170166
RCLCPP_DISABLE_COPY(ClientBase)
171167

@@ -181,8 +177,6 @@ class ClientBase
181177
const rcl_node_t *
182178
get_rcl_node_handle() const;
183179

184-
std::function<void(ClientBase *)> on_destruction_callback_;
185-
186180
rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
187181
std::shared_ptr<rcl_node_t> node_handle_;
188182
std::shared_ptr<rclcpp::Context> context_;

rclcpp/include/rclcpp/executor.hpp

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -437,18 +437,6 @@ class Executor
437437
static void
438438
execute_client(rclcpp::ClientBase::SharedPtr client);
439439

440-
RCLCPP_PUBLIC
441-
static void
442-
execute_subscription(rclcpp::SubscriptionBase * subscription);
443-
444-
RCLCPP_PUBLIC
445-
static void
446-
execute_service(rclcpp::ServiceBase * service);
447-
448-
RCLCPP_PUBLIC
449-
static void
450-
execute_client(rclcpp::ClientBase * client);
451-
452440
/**
453441
* \throws std::runtime_error if the wait set can be cleared
454442
*/

rclcpp/include/rclcpp/executors/events_executor.hpp

Lines changed: 5 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_
1717

1818
#include <chrono>
19-
#include <deque>
2019
#include <memory>
20+
#include <queue>
2121
#include <vector>
2222

2323
#include "rclcpp/executor.hpp"
@@ -174,9 +174,7 @@ class EventsExecutor : public rclcpp::Executor
174174
private:
175175
RCLCPP_DISABLE_COPY(EventsExecutor)
176176

177-
// Event queue implementation is a deque only to
178-
// facilitate the removal of events from expired entities.
179-
using EventQueue = std::deque<rmw_listener_event_t>;
177+
using EventQueue = std::queue<rmw_listener_event_t>;
180178

181179
// Executor callback: Push new events into the queue and trigger cv.
182180
// This function is called by the DDS entities when an event happened,
@@ -192,46 +190,12 @@ class EventsExecutor : public rclcpp::Executor
192190
{
193191
std::unique_lock<std::mutex> lock(this_executor->push_mutex_);
194192

195-
this_executor->event_queue_.push_back(event);
193+
this_executor->event_queue_.push(event);
196194
}
197195
// Notify that the event queue has some events in it.
198196
this_executor->event_queue_cv_.notify_one();
199197
}
200198

201-
// This function allows to remove an entity from the EventsExecutor.
202-
// Entities are any of SubscriptionBase, PublisherBase, ClientBase, ServerBase, Waitable.
203-
// After an entity has been removed using this API, it can be safely destroyed without the risk
204-
// that the executor would try to access it again.
205-
template<typename T>
206-
void
207-
remove_entity(T * entity)
208-
{
209-
// We need to unset the callbacks to make sure that after removing events from the
210-
// queues, this entity will not push anymore before being completely destroyed.
211-
// This assumes that all supported entities implement this function.
212-
entity->set_events_executor_callback(nullptr, nullptr);
213-
214-
// Remove events associated with this entity from the event queue
215-
{
216-
std::unique_lock<std::mutex> lock(push_mutex_);
217-
event_queue_.erase(
218-
std::remove_if(
219-
event_queue_.begin(), event_queue_.end(),
220-
[&entity](rmw_listener_event_t event) {return event.entity == entity;}), event_queue_.end());
221-
}
222-
223-
// Remove events associated with this entity from the local event queue
224-
{
225-
std::unique_lock<std::mutex> lock(execution_mutex_);
226-
execution_event_queue_.erase(
227-
std::remove_if(
228-
execution_event_queue_.begin(), execution_event_queue_.end(),
229-
[&entity](rmw_listener_event_t event) {
230-
return event.entity == entity;
231-
}), execution_event_queue_.end());
232-
}
233-
}
234-
235199
/// Extract and execute events from the queue until it is empty
236200
RCLCPP_PUBLIC
237201
void
@@ -242,16 +206,14 @@ class EventsExecutor : public rclcpp::Executor
242206
void
243207
execute_event(const rmw_listener_event_t & event);
244208

245-
// We use two instances of EventQueue to allow threads to push events while we execute them
209+
// Queue where entities can push events
246210
EventQueue event_queue_;
247-
EventQueue execution_event_queue_;
248211

249212
EventsExecutorEntitiesCollector::SharedPtr entities_collector_;
250213
EventsExecutorNotifyWaitable::SharedPtr executor_notifier_;
214+
251215
// Mutex to protect the insertion of events in the queue
252216
std::mutex push_mutex_;
253-
// Mutex to protect the execution of events
254-
std::mutex execution_mutex_;
255217
// Variable used to notify when an event is added to the queue
256218
std::condition_variable event_queue_cv_;
257219
// Timers manager

rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <list>
1919
#include <map>
2020
#include <memory>
21+
#include <unordered_map>
2122
#include <vector>
2223

2324
#include "rclcpp/executors/event_waitable.hpp"
@@ -48,7 +49,9 @@ class EventsExecutor;
4849
* When this occurs, the execute API takes care of handling changes
4950
* in the entities currently used by the executor.
5051
*/
51-
class EventsExecutorEntitiesCollector final : public EventWaitable
52+
class EventsExecutorEntitiesCollector final
53+
: public EventWaitable,
54+
public std::enable_shared_from_this<EventsExecutorEntitiesCollector>
5255
{
5356
public:
5457
RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorEntitiesCollector)
@@ -62,6 +65,10 @@ class EventsExecutorEntitiesCollector final : public EventWaitable
6265
RCLCPP_PUBLIC
6366
~EventsExecutorEntitiesCollector();
6467

68+
// Initialize entities collector
69+
RCLCPP_PUBLIC
70+
void init();
71+
6572
// The purpose of "execute" is handling the situation of a new entity added to
6673
// a node, while the executor is already spinning.
6774
// This consists in setting that entitiy's callback.
@@ -141,6 +148,50 @@ class EventsExecutorEntitiesCollector final : public EventWaitable
141148
std::vector<rclcpp::CallbackGroup::WeakPtr>
142149
get_automatically_added_callback_groups_from_nodes();
143150

151+
///
152+
/**
153+
* Get the subscription shared pointer corresponding
154+
* to a subscription identifier
155+
*/
156+
RCLCPP_PUBLIC
157+
rclcpp::SubscriptionBase::SharedPtr
158+
get_subscription(const void * subscription_id);
159+
160+
///
161+
/**
162+
* Get the client shared pointer corresponding
163+
* to a client identifier
164+
*/
165+
RCLCPP_PUBLIC
166+
rclcpp::ClientBase::SharedPtr
167+
get_client(const void * client_id);
168+
169+
///
170+
/**
171+
* Get the service shared pointer corresponding
172+
* to a service identifier
173+
*/
174+
RCLCPP_PUBLIC
175+
rclcpp::ServiceBase::SharedPtr
176+
get_service(const void * service_id);
177+
178+
///
179+
/**
180+
* Get the waitable shared pointer corresponding
181+
* to a waitable identifier
182+
*/
183+
RCLCPP_PUBLIC
184+
rclcpp::Waitable::SharedPtr
185+
get_waitable(const void * waitable_id);
186+
187+
///
188+
/**
189+
* Add a weak pointer to a waitable
190+
*/
191+
RCLCPP_PUBLIC
192+
void
193+
add_waitable(rclcpp::Waitable::SharedPtr waitable);
194+
144195
private:
145196
void
146197
set_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group);
@@ -190,6 +241,15 @@ class EventsExecutorEntitiesCollector final : public EventWaitable
190241

191242
/// List of weak nodes registered in the events executor
192243
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
244+
245+
// Maps: entity identifiers to weak pointers from the entities registered in the executor
246+
// so in the case of an event providing and ID, we can retrieve and own the corresponding
247+
// entity while it performs work
248+
std::unordered_map<const void *, rclcpp::SubscriptionBase::WeakPtr> weak_subscriptions_map_;
249+
std::unordered_map<const void *, rclcpp::ServiceBase::WeakPtr> weak_services_map_;
250+
std::unordered_map<const void *, rclcpp::ClientBase::WeakPtr> weak_clients_map_;
251+
std::unordered_map<const void *, rclcpp::Waitable::WeakPtr> weak_waitables_map_;
252+
193253
/// Executor using this entities collector object
194254
EventsExecutor * associated_executor_ = nullptr;
195255
/// Instance of the timers manager used by the associated executor

rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,7 @@ class EventsExecutorNotifyWaitable final : public EventWaitable
4040

4141
// Destructor
4242
RCLCPP_PUBLIC
43-
virtual ~EventsExecutorNotifyWaitable()
44-
{
45-
if (on_destruction_callback_) {
46-
on_destruction_callback_(this);
47-
}
48-
}
43+
virtual ~EventsExecutorNotifyWaitable() = default;
4944

5045
// The function is a no-op, since we only care of waking up the executor
5146
RCLCPP_PUBLIC

0 commit comments

Comments
 (0)