Skip to content

Commit 8ca3bda

Browse files
committed
Optimize EventManager waiting logic and error handling
1 parent f603ab7 commit 8ca3bda

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

agents-core/vision_agents/core/events/manager.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def __init__(self, ignore_unknown_events: bool = True):
142142
self._shutdown = False
143143
self._silent_events: set[type] = set()
144144
self._handler_tasks: Dict[uuid.UUID, asyncio.Task[Any]] = {}
145+
self._received_event = asyncio.Event()
145146

146147
self.register(ExceptionEvent)
147148
self.register(HealthCheckEvent)
@@ -213,6 +214,7 @@ def merge(self, em: "EventManager"):
213214
em._queue = self._queue
214215
em._silent_events = self._silent_events
215216
em._processing_task = None # Clear the stopped task reference
217+
em._received_event = self._received_event
216218

217219
def register_events_from_module(
218220
self, module, prefix="", ignore_not_compatible=True
@@ -467,6 +469,8 @@ def send(self, *events):
467469
if event:
468470
self._queue.append(event)
469471

472+
self._received_event.set()
473+
470474
async def wait(self, timeout: float = 10.0):
471475
"""
472476
Wait for all queued events to be processed.
@@ -486,7 +490,7 @@ async def wait(self, timeout: float = 10.0):
486490
def _start_processing_task(self):
487491
"""Start the background event processing task."""
488492
if self._processing_task and not self._processing_task.done():
489-
return # Already running
493+
return
490494

491495
loop = asyncio.get_running_loop()
492496
self._processing_task = loop.create_task(self._process_events_loop())
@@ -521,13 +525,15 @@ async def _process_events_loop(self):
521525
)
522526
for task_id in cleanup_ids:
523527
self._handler_tasks.pop(task_id)
524-
await asyncio.sleep(0.0001)
528+
529+
await self._received_event.wait()
530+
self._received_event.clear()
525531

526532
async def _run_handler(self, handler, event):
527533
try:
528534
return await handler(event)
529535
except Exception as exc:
530-
self._queue.appendleft(ExceptionEvent(exc, handler)) # type: ignore[arg-type]
536+
self.send(ExceptionEvent(exc, handler))
531537
module_name = getattr(handler, "__module__", "unknown")
532538
logger.exception(
533539
f"Error calling handler {handler.__name__} from {module_name} for event {event.type}"
@@ -536,10 +542,9 @@ async def _run_handler(self, handler, event):
536542
async def _process_single_event(self, event):
537543
"""Process a single event."""
538544
for handler in self._handlers.get(event.type, []):
539-
#module_name = getattr(handler, '__module__', 'unknown')
545+
module_name = getattr(handler, '__module__', 'unknown')
540546
if event.type not in self._silent_events:
541-
pass
542-
#logger.info(f"Called handler {handler.__name__} from {module_name} for event {event.type}")
547+
logger.debug(f"Called handler {handler.__name__} from {module_name} for event {event.type}")
543548

544549
loop = asyncio.get_running_loop()
545550
handler_task = loop.create_task(self._run_handler(handler, event))

0 commit comments

Comments
 (0)