@@ -183,7 +183,7 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
183183 // When condition variable is notified, check this predicate to proceed
184184 auto has_event_predicate = [this ]() {return !events_queue_->empty ();};
185185
186- ExecutorEvent event ;
186+ ExecutorEvent single_event ;
187187 bool has_event = false ;
188188
189189 {
@@ -194,15 +194,39 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
194194 // Grab first event from queue if it exists
195195 has_event = !events_queue_->empty ();
196196 if (has_event) {
197- event = events_queue_->front ();
198- events_queue_->pop ();
197+ // If the event has num_events > 1, we have to only execute a single event and
198+ // decrement the event counter, leaving the event in the queue for future processing.
199+ // But we can't modify an element from a temporary object (the queue is a shared_ptr)
200+ // so, we need a local copy to manipulate and then update the original queue.
201+ std::queue<ExecutorEvent> local_events_queue = events_queue_->pop_all_events ();
202+
203+ ExecutorEvent & front_event = local_events_queue.front ();
204+
205+ if (front_event.num_events > 1 ) {
206+ // Decrement the counter by one, keeping the event in the front.
207+ front_event.num_events --;
208+ } else {
209+ // We have a single event, pop it from queue.
210+ local_events_queue.pop ();
211+ }
212+
213+ // Make sure we only execute a single event
214+ single_event = front_event;
215+ single_event.num_events = 1 ;
216+
217+ // Update the global queue
218+ while (!local_events_queue.empty ()) {
219+ ExecutorEvent event = local_events_queue.front ();
220+ local_events_queue.pop ();
221+ events_queue_->push (event);
222+ }
199223 }
200224 }
201225
202226 // If we wake up from the wait with an event, it means that it
203227 // arrived before any of the timers expired.
204228 if (has_event) {
205- this ->execute_event (event );
229+ this ->execute_event (single_event );
206230 } else {
207231 timers_manager_->execute_head_timer ();
208232 }
@@ -254,7 +278,9 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
254278 auto subscription = entities_collector_->get_subscription (event.entity_id );
255279
256280 if (subscription) {
257- execute_subscription (subscription);
281+ for (size_t i = 0 ; i < event.num_events ; i++) {
282+ execute_subscription (subscription);
283+ }
258284 }
259285 break ;
260286 }
@@ -264,7 +290,9 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
264290 auto service = entities_collector_->get_service (event.entity_id );
265291
266292 if (service) {
267- execute_service (service);
293+ for (size_t i = 0 ; i < event.num_events ; i++) {
294+ execute_service (service);
295+ }
268296 }
269297 break ;
270298 }
@@ -274,7 +302,9 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
274302 auto client = entities_collector_->get_client (event.entity_id );
275303
276304 if (client) {
277- execute_client (client);
305+ for (size_t i = 0 ; i < event.num_events ; i++) {
306+ execute_client (client);
307+ }
278308 }
279309 break ;
280310 }
@@ -284,8 +314,10 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
284314 auto waitable = entities_collector_->get_waitable (event.entity_id );
285315
286316 if (waitable) {
287- auto data = waitable->take_data ();
288- waitable->execute (data);
317+ for (size_t i = 0 ; i < event.num_events ; i++) {
318+ auto data = waitable->take_data ();
319+ waitable->execute (data);
320+ }
289321 }
290322 break ;
291323 }
0 commit comments