Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -834,20 +834,21 @@ async def _run_adk_in_background(
app_name: App name
event_queue: Queue for emitting events
"""
runner: Optional[Runner] = None
try:
# Agent is already prepared with tools and SystemMessage instructions (if any)
# from _start_background_execution, so no additional agent copying needed here

# Create runner
runner = self._create_runner(
adk_agent=adk_agent,
user_id=user_id,
app_name=app_name
)

# Create RunConfig
run_config = self._run_config_factory(input)

# Ensure session exists
await self._ensure_session_exists(
app_name, user_id, input.thread_id, input.state
Expand Down Expand Up @@ -994,9 +995,20 @@ async def _run_adk_in_background(
await event_queue.put(None)
finally:
# Background task cleanup completed
# Note: toolset cleanup is handled by garbage collection
# since toolset is now embedded in the agent's tools
pass
# Ensure the ADK runner releases any resources (e.g. toolsets)
if runner is not None:
close_method = getattr(runner, "close", None)
if close_method is not None:
try:
close_result = close_method()
if inspect.isawaitable(close_result):
await close_result
except Exception as close_error:
logger.warning(
"Error while closing ADK runner for thread %s: %s",
input.thread_id,
close_error,
)

async def _cleanup_stale_executions(self):
"""Clean up stale executions."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ async def test_run_basic_flow(self, adk_agent, sample_input, mock_agent):
with patch.object(adk_agent, '_create_runner') as mock_create_runner:
# Create a mock runner
mock_runner = AsyncMock()
mock_runner.close = AsyncMock()
mock_event = Mock()
mock_event.id = "event1"
mock_event.author = "test_agent"
Expand All @@ -139,6 +140,31 @@ async def mock_run_async(*args, **kwargs):
assert len(events) >= 2 # At least RUN_STARTED and RUN_FINISHED
assert events[0].type == EventType.RUN_STARTED
assert events[-1].type == EventType.RUN_FINISHED
mock_runner.close.assert_awaited_once()

@pytest.mark.asyncio
async def test_runner_close_called_on_run_error(self, adk_agent, sample_input):
"""Runner.close should still be awaited when execution errors."""

with patch.object(adk_agent, '_create_runner') as mock_create_runner:
mock_runner = AsyncMock()
mock_runner.close = AsyncMock()

async def failing_run_async(*args, **kwargs):
if False: # pragma: no cover - keep async generator semantics
yield None
raise RuntimeError("boom")

mock_runner.run_async = failing_run_async
mock_create_runner.return_value = mock_runner

events = []
async for event in adk_agent.run(sample_input):
events.append(event)

# Ensure RUN_ERROR emitted and runner closed
assert any(event.type == EventType.RUN_ERROR for event in events)
mock_runner.close.assert_awaited_once()

@pytest.mark.asyncio
async def test_turn_complete_falls_back_to_streaming_translator(
Expand Down
Loading