@@ -60,17 +60,14 @@ EventsExecutor::spin()
6060 // When condition variable is notified, check this predicate to proceed
6161 auto has_event_predicate = [this ]() {return !events_queue_->empty ();};
6262
63- // Local event queue to allow entities to push events while we execute them
64- EventQueue execution_event_queue;
65-
6663 timers_manager_->start ();
6764
6865 while (rclcpp::ok (context_) && spinning.load ()) {
6966 std::unique_lock<std::mutex> push_lock (push_mutex_);
7067 // We wait here until something has been pushed to the event queue
7168 events_queue_cv_.wait (push_lock, has_event_predicate);
7269 // Local event queue to allow entities to push events while we execute them
73- execution_event_queue = events_queue_->get_all_events ();
70+ EventQueue execution_event_queue = events_queue_->get_all_events ();
7471 // Unlock the mutex
7572 push_lock.unlock ();
7673 // Consume all available events, this queue will be empty at the end of the function
@@ -92,33 +89,44 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
9289 max_duration = timers_manager_->MAX_TIME ;
9390 }
9491
95- // This function will wait until the first of the following events occur:
96- // - The input max_duration is elapsed
97- // - A timer triggers
98- // - An executor event is received and processed
92+ auto start = std::chrono::steady_clock::now ();
9993
100- // When condition variable is notified, check this predicate to proceed
101- auto has_event_predicate = [this ]() {return !events_queue_->empty ();};
94+ auto max_duration_not_elapsed = [max_duration, start]() {
95+ auto elapsed_time = std::chrono::steady_clock::now () - start;
96+ return elapsed_time < max_duration;
97+ };
10298
99+ // Execute events until timeout or no more work available
100+ while (max_duration_not_elapsed ()) {
101+ bool work_available = false ;
103102
104- // Select the smallest between input max_duration and timer timeout
105- auto next_timer_timeout = timers_manager_->get_head_timeout ();
106- if (next_timer_timeout < max_duration) {
107- max_duration = next_timer_timeout;
108- }
103+ // Check for ready timers
104+ auto next_timer_timeout = timers_manager_->get_head_timeout ();
105+
106+ bool timer_ready = next_timer_timeout < 0ns;
107+
108+ if (timer_ready) {
109+ timers_manager_->execute_head_timer ();
110+ work_available = true ;
111+ } else {
112+ // Execute first event from queue if it exists
113+ std::unique_lock<std::mutex> lock (push_mutex_);
109114
110- std::unique_lock<std::mutex> push_lock (push_mutex_);
111- // Wait until timeout or event
112- events_queue_cv_.wait_for (push_lock, max_duration, has_event_predicate);
113- // Local event queue to allow entities to push events while we execute them
114- EventQueue execution_event_queue = events_queue_->get_all_events ();
115- // We don't need the lock anymore
116- push_lock.unlock ();
117-
118- // Execute all ready timers
119- timers_manager_->execute_ready_timers ();
120- // Consume all available events, this queue will be empty at the end of the function
121- this ->consume_all_events (execution_event_queue);
115+ bool has_event = !events_queue_->empty ();
116+
117+ if (has_event) {
118+ rmw_listener_event_t event = events_queue_->front ();
119+ events_queue_->pop ();
120+ this ->execute_event (event);
121+ work_available = true ;
122+ }
123+ }
124+
125+ // If there's no more work available, exit
126+ if (!work_available) {
127+ break ;
128+ }
129+ }
122130}
123131
124132void
@@ -136,9 +144,6 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
136144 // When condition variable is notified, check this predicate to proceed
137145 auto has_event_predicate = [this ]() {return !events_queue_->empty ();};
138146
139- // Local event queue to allow entities to push events while we execute them
140- EventQueue execution_event_queue;
141-
142147 auto start = std::chrono::steady_clock::now ();
143148 auto max_duration_not_elapsed = [max_duration, start]() {
144149 auto elapsed_time = std::chrono::steady_clock::now () - start;
@@ -162,7 +167,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
162167 // Keep executing until no more work to do or timeout expired
163168 while (rclcpp::ok (context_) && spinning.load () && max_duration_not_elapsed ()) {
164169 std::unique_lock<std::mutex> push_lock (push_mutex_);
165- execution_event_queue = events_queue_->get_all_events ();
170+ EventQueue execution_event_queue = events_queue_->get_all_events ();
166171 push_lock.unlock ();
167172
168173 // Exit if there is no more work to do
0 commit comments