Skip to content

Commit 7119604

Browse files
committed
test: add comprehensive tests for EventQueue.close immediate parameter and clear_events method
- Add tests for close(immediate=True) behavior and child queue propagation - Add tests for clear_events() with various scenarios (empty, closed, with children) - Add tests for concurrent execution and exception handling in clear_events - Remove redundant test_close_with_immediate_false as existing tests already cover this path - Fix linter errors in Python version-specific test mocks Ensures proper test coverage for the new queue shutdown and cleanup functionality.
1 parent 2af22c3 commit 7119604

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

src/a2a/server/events/event_queue.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ async def close(self, immediate: bool = False) -> None:
156156
else:
157157
if immediate:
158158
await self.clear_events(True)
159+
for child in self._children:
160+
await child.close(immediate)
159161
return
160162
tasks = [asyncio.create_task(self.queue.join())]
161163
for child in self._children:

tests/server/events/test_event_queue.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,3 +355,130 @@ async def test_is_closed_reflects_state(event_queue: EventQueue) -> None:
355355
await event_queue.close()
356356

357357
assert event_queue.is_closed() is True # Closed after calling close()
358+
359+
360+
@pytest.mark.asyncio
361+
async def test_close_with_immediate_true(event_queue: EventQueue) -> None:
362+
"""Test close with immediate=True clears events immediately."""
363+
# Add some events to the queue
364+
event1 = Message(**MESSAGE_PAYLOAD)
365+
event2 = Task(**MINIMAL_TASK)
366+
await event_queue.enqueue_event(event1)
367+
await event_queue.enqueue_event(event2)
368+
369+
# Verify events are in queue
370+
assert not event_queue.queue.empty()
371+
372+
# Close with immediate=True
373+
await event_queue.close(immediate=True)
374+
375+
# Verify queue is closed and empty
376+
assert event_queue.is_closed() is True
377+
assert event_queue.queue.empty()
378+
379+
380+
@pytest.mark.asyncio
381+
async def test_close_immediate_propagates_to_children(
382+
event_queue: EventQueue,
383+
) -> None:
384+
"""Test that immediate parameter is propagated to child queues."""
385+
386+
child_queue = event_queue.tap()
387+
388+
# Add events to both parent and child
389+
event = Message(**MESSAGE_PAYLOAD)
390+
await event_queue.enqueue_event(event)
391+
392+
assert child_queue.is_closed() is False
393+
assert child_queue.queue.empty() is False
394+
395+
# close event queue
396+
await event_queue.close(immediate=True)
397+
398+
# Verify child queue was called and empty with immediate=True
399+
assert child_queue.is_closed() is True
400+
assert child_queue.queue.empty()
401+
402+
403+
@pytest.mark.asyncio
404+
async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None:
405+
"""Test clear_events clears only the current queue when clear_child_queues=False."""
406+
407+
child_queue = event_queue.tap()
408+
event1 = Message(**MESSAGE_PAYLOAD)
409+
event2 = Task(**MINIMAL_TASK)
410+
await event_queue.enqueue_event(event1)
411+
await event_queue.enqueue_event(event2)
412+
413+
# Clear only parent queue
414+
await event_queue.clear_events(clear_child_queues=False)
415+
416+
# Verify parent queue is empty
417+
assert event_queue.queue.empty()
418+
419+
# Verify child queue still has its event
420+
assert not child_queue.queue.empty()
421+
assert child_queue.is_closed() is False
422+
423+
dequeued_child_event = await child_queue.dequeue_event(no_wait=True)
424+
assert dequeued_child_event == event1
425+
426+
427+
@pytest.mark.asyncio
428+
async def test_clear_events_with_children(event_queue: EventQueue) -> None:
429+
"""Test clear_events clears both current queue and child queues."""
430+
431+
# Create child queues and add events
432+
child_queue1 = event_queue.tap()
433+
child_queue2 = event_queue.tap()
434+
435+
# Add events to parent queue
436+
event1 = Message(**MESSAGE_PAYLOAD)
437+
event2 = Task(**MINIMAL_TASK)
438+
await event_queue.enqueue_event(event1)
439+
await event_queue.enqueue_event(event2)
440+
441+
# Clear all queues
442+
await event_queue.clear_events(clear_child_queues=True)
443+
444+
# Verify all queues are empty
445+
assert event_queue.queue.empty()
446+
assert child_queue1.queue.empty()
447+
assert child_queue2.queue.empty()
448+
449+
450+
@pytest.mark.asyncio
451+
async def test_clear_events_empty_queue(event_queue: EventQueue) -> None:
452+
"""Test clear_events works correctly with empty queue."""
453+
# Verify queue is initially empty
454+
assert event_queue.queue.empty()
455+
456+
# Clear events from empty queue
457+
await event_queue.clear_events()
458+
459+
# Verify queue remains empty
460+
assert event_queue.queue.empty()
461+
462+
463+
@pytest.mark.asyncio
464+
async def test_clear_events_closed_queue(event_queue: EventQueue) -> None:
465+
"""Test clear_events works correctly with closed queue."""
466+
# Add events and close queue
467+
468+
with patch('sys.version_info', (3, 12, 0)): # Simulate older Python
469+
# Mock queue.join as it's called in older versions
470+
event_queue.queue.join = AsyncMock()
471+
472+
event = Message(**MESSAGE_PAYLOAD)
473+
await event_queue.enqueue_event(event)
474+
await event_queue.close()
475+
476+
# Verify queue is closed but not empty
477+
assert event_queue.is_closed() is True
478+
assert not event_queue.queue.empty()
479+
480+
# Clear events from closed queue
481+
await event_queue.clear_events()
482+
483+
# Verify queue is now empty
484+
assert event_queue.queue.empty()

0 commit comments

Comments
 (0)