Skip to content

Commit 2508a9b

Browse files
authored
fix: gracefully handle task exceptions in event consumer (#383)
# Description This callback shouldn't result in exceptions being raised. From docs on `.exception()`: > The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn't done yet, raises InvalidStateError. Currently, if a task has been cancelled, exceptions are thrown. E.g. the following error was observed when used with `google-adk` ``` ERROR:asyncio:Exception in callback EventConsumer.agent_task_callback() at /app/python/.venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py:153 handle: <Handle EventConsumer.agent_task_callback() at /app/python/.venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py:153> Traceback (most recent call last): File "/app/python/.venv/lib/python3.13/site-packages/anyio/streams/memory.py", line 111, in receive return self.receive_nowait() ~~~~~~~~~~~~~~~~~~~^^ File "/app/python/.venv/lib/python3.13/site-packages/anyio/streams/memory.py", line 106, in receive_nowait raise WouldBlock anyio.WouldBlock During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/app/python/.local/share/uv/python/cpython-3.13.5-linux-x86_64-gnu/lib/python3.13/asyncio/events.py", line 89, in _run self._context.run(self._callback, *self._args) ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/python/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 202, in sync_wrapper result = func(*args, **kwargs) File "/app/python/.venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 163, in agent_task_callback if agent_task.exception() is not None: ~~~~~~~~~~~~~~~~~~~~^^ File "/app/python/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper result = await func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/python/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 172, in _run_event_stream await self.agent_executor.execute(request, queue) File "/app/python/packages/kagent-adk/src/kagent_adk/_agent_executor.py", line 124, in execute await self._handle_request(context, event_queue, runner) File "/app/python/packages/kagent-adk/src/kagent_adk/_agent_executor.py", line 188, in _handle_request async for adk_event in runner.run_async(**run_args): ...<4 lines>... await event_queue.enqueue_event(a2a_event) File "/app/python/.venv/lib/python3.13/site-packages/google/adk/runners.py", line 233, in run_async async for event in self._exec_with_plugin( ...<2 lines>... yield event File "/app/python/.venv/lib/python3.13/site-packages/google/adk/runners.py", line 273, in _exec_with_plugin async for event in execute_fn(invocation_context): ...<6 lines>... yield (modified_event if modified_event else event) File "/app/python/.venv/lib/python3.13/site-packages/google/adk/runners.py", line 230, in execute async for event in ctx.agent.run_async(ctx): yield event File "/app/python/.venv/lib/python3.13/site-packages/google/adk/agents/base_agent.py", line 209, in run_async async for event in self._run_async_impl(ctx): yield event File "/app/python/.venv/lib/python3.13/site-packages/google/adk/agents/llm_agent.py", line 283, in _run_async_impl async for event in self._llm_flow.run_async(ctx): self.__maybe_save_output_to_state(event) yield event File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 290, in run_async async for event in self._run_one_step_async(invocation_context): last_event = event yield event File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 322, in _run_one_step_async async for event in self._postprocess_async( ...<5 lines>... yield event File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 407, in _postprocess_async async for event in self._postprocess_handle_function_calls_async( ...<2 lines>... yield event File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 482, in _postprocess_handle_function_calls_async if function_response_event := await functions.handle_function_calls_async( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ invocation_context, function_call_event, llm_request.tools_dict ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ): ^ File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/functions.py", line 179, in handle_function_calls_async function_response = await __call_tool_async( ^^^^^^^^^^^^^^^^^^^^^^^^ tool, args=function_args, tool_context=tool_context ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ) ^ File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/functions.py", line 474, in __call_tool_async return await tool.run_async(args=args, tool_context=tool_context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/python/.venv/lib/python3.13/site-packages/google/adk/tools/base_authenticated_tool.py", line 93, in run_async return await self._run_async_impl( ^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...<3 lines>... ) ^ File "/app/python/.venv/lib/python3.13/site-packages/google/adk/tools/mcp_tool/mcp_session_manager.py", line 128, in wrapper return await func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/python/.venv/lib/python3.13/site-packages/google/adk/tools/mcp_tool/mcp_tool.py", line 133, in _run_async_impl response = await session.call_tool(self.name, arguments=args) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/python/.venv/lib/python3.13/site-packages/mcp/client/session.py", line 293, in call_tool result = await self.send_request( ^^^^^^^^^^^^^^^^^^^^^^^^ ...<12 lines>... ) ^ File "/app/python/.venv/lib/python3.13/site-packages/mcp/shared/session.py", line 272, in send_request response_or_error = await response_stream_reader.receive() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/python/.venv/lib/python3.13/site-packages/anyio/streams/memory.py", line 119, in receive await receive_event.wait() File "/app/python/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 1774, in wait await self._event.wait() File "/app/python/.local/share/uv/python/cpython-3.13.5-linux-x86_64-gnu/lib/python3.13/asyncio/locks.py", line 213, in wait await fut asyncio.exceptions.CancelledError: Cancelled by cancel scope 72d19ee594f0 ```
1 parent b6796b9 commit 2508a9b

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,5 +160,5 @@ def agent_task_callback(self, agent_task: asyncio.Task[None]) -> None:
160160
agent_task: The asyncio.Task that completed.
161161
"""
162162
logger.debug('Agent task callback triggered.')
163-
if agent_task.exception() is not None:
163+
if not agent_task.cancelled() and agent_task.done():
164164
self._exception = agent_task.exception()

tests/server/events/test_event_consumer.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,18 +327,22 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed(
327327
def test_agent_task_callback_sets_exception(event_consumer: EventConsumer):
328328
"""Test that agent_task_callback sets _exception if the task had one."""
329329
mock_task = MagicMock(spec=asyncio.Task)
330+
mock_task.cancelled.return_value = False
331+
mock_task.done.return_value = True
330332
sample_exception = ValueError('Task failed')
331333
mock_task.exception.return_value = sample_exception
332334

333335
event_consumer.agent_task_callback(mock_task)
334336

335337
assert event_consumer._exception == sample_exception
336-
# mock_task.exception.assert_called_once() # Removing this, as exception() might be called internally by the check
338+
mock_task.exception.assert_called_once()
337339

338340

339341
def test_agent_task_callback_no_exception(event_consumer: EventConsumer):
340342
"""Test that agent_task_callback does nothing if the task has no exception."""
341343
mock_task = MagicMock(spec=asyncio.Task)
344+
mock_task.cancelled.return_value = False
345+
mock_task.done.return_value = True
342346
mock_task.exception.return_value = None # No exception
343347

344348
event_consumer.agent_task_callback(mock_task)
@@ -347,6 +351,34 @@ def test_agent_task_callback_no_exception(event_consumer: EventConsumer):
347351
mock_task.exception.assert_called_once()
348352

349353

354+
def test_agent_task_callback_cancelled_task(event_consumer: EventConsumer):
355+
"""Test that agent_task_callback does nothing if the task has no exception."""
356+
mock_task = MagicMock(spec=asyncio.Task)
357+
mock_task.cancelled.return_value = True
358+
mock_task.done.return_value = True
359+
sample_exception = ValueError('Task still running')
360+
mock_task.exception.return_value = sample_exception
361+
362+
event_consumer.agent_task_callback(mock_task)
363+
364+
assert event_consumer._exception is None # Should remain None
365+
mock_task.exception.assert_not_called()
366+
367+
368+
def test_agent_task_callback_not_done_task(event_consumer: EventConsumer):
369+
"""Test that agent_task_callback does nothing if the task has no exception."""
370+
mock_task = MagicMock(spec=asyncio.Task)
371+
mock_task.cancelled.return_value = False
372+
mock_task.done.return_value = False
373+
sample_exception = ValueError('Task is cancelled')
374+
mock_task.exception.return_value = sample_exception
375+
376+
event_consumer.agent_task_callback(mock_task)
377+
378+
assert event_consumer._exception is None # Should remain None
379+
mock_task.exception.assert_not_called()
380+
381+
350382
@pytest.mark.asyncio
351383
async def test_consume_all_handles_validation_error(
352384
event_consumer: EventConsumer, mock_event_queue: AsyncMock

0 commit comments

Comments
 (0)