Skip to content

Commit 0087392

Browse files
author
Mauro Passerino
committed
rework spin_some & spin_all
1 parent 76a833d commit 0087392

File tree

7 files changed

+138
-49
lines changed

7 files changed

+138
-49
lines changed

rclcpp/include/rclcpp/executors/events_executor.hpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,10 @@ class EventsExecutor : public rclcpp::Executor
8282

8383
/// Events executor implementation of spin some
8484
/**
85-
* This function will execute ready timers and events
86-
* until timeout or no more work available.
85+
* This non-blocking function will execute the timers and events
86+
* that were ready when this API was called, until timeout or no
87+
* more work available. New ready-timers/events arrived while
88+
* executing work, won't be taken into account here.
8789
*
8890
* Example:
8991
* while(condition) {
@@ -96,6 +98,20 @@ class EventsExecutor : public rclcpp::Executor
9698
void
9799
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override;
98100

101+
/// Events executor implementation of spin all
102+
/**
103+
* This non-blocking function will execute timers and events
104+
* until timeout or no more work available. If new ready-timers/events
105+
* arrive while executing work available, they will be executed
106+
* as long as the timeout hasn't expired.
107+
*
108+
* Example:
109+
* while(condition) {
110+
* spin_all();
111+
* sleep(); // User should have some sync work or
112+
* // sleep to avoid a 100% CPU usage
113+
* }
114+
*/
99115
RCLCPP_PUBLIC
100116
void
101117
spin_all(std::chrono::nanoseconds max_duration) override;

rclcpp/include/rclcpp/executors/timers_manager.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ class TimersManager
127127
*/
128128
void remove_timer(rclcpp::TimerBase::SharedPtr timer);
129129

130+
/**
131+
* @brief Executes head timer if ready at time point.
132+
* @param tp the timepoint to check for
133+
* @return true if head timer was ready at tp
134+
*/
135+
bool execute_head_timer_if_ready_at_tp(
136+
std::chrono::time_point<std::chrono::steady_clock> tp);
137+
130138
// This is what the TimersManager uses to denote a duration forever.
131139
// We don't use std::chrono::nanoseconds::max because it will overflow.
132140
// See https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for

rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,15 @@ class EventsQueue
8383
bool
8484
empty() const = 0;
8585

86+
/**
87+
* @brief Returns the number of elements in the queue.
88+
* @return the number of elements in the queue.
89+
*/
90+
RCLCPP_PUBLIC
91+
virtual
92+
size_t
93+
size() const = 0;
94+
8695
/**
8796
* @brief Initializes the queue
8897
*/

rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,18 @@ class SimpleEventsQueue : public EventsQueue
8989
return event_queue_.empty();
9090
}
9191

92+
/**
93+
* @brief Returns the number of elements in the queue.
94+
* @return the number of elements in the queue.
95+
*/
96+
RCLCPP_PUBLIC
97+
virtual
98+
size_t
99+
size() const
100+
{
101+
return event_queue_.size();
102+
}
103+
92104
/**
93105
* @brief Initializes the queue
94106
*/

rclcpp/src/rclcpp/executors/events_executor.cpp

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -96,90 +96,95 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
9696
return elapsed_time < max_duration;
9797
};
9898

