Skip to content

Commit d0e8e43

Browse files
author
Mauro Passerino
committed
spin_some and spin_all call spin_some_impl
1 parent 0087392 commit d0e8e43

File tree

6 files changed

+64
-117
lines changed

6 files changed

+64
-117
lines changed

rclcpp/include/rclcpp/executors/events_executor.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ class EventsExecutor : public rclcpp::Executor
197197
void
198198
spin_once_impl(std::chrono::nanoseconds timeout) override;
199199

200+
RCLCPP_PUBLIC
201+
void
202+
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);
203+
200204
private:
201205
RCLCPP_DISABLE_COPY(EventsExecutor)
202206

rclcpp/include/rclcpp/executors/timers_manager.hpp

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,12 @@ class TimersManager
101101
std::chrono::nanoseconds execute_ready_timers();
102102

103103
/**
104-
* @brief Executes one ready timer if available
105-
*
106-
* @return true if there was a timer ready
104+
* @brief Executes head timer if ready at time point.
105+
* @param tp the timepoint to check for
106+
* @return true if head timer was ready at tp
107107
*/
108-
bool execute_head_timer();
108+
bool execute_head_timer(
109+
std::chrono::time_point<std::chrono::steady_clock> tp = TimePoint::max());
109110

110111
/**
111112
* @brief Get the amount of time before the next timer expires.
@@ -127,19 +128,13 @@ class TimersManager
127128
*/
128129
void remove_timer(rclcpp::TimerBase::SharedPtr timer);
129130

130-
/**
131-
* @brief Executes head timer if ready at time point.
132-
* @param tp the timepoint to check for
133-
* @return true if head timer was ready at tp
134-
*/
135-
bool execute_head_timer_if_ready_at_tp(
136-
std::chrono::time_point<std::chrono::steady_clock> tp);
137-
138131
// This is what the TimersManager uses to denote a duration forever.
139132
// We don't use std::chrono::nanoseconds::max because it will overflow.
140133
// See https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for
141134
static constexpr std::chrono::nanoseconds MAX_TIME = std::chrono::hours(90);
142135

136+
using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;
137+
143138
private:
144139
RCLCPP_DISABLE_COPY(TimersManager)
145140

rclcpp/src/rclcpp/executors/events_executor.cpp

Lines changed: 34 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -79,107 +79,75 @@ EventsExecutor::spin()
7979
void
8080
EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
8181
{
82-
if (spinning.exchange(true)) {
83-
throw std::runtime_error("spin_some() called while already spinning");
84-
}
85-
RCLCPP_SCOPE_EXIT(this->spinning.store(false););
86-
8782
// In this context a 0 input max_duration means no duration limit
8883
if (std::chrono::nanoseconds(0) == max_duration) {
8984
max_duration = timers_manager_->MAX_TIME;
9085
}
9186

92-
auto start = std::chrono::steady_clock::now();
93-
94-
auto max_duration_not_elapsed = [max_duration, start]() {
95-
auto elapsed_time = std::chrono::steady_clock::now() - start;
96-
return elapsed_time < max_duration;
97-
};
98-
99-
// Get the number of events ready at this time point
100-
std::unique_lock<std::mutex> lock(push_mutex_);
101-
size_t available_events_at_tp = events_queue_->size();
102-
lock.unlock();
103-
104-
size_t executed_events = 0;
105-
106-
// Checks if all events that were ready when spin_some was called, were executed.
107-
auto executed_ready_events_at_tp = [executed_events, available_events_at_tp]() {
108-
return executed_events >= available_events_at_tp;
109-
};
110-
111-
// Execute events and timers ready when spin_some was called,
112-
// until timeout or no more work available.
113-
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
114-
// Execute first event from queue if it exists
115-
if (!executed_ready_events_at_tp()) {
116-
std::unique_lock<std::mutex> lock(push_mutex_);
117-
118-
bool has_event = !events_queue_->empty();
119-
120-
if (has_event) {
121-
rmw_listener_event_t event = events_queue_->front();
122-
events_queue_->pop();
123-
// std::cout << "Execute event" << std::endl;
124-
this->execute_event(event);
125-
executed_events++;
126-
continue;
127-
}
128-
}
129-
130-
// Execute timer, if was ready at start
131-
if (timers_manager_->execute_head_timer_if_ready_at_tp(start)) {
132-
continue;
133-
}
134-
135-
// If there's no more work available, exit
136-
break;
137-
}
87+
return this->spin_some_impl(max_duration, false);
13888
}
13989

