Skip to content

Commit 34cc960

Browse files
authored
Cleanup of #2683 (#2714)
Signed-off-by: Janosch Machowinski <[email protected]>
1 parent 06d400d commit 34cc960

File tree

4 files changed

+90
-58
lines changed

4 files changed

+90
-58
lines changed

rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
4242
* of this waitable has signaled the wait_set.
4343
*/
4444
RCLCPP_PUBLIC
45-
explicit ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback = {});
45+
explicit ExecutorNotifyWaitable(
46+
std::function<void(void)> on_execute_callback = {}, const rclcpp::Context::SharedPtr & context =
47+
rclcpp::contexts::get_global_default_context());
4648

4749
// Destructor
4850
RCLCPP_PUBLIC
@@ -167,8 +169,17 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
167169
std::function<void(size_t)> on_ready_callback_;
168170

169171
/// The collection of guard conditions to be waited on.
170-
std::set<rclcpp::GuardCondition::WeakPtr,
171-
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
172+
std::set<rclcpp::GuardCondition::SharedPtr> notify_guard_conditions_;
173+
174+
/// The indixes were our guard conditions were stored in the
175+
/// rcl waitset
176+
std::vector<size_t> idxs_of_added_guard_condition_;
177+
178+
/// set to true, if we got a pending trigger
179+
bool needs_processing = false;
180+
181+
/// A guard condition needed to generate wakeups
182+
rclcpp::GuardCondition::SharedPtr guard_condition_;
172183
};
173184

174185
} // namespace executors

rclcpp/src/rclcpp/executor.cpp

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
6666
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
6767
[this]() {
6868
this->entities_need_rebuild_.store(true);
69-
})),
69+
}, options.context)),
7070
entities_need_rebuild_(true),
7171
collector_(notify_waitable_),
7272
wait_set_({}, {}, {}, {}, {}, {}, options.context),
@@ -84,7 +84,9 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
8484
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
8585
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
8686

87-
wait_set_.add_waitable(notify_waitable_);
87+
// we need to initially rebuild the collection,
88+
// so that the notify_waitable_ is added
89+
collect_entities();
8890
}
8991

9092
Executor::~Executor()
@@ -745,33 +747,16 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
745747
// Clear any previous wait result
746748
this->wait_result_.reset();
747749

748-
// we need to make sure that callback groups don't get out of scope
749-
// during the wait. As in jazzy, they are not covered by the DynamicStorage,
750-
// we explicitly hold them here as a bugfix
751-
std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;
752-
753750
{
754751
std::lock_guard<std::mutex> guard(mutex_);
755752

756753
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
757754
this->collect_entities();
758755
}
759-
760-
auto callback_groups = this->collector_.get_all_callback_groups();
761-
cbgs.resize(callback_groups.size());
762-
for(const auto & w_ptr : callback_groups) {
763-
auto shr_ptr = w_ptr.lock();
764-
if(shr_ptr) {
765-
cbgs.push_back(std::move(shr_ptr));
766-
}
767-
}
768756
}
769757

770758
this->wait_result_.emplace(wait_set_.wait(timeout));
771759

772-
// drop references to the callback groups, before trying to execute anything
773-
cbgs.clear();
774-
775760
if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
776761
RCUTILS_LOG_WARN_NAMED(
777762
"rclcpp",

rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp

Lines changed: 67 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ namespace rclcpp
2020
namespace executors
2121
{
2222

23-
ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback)
24-
: execute_callback_(on_execute_callback)
23+
ExecutorNotifyWaitable::ExecutorNotifyWaitable(
24+
std::function<void(void)> on_execute_callback,
25+
const rclcpp::Context::SharedPtr & context)
26+
: execute_callback_(on_execute_callback),
27+
guard_condition_(std::make_shared<rclcpp::GuardCondition>(context))
2528
{
2629
}
2730

@@ -30,6 +33,9 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(ExecutorNotifyWaitable & other)
3033
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
3134
this->execute_callback_ = other.execute_callback_;
3235
this->notify_guard_conditions_ = other.notify_guard_conditions_;
36+
this->guard_condition_ = other.guard_condition_;
37+
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
38+
this->needs_processing = other.needs_processing;
3339
}
3440

3541
ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other)
@@ -38,6 +44,9 @@ ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitabl
3844
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
3945
this->execute_callback_ = other.execute_callback_;
4046
this->notify_guard_conditions_ = other.notify_guard_conditions_;
47+
this->guard_condition_ = other.guard_condition_;
48+
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
49+
this->needs_processing = other.needs_processing;
4150
}
4251
return *this;
4352
}
@@ -47,21 +56,42 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set)
4756
{
4857
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
4958

59+
idxs_of_added_guard_condition_.clear();
60+
idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size());
61+
62+
if(needs_processing) {
63+
rcl_guard_condition_t * cond = &guard_condition_->get_rcl_guard_condition();
64+
size_t rcl_index;
65+
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);
66+
67+
if (RCL_RET_OK != ret) {
68+
rclcpp::exceptions::throw_from_rcl_error(
69+
ret, "failed to add guard condition to wait set");
70+
}
71+
72+
idxs_of_added_guard_condition_.push_back(rcl_index);
73+
74+
// we want to directly wake up any way, not need to add the other guard conditions
75+
guard_condition_->trigger();
76+
77+
return;
78+
}
79+
5080
// Note: no guard conditions need to be re-triggered, since the guard
5181
// conditions in this class are not tracking a stateful condition, but instead
5282
// only serve to interrupt the wait set when new information is available to
5383
// consider.
54-
for (auto weak_guard_condition : this->notify_guard_conditions_) {
55-
auto guard_condition = weak_guard_condition.lock();
56-
if (!guard_condition) {continue;}
57-
84+
for (const auto & guard_condition : this->notify_guard_conditions_) {
5885
rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
59-
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL);
86+
size_t rcl_index;
87+
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);
6088