99-
// Execute events until timeout or no more work available
100-
while (max_duration_not_elapsed()) {
101-
bool work_available = false;
99+
// Get the number of events ready at this time point
100+
std::unique_lock<std::mutex> lock(push_mutex_);
101+
size_t available_events_at_tp = events_queue_->size();
102+
lock.unlock();
102103

103-
// Check for ready timers
104-
auto next_timer_timeout = timers_manager_->get_head_timeout();
104+
size_t executed_events = 0;
105105

106-
bool timer_ready = next_timer_timeout < 0ns;
106+
// Checks if all events that were ready when spin_some was called, were executed.
107+
auto executed_ready_events_at_tp = [executed_events, available_events_at_tp]() {
108+
return executed_events >= available_events_at_tp;
109+
};
107110

108-
if (timer_ready) {
109-
timers_manager_->execute_head_timer();
110-
work_available = true;
111-
} else {
112-
// Execute first event from queue if it exists
111+
// Execute events and timers ready when spin_some was called,
112+
// until timeout or no more work available.
113+
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
114+
// Execute first event from queue if it exists
115+
if (!executed_ready_events_at_tp()) {
113116
std::unique_lock<std::mutex> lock(push_mutex_);
114117

115118
bool has_event = !events_queue_->empty();
116119

117120
if (has_event) {
118121
rmw_listener_event_t event = events_queue_->front();
119122
events_queue_->pop();
123+
// std::cout << "Execute event" << std::endl;
120124
this->execute_event(event);
121-
work_available = true;
125+
executed_events++;
126+
continue;
122127
}
123128
}
124129

125-
// If there's no more work available, exit
126-
if (!work_available) {
127-
break;
130+
// Execute timer, if was ready at start
131+
if (timers_manager_->execute_head_timer_if_ready_at_tp(start)) {
132+
continue;
128133
}
134+
135+
// If there's no more work available, exit
136+
break;
129137
}
130138
}
131139

132140
void
133141
EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
134142
{
135-
if (max_duration <= 0ns) {
136-
throw std::invalid_argument("max_duration must be positive");
143+
if (spinning.exchange(true)) {
144+
throw std::runtime_error("spin_all() called while already spinning");
137145
}
138146

139-
if (spinning.exchange(true)) {
140-
throw std::runtime_error("spin_some() called while already spinning");
147+
if (max_duration < 0ns) {
148+
throw std::invalid_argument("max_duration must be positive");
141149
}
150+
142151
RCLCPP_SCOPE_EXIT(this->spinning.store(false););
143152

144-
// When condition variable is notified, check this predicate to proceed
145-
auto has_event_predicate = [this]() {return !events_queue_->empty();};
153+
// In this context a 0 input max_duration means no duration limit
154+
if (std::chrono::nanoseconds(0) == max_duration) {
155+
max_duration = timers_manager_->MAX_TIME;
156+
}
146157

147158
auto start = std::chrono::steady_clock::now();
159+
148160
auto max_duration_not_elapsed = [max_duration, start]() {
149161
auto elapsed_time = std::chrono::steady_clock::now() - start;
150162
return elapsed_time < max_duration;
151163
};
152164

153-
// Select the smallest between input max duration and timer timeout
154-
auto next_timer_timeout = timers_manager_->get_head_timeout();
155-
if (next_timer_timeout < max_duration) {
156-
max_duration = next_timer_timeout;
157-
}
158-
159-
{
160-
// Wait once until timeout or event
161-
std::unique_lock<std::mutex> push_lock(push_mutex_);
162-
events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate);
163-
}
165+
// Execute timer and events until timeout or no more work available
166+
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
167+
// Execute first event from queue if it exists
168+
{
169+
std::unique_lock<std::mutex> lock(push_mutex_);
164170

165-
auto timeout = timers_manager_->get_head_timeout();
171+
bool has_event = !events_queue_->empty();
166172

167-
// Keep executing until no more work to do or timeout expired
168-
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
169-
std::unique_lock<std::mutex> push_lock(push_mutex_);
170-
EventQueue execution_event_queue = events_queue_->get_all_events();
171-
push_lock.unlock();
173+
if (has_event) {
174+
rmw_listener_event_t event = events_queue_->front();
175+
events_queue_->pop();
176+
this->execute_event(event);
177+
continue;
178+
}
179+
}
172180

173-
// Exit if there is no more work to do
174-
const bool ready_timer = timeout < 0ns;
175-
const bool has_events = !execution_event_queue.empty();
176-
if (!ready_timer && !has_events) {
177-
break;
181+
// Execute timer, if was ready
182+
if (timers_manager_->execute_head_timer()) {
183+
continue;
178184
}
179185

180-
// Execute all ready work
181-
timeout = timers_manager_->execute_ready_timers();
182-
this->consume_all_events(execution_event_queue);
186+
// If there's no more work available, exit
187+
break;
183188
}
184189
}
185190

rclcpp/src/rclcpp/executors/timers_manager.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,40 @@ void TimersManager::remove_timer(TimerPtr timer)
230230
timers_cv_.notify_one();
231231
}
232232
}
233+
234+
bool TimersManager::execute_head_timer_if_ready_at_tp(
235+
std::chrono::time_point<std::chrono::steady_clock> timepoint)
236+
{
237+
// Do not allow to interfere with the thread running
238+
if (running_) {
239+
throw std::runtime_error(
240+
"TimersManager::execute_head_timer_if_ready_at_tp() can't"
241+
"be used while timers thread is running");
242+
}
243+
244+
std::unique_lock<std::mutex> lock(timers_mutex_);
245+
246+
TimersHeap timers_heap = weak_timers_heap_.validate_and_lock();
247+
248+
// Nothing to do if we don't have any timer
249+
if (timers_heap.empty()) {
250+
return false;
251+
}
252+
253+
TimerPtr head = timers_heap.front();
254+
255+
// A ready timer will return a negative duration when calling time_until_trigger
256+
auto timepoint_timer_ready = std::chrono::steady_clock::now() + head->time_until_trigger();
257+
258+
auto timer_was_ready_at_tp = timepoint_timer_ready < timepoint;
259+
260+
if (timer_was_ready_at_tp) {
261+
// Head timer is ready, execute
262+
head->execute_callback();
263+
timers_heap.heapify_root();
264+
weak_timers_heap_.store(timers_heap);
265+
return true;
266+
} else {
267+
return false;
268+
}
269+
}

rclcpp/test/rclcpp/executors/test_events_executor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ TEST_F(TestEventsExecutor, spin_all_max_duration)
254254
t_runs++;
255255
});
256256

257+
// Sleep some time for the timer to be ready when spin
258+
std::this_thread::sleep_for(10ms);
259+
257260
EventsExecutor executor;
258261
executor.add_node(node);
259262

@@ -265,7 +268,6 @@ TEST_F(TestEventsExecutor, spin_all_max_duration)
265268
}
266269

267270
EventsExecutor executor;
268-
EXPECT_THROW(executor.spin_all(0ms), std::invalid_argument);
269271
EXPECT_THROW(executor.spin_all(-5ms), std::invalid_argument);
270272
}
271273

0 commit comments

Comments
 (0)