14090
void
14191
EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
14292
{
143-
if (spinning.exchange(true)) {
144-
throw std::runtime_error("spin_all() called while already spinning");
93+
if (max_duration <= 0ns) {
94+
throw std::invalid_argument("max_duration must be positive");
14595
}
96+
return this->spin_some_impl(max_duration, true);
97+
}
14698

147-
if (max_duration < 0ns) {
148-
throw std::invalid_argument("max_duration must be positive");
99+
void
100+
EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
101+
{
102+
if (spinning.exchange(true)) {
103+
throw std::runtime_error("spin_some() called while already spinning");
149104
}
150105

151106
RCLCPP_SCOPE_EXIT(this->spinning.store(false););
152107

153-
// In this context a 0 input max_duration means no duration limit
154-
if (std::chrono::nanoseconds(0) == max_duration) {
155-
max_duration = timers_manager_->MAX_TIME;
156-
}
157-
158108
auto start = std::chrono::steady_clock::now();
159109

160110
auto max_duration_not_elapsed = [max_duration, start]() {
161111
auto elapsed_time = std::chrono::steady_clock::now() - start;
162112
return elapsed_time < max_duration;
163113
};
164114

165-
// Execute timer and events until timeout or no more work available
115+
size_t ready_events_at_start = 0;
116+
size_t executed_events = 0;
117+
118+
if (!exhaustive) {
119+
// Get the number of events ready at start
120+
std::unique_lock<std::mutex> lock(push_mutex_);
121+
ready_events_at_start = events_queue_->size();
122+
lock.unlock();
123+
}
124+
166125
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
167-
// Execute first event from queue if it exists
168-
{
126+
// Execute first ready event from queue if exists
127+
if (exhaustive || (executed_events < ready_events_at_start)) {
169128
std::unique_lock<std::mutex> lock(push_mutex_);
170-
171129
bool has_event = !events_queue_->empty();
172130

173131
if (has_event) {
174132
rmw_listener_event_t event = events_queue_->front();
175133
events_queue_->pop();
176134
this->execute_event(event);
135+
executed_events++;
177136
continue;
178137
}
179138
}
180139

181-
// Execute timer, if was ready
182-
if (timers_manager_->execute_head_timer()) {
140+
bool timer_executed;
141+
142+
if (exhaustive) {
143+
// Execute timer if is ready
144+
timer_executed = timers_manager_->execute_head_timer();
145+
} else {
146+
// Execute timer if was ready at start
147+
timer_executed = timers_manager_->execute_head_timer(start);
148+
}
149+
150+
if (timer_executed) {
183151
continue;
184152
}
185153

rclcpp/src/rclcpp/executors/timers_manager.cpp

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ std::chrono::nanoseconds TimersManager::execute_ready_timers()
116116
return this->get_head_timeout_unsafe(timers_heap);
117117
}
118118

119-
bool TimersManager::execute_head_timer()
119+
bool TimersManager::execute_head_timer(
120+
std::chrono::time_point<std::chrono::steady_clock> timepoint)
120121
{
121122
// Do not allow to interfere with the thread running
122123
if (running_) {
@@ -127,13 +128,27 @@ bool TimersManager::execute_head_timer()
127128
std::unique_lock<std::mutex> lock(timers_mutex_);
128129

129130
TimersHeap timers_heap = weak_timers_heap_.validate_and_lock();
131+
130132
// Nothing to do if we don't have any timer
131133
if (timers_heap.empty()) {
132134
return false;
133135
}
134136

135137
TimerPtr head = timers_heap.front();
136-
if (head->is_ready()) {
138+
139+
bool timer_ready = false;
140+
141+
if (timepoint != TimePoint::max()) {
142+
// A ready timer will return a negative duration when calling time_until_trigger
143+
auto timepoint_timer_ready = std::chrono::steady_clock::now() + head->time_until_trigger();
144+
145+
// Here we check if the timer was already ready at timepoint
146+
timer_ready = timepoint_timer_ready < timepoint;
147+
} else {
148+
timer_ready = head->is_ready();
149+
}
150+
151+
if (timer_ready) {
137152
// Head timer is ready, execute
138153
head->execute_callback();
139154
timers_heap.heapify_root();
@@ -230,40 +245,3 @@ void TimersManager::remove_timer(TimerPtr timer)
230245
timers_cv_.notify_one();
231246
}
232247
}
233-
234-
bool TimersManager::execute_head_timer_if_ready_at_tp(
235-
std::chrono::time_point<std::chrono::steady_clock> timepoint)
236-
{
237-
// Do not allow to interfere with the thread running
238-
if (running_) {
239-
throw std::runtime_error(
240-
"TimersManager::execute_head_timer_if_ready_at_tp() can't"
241-
"be used while timers thread is running");
242-
}
243-
244-
std::unique_lock<std::mutex> lock(timers_mutex_);
245-
246-
TimersHeap timers_heap = weak_timers_heap_.validate_and_lock();
247-
248-
// Nothing to do if we don't have any timer
249-
if (timers_heap.empty()) {
250-
return false;
251-
}
252-
253-
TimerPtr head = timers_heap.front();
254-
255-
// A ready timer will return a negative duration when calling time_until_trigger
256-
auto timepoint_timer_ready = std::chrono::steady_clock::now() + head->time_until_trigger();
257-
258-
auto timer_was_ready_at_tp = timepoint_timer_ready < timepoint;
259-
260-
if (timer_was_ready_at_tp) {
261-
// Head timer is ready, execute
262-
head->execute_callback();
263-
timers_heap.heapify_root();
264-
weak_timers_heap_.store(timers_heap);
265-
return true;
266-
} else {
267-
return false;
268-
}
269-
}

rclcpp/test/rclcpp/executors/test_events_executor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ TEST_F(TestEventsExecutor, spin_all_max_duration)
268268
}
269269

270270
EventsExecutor executor;
271+
EXPECT_THROW(executor.spin_all(0ms), std::invalid_argument);
271272
EXPECT_THROW(executor.spin_all(-5ms), std::invalid_argument);
272273
}
273274

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,7 @@ TYPED_TEST(TestExecutorsStable, spinAll) {
501501
// executor.
502502
bool spin_exited = false;
503503
std::thread spinner([&spin_exited, &executor, this]() {
504+
std::this_thread::sleep_for(10ms);
504505
executor.spin_all(1s);
505506
executor.remove_node(this->node, true);
506507
spin_exited = true;

0 commit comments

Comments
 (0)