Skip to content

Commit 89f2106

Browse files
committed
Address reviewer feedback and fix templates
Signed-off-by: Michael Carroll <[email protected]>
1 parent a524bf0 commit 89f2106

File tree

11 files changed

+470
-175
lines changed

11 files changed

+470
-175
lines changed

rclcpp/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ set(${PROJECT_NAME}_SRCS
5454
src/rclcpp/executable_list.cpp
5555
src/rclcpp/executor.cpp
5656
src/rclcpp/executors.cpp
57-
src/rclcpp/executors/executor_notify_waitable.cpp
58-
src/rclcpp/executors/executor_entities_collector.cpp
5957
src/rclcpp/executors/executor_entities_collection.cpp
58+
src/rclcpp/executors/executor_entities_collector.cpp
59+
src/rclcpp/executors/executor_notify_waitable.cpp
6060
src/rclcpp/executors/multi_threaded_executor.cpp
6161
src/rclcpp/executors/single_threaded_executor.cpp
6262
src/rclcpp/executors/static_executor_entities_collector.cpp

rclcpp/include/rclcpp/executors/executor_entities_collection.hpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,19 @@ namespace executors
3535
template<typename EntityValueType>
3636
struct CollectionEntry
3737
{
38-
typename EntityValueType::WeakPtr entity;
38+
using EntityWeakPtr = typename EntityValueType::WeakPtr;
39+
using EntitySharedPtr = typename EntityValueType::SharedPtr;
40+
41+
EntityWeakPtr entity;
3942
rclcpp::CallbackGroup::WeakPtr callback_group;
4043
};
4144

4245
template<typename CollectionType>
4346
void update_entities(
4447
const CollectionType & update_from,
45-
CollectionType update_to,
46-
std::function<void(typename CollectionType::mapped_type::EntitySharedPtr)> on_added,
47-
std::function<void(typename CollectionType::mapped_type::EntitySharedPtr)> on_removed
48+
CollectionType & update_to,
49+
std::function<void(typename CollectionType::EntitySharedPtr)> on_added,
50+
std::function<void(typename CollectionType::EntitySharedPtr)> on_removed
4851
)
4952
{
5053
for (auto it = update_to.begin(); it != update_to.end(); ) {
@@ -60,7 +63,7 @@ void update_entities(
6063
}
6164
for (auto it = update_from.begin(); it != update_from.end(); ++it) {
6265
if (update_to.count(it->first) == 0) {
63-
auto entity = it->entity.lock();
66+
auto entity = it->second.entity.lock();
6467
if (entity) {
6568
on_added(entity);
6669
}
@@ -82,7 +85,7 @@ class EntityCollection
8285
std::function<void(EntitySharedPtr)> on_added,
8386
std::function<void(EntitySharedPtr)> on_removed)
8487
{
85-
update_entities(*this, other, on_added, on_removed);
88+
update_entities(other, *this, on_added, on_removed);
8689
}
8790
};
8891

@@ -118,6 +121,7 @@ struct ExecutorEntitiesCollection
118121
GuardConditionCollection guard_conditions;
119122
WaitableCollection waitables;
120123

124+
bool empty() const;
121125
void clear();
122126
};
123127

rclcpp/include/rclcpp/executors/executor_entities_collector.hpp

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ class ExecutorEntitiesCollector
143143
std::vector<rclcpp::CallbackGroup::WeakPtr>
144144
get_automatically_added_callback_groups();
145145

146+
RCLCPP_PUBLIC
147+
std::shared_ptr<ExecutorNotifyWaitable> get_notify_waitable();
148+
146149
/// Update the underlying collections
147150
/**
148151
* This will prune nodes and callback groups that are no longer valid as well
@@ -153,10 +156,34 @@ class ExecutorEntitiesCollector
153156
update_collections();
154157

155158
protected:
159+
using NodeCollection = std::set<
160+
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
161+
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>;
162+
156163
using CallbackGroupCollection = std::set<
157164
rclcpp::CallbackGroup::WeakPtr,
158165
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>;
159166

167+
using WeakNodesToGuardConditionsMap = std::map<
168+
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
169+
const rclcpp::GuardCondition *,
170+
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>;
171+
172+
using WeakGroupsToGuardConditionsMap = std::map<
173+
rclcpp::CallbackGroup::WeakPtr,
174+
const rclcpp::GuardCondition *,
175+
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>;
176+
177+
RCLCPP_PUBLIC
178+
NodeCollection::iterator
179+
remove_weak_node(NodeCollection::iterator weak_node) RCPPUTILS_TSA_REQUIRES(mutex_);
180+
181+
RCLCPP_PUBLIC
182+
CallbackGroupCollection::iterator
183+
remove_weak_callback_group(
184+
CallbackGroupCollection::iterator weak_group_it,
185+
CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_);
186+
160187
RCLCPP_PUBLIC
161188
void
162189
add_callback_group_to_collection(
@@ -172,7 +199,7 @@ class ExecutorEntitiesCollector
172199
RCLCPP_PUBLIC
173200
void
174201
add_automatically_associated_callback_groups(
175-
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> nodes_to_check)
202+
const NodeCollection & nodes_to_check)
176203
RCPPUTILS_TSA_REQUIRES(mutex_);
177204

178205
RCLCPP_PUBLIC
@@ -181,19 +208,25 @@ class ExecutorEntitiesCollector
181208

182209
mutable std::mutex mutex_;
183210

211+
/// Callback groups that were added via `add_callback_group`
184212
CallbackGroupCollection
185213
manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
186214

215+
/// Callback groups that were added by their association with added nodes
187216
CallbackGroupCollection
188217
automatically_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
189218

190219
/// nodes that are associated with the executor
191-
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
220+
NodeCollection
192221
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
193222

223+
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
224+
225+
WeakGroupsToGuardConditionsMap weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
226+
194227
std::shared_ptr<ExecutorNotifyWaitable> notify_waitable_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
195228
};
196229
} // namespace executors
197230
} // namespace rclcpp
198-
231+
//
199232
#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_

rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,15 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
8585
*/
8686
RCLCPP_PUBLIC
8787
void
88-
add_guard_condition(rclcpp::GuardCondition * guard_condition);
88+
add_guard_condition(const rclcpp::GuardCondition * guard_condition);
8989

9090
/// Remove a guard condition from being waited on.
9191
/**
9292
* \param[in] guard_condition The guard condition to remove.
9393
*/
9494
RCLCPP_PUBLIC
9595
void
96-
remove_guard_condition(rclcpp::GuardCondition * guard_condition);
96+
remove_guard_condition(const rclcpp::GuardCondition * guard_condition);
9797

9898
/// Get the number of ready guard_conditions
9999
/**
@@ -107,8 +107,17 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
107107
/// Callback to run when waitable executes
108108
std::function<void(void)> execute_callback_;
109109

110+
/// The collection of guard conditions to be waited on.
111+
std::mutex guard_condition_mutex_;
112+
110113
/// The collection of guard conditions to be waited on.
111114
std::list<const rclcpp::GuardCondition *> notify_guard_conditions_;
115+
116+
/// The collection of guard conditions to be waited on.
117+
std::list<const rclcpp::GuardCondition *> to_add_;
118+
119+
/// The collection of guard conditions to be waited on.
120+
std::list<const rclcpp::GuardCondition *> to_remove_;
112121
};
113122

114123
} // namespace executors

rclcpp/src/rclcpp/callback_group.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ CallbackGroup::get_notify_guard_condition(const rclcpp::Context::SharedPtr conte
141141
rclcpp::GuardCondition::SharedPtr
142142
CallbackGroup::get_notify_guard_condition()
143143
{
144+
std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
144145
auto context_ptr = this->get_context_();
145146
if (context_ptr && context_ptr->is_valid()) {
146147
std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
@@ -156,6 +157,8 @@ CallbackGroup::get_notify_guard_condition()
156157
}
157158

158159
return notify_guard_condition_;
160+
} else {
161+
throw std::runtime_error("Couldn't get guard condition from invalid context");
159162
}
160163
return nullptr;
161164
}

rclcpp/src/rclcpp/executors/executor_entities_collection.cpp

Lines changed: 65 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ namespace rclcpp
1818
{
1919
namespace executors
2020
{
21+
bool ExecutorEntitiesCollection::empty() const
22+
{
23+
return (subscriptions.empty() &&
24+
timers.empty() &&
25+
guard_conditions.empty() &&
26+
clients.empty() &&
27+
services.empty() &&
28+
waitables.empty());
29+
}
2130

2231
void ExecutorEntitiesCollection::clear()
2332
{
@@ -29,6 +38,7 @@ void ExecutorEntitiesCollection::clear()
2938
waitables.clear();
3039
}
3140

41+
3242
void
3343
build_entities_collection(
3444
const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
@@ -42,43 +52,46 @@ build_entities_collection(
4252
continue;
4353
}
4454

45-
group_ptr->collect_all_ptrs(
46-
[&collection, weak_group_ptr](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
47-
collection.subscriptions.insert(
48-
{
49-
subscription->get_subscription_handle().get(),
50-
{subscription, weak_group_ptr}
51-
});
52-
},
53-
[&collection, weak_group_ptr](const rclcpp::ServiceBase::SharedPtr & service) {
54-
collection.services.insert(
55-
{
56-
service->get_service_handle().get(),
57-
{service, weak_group_ptr}
58-
});
59-
},
60-
[&collection, weak_group_ptr](const rclcpp::ClientBase::SharedPtr & client) {
61-
collection.clients.insert(
62-
{
63-
client->get_client_handle().get(),
64-
{client, weak_group_ptr}
65-
});
66-
},
67-
[&collection, weak_group_ptr](const rclcpp::TimerBase::SharedPtr & timer) {
68-
collection.timers.insert(
69-
{
70-
timer->get_timer_handle().get(),
71-
{timer, weak_group_ptr}
72-
});
73-
},
74-
[&collection, weak_group_ptr](const rclcpp::Waitable::SharedPtr & waitable) {
75-
collection.waitables.insert(
76-
{
77-
waitable.get(),
78-
{waitable, weak_group_ptr}
79-
});
80-
}
81-
);
55+
if (group_ptr->can_be_taken_from().load())
56+
{
57+
group_ptr->collect_all_ptrs(
58+
[&collection, weak_group_ptr](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
59+
collection.subscriptions.insert(
60+
{
61+
subscription->get_subscription_handle().get(),
62+
{subscription, weak_group_ptr}
63+
});
64+
},
65+
[&collection, weak_group_ptr](const rclcpp::ServiceBase::SharedPtr & service) {
66+
collection.services.insert(
67+
{
68+
service->get_service_handle().get(),
69+
{service, weak_group_ptr}
70+
});
71+
},
72+
[&collection, weak_group_ptr](const rclcpp::ClientBase::SharedPtr & client) {
73+
collection.clients.insert(
74+
{
75+
client->get_client_handle().get(),
76+
{client, weak_group_ptr}
77+
});
78+
},
79+
[&collection, weak_group_ptr](const rclcpp::TimerBase::SharedPtr & timer) {
80+
collection.timers.insert(
81+
{
82+
timer->get_timer_handle().get(),
83+
{timer, weak_group_ptr}
84+
});
85+
},
86+
[&collection, weak_group_ptr](const rclcpp::Waitable::SharedPtr & waitable) {
87+
collection.waitables.insert(
88+
{
89+
waitable.get(),
90+
{waitable, weak_group_ptr}
91+
});
92+
}
93+
);
94+
}
8295
}
8396
}
8497

@@ -88,24 +101,28 @@ void check_ready(
88101
std::deque<rclcpp::AnyExecutable> & executables,
89102
size_t size_of_waited_entities,
90103
typename EntityCollectionType::Key * waited_entities,
91-
std::function<bool(rclcpp::AnyExecutable,
92-
typename EntityCollectionType::EntitySharedPtr)> fill_executable)
104+
std::function<bool(rclcpp::AnyExecutable &,
105+
typename EntityCollectionType::EntitySharedPtr &)> fill_executable)
93106
{
94107
for (size_t ii = 0; ii < size_of_waited_entities; ++ii) {
95108
if (waited_entities[ii]) {
96109
auto entity_iter = collection.find(waited_entities[ii]);
97110
if (entity_iter != collection.end()) {
98111
auto entity = entity_iter->second.entity.lock();
99-
auto callback_group = entity_iter->second.callback_group.lock();
100-
101-
if (!entity || !callback_group || !callback_group->can_be_taken_from().load()) {
112+
if (!entity) {
102113
continue;
103114
}
104115

116+
auto callback_group = entity_iter->second.callback_group.lock();
117+
if (callback_group && !callback_group->can_be_taken_from().load()) {
118+
continue;
119+
}
105120
rclcpp::AnyExecutable exec;
121+
106122
exec.callback_group = callback_group;
107123
if (fill_executable(exec, entity)) {
108124
executables.push_back(exec);
125+
} else {
109126
}
110127
}
111128
}
@@ -123,15 +140,13 @@ ready_executables(
123140
if (wait_result.kind() != rclcpp::WaitResultKind::Ready) {
124141
return ret;
125142
}
126-
127143
auto rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set();
128-
129144
check_ready(
130145
collection.timers,
131146
ret,
132147
rcl_wait_set.size_of_timers,
133148
rcl_wait_set.timers,
134-
[](auto exec, auto timer) {
149+
[](rclcpp::AnyExecutable & exec, auto timer) {
135150
if (!timer->call()) {
136151
return false;
137152
}
@@ -144,17 +159,18 @@ ready_executables(
144159
ret,
145160
rcl_wait_set.size_of_subscriptions,
146161
rcl_wait_set.subscriptions,
147-
[](auto exec, auto subscription) {
162+
[](rclcpp::AnyExecutable & exec, auto subscription) {
148163
exec.subscription = subscription;
149164
return true;
150165
});
151166

167+
152168
check_ready(
153169
collection.services,
154170
ret,
155171
rcl_wait_set.size_of_services,
156172
rcl_wait_set.services,
157-
[](auto exec, auto service) {
173+
[](rclcpp::AnyExecutable & exec, auto service) {
158174
exec.service = service;
159175
return true;
160176
});
@@ -164,7 +180,7 @@ ready_executables(
164180
ret,
165181
rcl_wait_set.size_of_clients,
166182
rcl_wait_set.clients,
167-
[](auto exec, auto client) {
183+
[](rclcpp::AnyExecutable & exec, auto client) {
168184
exec.client = client;
169185
return true;
170186
});
@@ -173,7 +189,7 @@ ready_executables(
173189
auto waitable = entry.entity.lock();
174190
if (waitable && waitable->is_ready(&rcl_wait_set)) {
175191
auto group = entry.callback_group.lock();
176-
if (!group || !group->can_be_taken_from().load()) {
192+
if (group && !group->can_be_taken_from().load()) {
177193
continue;
178194
}
179195

0 commit comments

Comments
 (0)