6189
if (RCL_RET_OK != ret) {
6290
rclcpp::exceptions::throw_from_rcl_error(
6391
ret, "failed to add guard condition to wait set");
6492
}
93+
94+
idxs_of_added_guard_condition_.push_back(rcl_index);
6595
}
6696
}
6797

@@ -71,27 +101,33 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
71101
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
72102

73103
bool any_ready = false;
74-
for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
75-
const auto * rcl_guard_condition = wait_set.guard_conditions[ii];
104+
for (size_t rcl_index : idxs_of_added_guard_condition_) {
105+
if(rcl_index >= wait_set.size_of_guard_conditions) {
106+
throw std::runtime_error(
107+
"ExecutorNotifyWaitable::is_ready: Internal error, got index out of range");
108+
}
109+
110+
const auto * rcl_guard_condition = wait_set.guard_conditions[rcl_index];
76111

77112
if (nullptr == rcl_guard_condition) {
78113
continue;
79114
}
80-
for (const auto & weak_guard_condition : this->notify_guard_conditions_) {
81-
auto guard_condition = weak_guard_condition.lock();
82-
if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) {
83-
any_ready = true;
84-
break;
85-
}
86-
}
115+
116+
any_ready = true;
117+
needs_processing = true;
118+
break;
87119
}
120+
88121
return any_ready;
89122
}
90123

91124
void
92125
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & /*data*/)
93126
{
94127
std::lock_guard<std::mutex> lock(execute_mutex_);
128+
129+
needs_processing = false;
130+
95131
this->execute_callback_();
96132
}
97133

@@ -122,11 +158,7 @@ ExecutorNotifyWaitable::set_on_ready_callback(std::function<void(size_t, int)> c
122158
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
123159

124160
on_ready_callback_ = gc_callback;
125-
for (auto weak_gc : notify_guard_conditions_) {
126-
auto gc = weak_gc.lock();
127-
if (!gc) {
128-
continue;
129-
}
161+
for (const auto & gc : notify_guard_conditions_) {
130162
gc->set_on_trigger_callback(on_ready_callback_);
131163
}
132164
}
@@ -138,11 +170,7 @@ ExecutorNotifyWaitable::clear_on_ready_callback()
138170
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
139171

140172
on_ready_callback_ = nullptr;
141-
for (auto weak_gc : notify_guard_conditions_) {
142-
auto gc = weak_gc.lock();
143-
if (!gc) {
144-
continue;
145-
}
173+
for (const auto & gc : notify_guard_conditions_) {
146174
gc->set_on_trigger_callback(nullptr);
147175
}
148176
}
@@ -159,9 +187,9 @@ void
159187
ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
160188
{
161189
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
162-
auto guard_condition = weak_guard_condition.lock();
163-
if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) {
164-
notify_guard_conditions_.insert(weak_guard_condition);
190+
const auto & guard_condition = weak_guard_condition.lock();
191+
if (guard_condition && notify_guard_conditions_.count(guard_condition) == 0) {
192+
notify_guard_conditions_.insert(guard_condition);
165193
if (on_ready_callback_) {
166194
guard_condition->set_on_trigger_callback(on_ready_callback_);
167195
}
@@ -172,11 +200,17 @@ void
172200
ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
173201
{
174202
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
175-
if (notify_guard_conditions_.count(weak_guard_condition) != 0) {
176-
notify_guard_conditions_.erase(weak_guard_condition);
177-
auto guard_condition = weak_guard_condition.lock();
203+
const auto & guard_condition = weak_guard_condition.lock();
204+
if (!guard_condition) {
205+
// If the lock did not work, the guard condition can't be
206+
// saved in the sets, therefore we don't need to remove it
207+
return;
208+
}
209+
auto it = notify_guard_conditions_.find(guard_condition);
210+
if (it != notify_guard_conditions_.end()) {
211+
notify_guard_conditions_.erase(it);
178212
// If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset
179-
if (guard_condition && on_ready_callback_) {
213+
if (on_ready_callback_) {
180214
guard_condition->set_on_trigger_callback(nullptr);
181215
}
182216
}

rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ TEST_F(TestExecutorNotifyWaitable, wait) {
7676
std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(on_execute_callback);
7777

7878
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
79-
auto notify_guard_condition =
80-
node->get_node_base_interface()->get_shared_notify_guard_condition();
79+
auto notify_guard_condition = std::make_shared<rclcpp::GuardCondition>();
8180
EXPECT_NO_THROW(waitable->add_guard_condition(notify_guard_condition));
8281

8382
auto default_cbg = node->get_node_base_interface()->get_default_callback_group();
@@ -86,12 +85,15 @@ TEST_F(TestExecutorNotifyWaitable, wait) {
8685
auto waitables = node->get_node_waitables_interface();
8786
waitables->add_waitable(std::static_pointer_cast<rclcpp::Waitable>(waitable), default_cbg);
8887

88+
// notify the guard condition, this should trigger the on_execute_callback
89+
notify_guard_condition->trigger();
90+
8991
rclcpp::executors::SingleThreadedExecutor executor;
9092
executor.add_node(node);
9193
executor.spin_all(std::chrono::seconds(1));
9294
EXPECT_EQ(1u, on_execute_calls);
9395

94-
// on_execute_callback doesn't change if the topology doesn't change
96+
// no further trigger, therefore no further callback
9597
executor.spin_all(std::chrono::seconds(1));
9698
EXPECT_EQ(1u, on_execute_calls);
9799
}

0 commit comments

Comments
 (0)