Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5475f4d
Fix Claude Sonnet 4 streaming detection issue (#400)
contextablemark Oct 2, 2025
603e294
Merge branch 'ag-ui-protocol:main' into fix/issue-400-clean
contextablemark Oct 2, 2025
67d3fe5
test: ensure partial final chunks use streaming translation
contextablemark Oct 3, 2025
ed8d02c
Merge pull request #79 from Contextable/codex/add-asyncio-test-for-ad…
contextablemark Oct 3, 2025
2b47630
test: cover turn complete fallback in ADK agent
contextablemark Oct 3, 2025
d7e2fb9
fix: correct event type in partial final chunk test
contextablemark Oct 3, 2025
b76bcde
Merge branch 'fix/issue-400-clean' into codex/add-async-test-for-even…
contextablemark Oct 3, 2025
e38aaf0
Merge pull request #80 from Contextable/codex/add-async-test-for-even…
contextablemark Oct 3, 2025
014d05b
Add test for streaming finish reason fallback
contextablemark Oct 3, 2025
660b564
Fix test_streaming_finish_reason_fallback: set is_final_response=Fals…
contextablemark Oct 3, 2025
4a51bff
fix(e2e): resolve flaky haiku display test by checking only last haiku
contextablemark Oct 4, 2025
748fec6
Update dojo-e2e.yml
contextablemark Oct 4, 2025
0bbcd9e
Reverting Workload Identity Federation
contextablemark Oct 4, 2025
81f057c
Re-adding linefeed so the file matches up exactly.
contextablemark Oct 4, 2025
c2db02d
test: update dojo-e2e workflow
contextablemark Oct 5, 2025
d8fadcc
Re-adding temporary removals.
contextablemark Oct 5, 2025
812ab7f
fix: properly close streaming messages when finish_reason is present
contextablemark Oct 5, 2025
eb79c97
tests: make function-call detection assertion semantic
contextablemark Oct 5, 2025
4dde1b0
tests: align EventTranslator streaming expectations
contextablemark Oct 5, 2025
0479cb6
tests: reconcile EventTranslator comprehensive expectations
contextablemark Oct 5, 2025
769ff0f
ADK middleware: prefer LRO routing and harden translator; add tests
contextablemark Oct 5, 2025
a61605a
fix: restore LRO routing guard and streaming tests
contextablemark Oct 5, 2025
01cab61
tests: stabilize ToolBaseGenUIPage haiku comparison
contextablemark Oct 5, 2025
5aca8b3
test(adk): restore SystemMessage between tests
contextablemark Oct 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 41 additions & 28 deletions typescript-sdk/apps/dojo/e2e/featurePages/ToolBaseGenUIPage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,42 +73,55 @@ export class ToolBaseGenUIPage {
}

async extractMainDisplayHaikuContent(page: Page): Promise<string> {
const mainDisplayLines = page.locator('[data-testid="main-haiku-line"]');
const mainCount = await mainDisplayLines.count();
const lines: string[] = [];

if (mainCount > 0) {
for (let i = 0; i < mainCount; i++) {
const haikuLine = mainDisplayLines.nth(i);
const japaneseText = await haikuLine.locator('p').first().innerText();
lines.push(japaneseText);
const activeCard = page.locator('[data-testid="main-haiku-display"].active').last();

try {
await activeCard.waitFor({ state: 'visible', timeout: 5000 });
} catch (error) {
// Fallback to any visible haiku lines if the active card isn't available yet
const fallbackLines = page.locator('[data-testid="main-haiku-line"]');
const fallbackCount = await fallbackLines.count();
if (fallbackCount === 0) {
return '';
}
}

const mainHaikuContent = lines.join('').replace(/\s/g, '');
return mainHaikuContent;
}

async checkHaikuDisplay(page: Page): Promise<void> {
const chatHaikuContent = await this.extractChatHaikuContent(page);
const fallbackLineTexts: string[] = [];
for (let i = 0; i < fallbackCount; i++) {
const fallbackLine = fallbackLines.nth(i);
const japaneseText = await fallbackLine.locator('p').first().innerText();
fallbackLineTexts.push(japaneseText);
}

await page.waitForTimeout(5000);
return fallbackLineTexts.join('').replace(/\s/g, '');
}

const mainHaikuContent = await this.extractMainDisplayHaikuContent(page);
const mainDisplayLines = activeCard.locator('[data-testid="main-haiku-line"]');
const count = await mainDisplayLines.count();
if (count === 0) {
return '';
}

if (mainHaikuContent === '') {
expect(chatHaikuContent.length).toBeGreaterThan(0);
return;
const lines: string[] = [];
for (let i = 0; i < count; i++) {
const haikuLine = mainDisplayLines.nth(i);
const japaneseText = await haikuLine.locator('p').first().innerText();
lines.push(japaneseText);
}

if (chatHaikuContent === mainHaikuContent) {
expect(mainHaikuContent).toBe(chatHaikuContent);
} else {
await page.waitForTimeout(3000);
return lines.join('').replace(/\s/g, '');
}

const updatedMainContent = await this.extractMainDisplayHaikuContent(page);
async checkHaikuDisplay(page: Page): Promise<void> {
const chatHaikuContent = await this.extractChatHaikuContent(page);

expect(updatedMainContent).toBe(chatHaikuContent);
}
await expect
.poll(async () => {
const content = await this.extractMainDisplayHaikuContent(page);
return content;
}, {
timeout: 10000,
message: 'Main display did not match the latest chat haiku',
})
.toBe(chatHaikuContent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,64 @@
This middleware enables Google ADK agents to be used with the AG-UI protocol.
"""

from __future__ import annotations

import logging
import os
from typing import Dict, Iterable

from .adk_agent import ADKAgent
from .event_translator import EventTranslator
from .session_manager import SessionManager
from .endpoint import add_adk_fastapi_endpoint, create_adk_app

__all__ = ['ADKAgent', 'add_adk_fastapi_endpoint', 'create_adk_app', 'EventTranslator', 'SessionManager']

__version__ = "0.1.0"
__version__ = "0.1.0"


def _configure_logging_from_env() -> None:
"""Configure component loggers based on environment variables."""

root_level = os.getenv('LOG_ROOT_LEVEL')
if root_level:
try:
level = getattr(logging, root_level.upper())
except AttributeError:
logging.getLogger(__name__).warning(
"Invalid LOG_ROOT_LEVEL value '%s'", root_level
)
else:
logging.basicConfig(level=level, force=True)

component_levels: Dict[str, Iterable[str]] = {
'LOG_ADK_AGENT': ('ag_ui_adk.adk_agent',),
'LOG_EVENT_TRANSLATOR': (
'ag_ui_adk.event_translator',
'event_translator',
),
'LOG_ENDPOINT': ('ag_ui_adk.endpoint', 'endpoint'),
'LOG_SESSION_MANAGER': (
'ag_ui_adk.session_manager',
'session_manager',
),
}

for env_var, logger_names in component_levels.items():
level_name = os.getenv(env_var)
if not level_name:
continue

try:
level = getattr(logging, level_name.upper())
except AttributeError:
logging.getLogger(__name__).warning(
"Invalid value '%s' for %s", level_name, env_var
)
continue

for logger_name in logger_names:
logging.getLogger(logger_name).setLevel(level)


_configure_logging_from_env()
Original file line number Diff line number Diff line change
Expand Up @@ -916,19 +916,48 @@ async def _run_adk_in_background(
final_response = adk_event.is_final_response()
has_content = adk_event.content and hasattr(adk_event.content, 'parts') and adk_event.content.parts

if not final_response or (not adk_event.usage_metadata and has_content):
# Translate and emit events
# Check if this is a streaming chunk that needs regular processing
is_streaming_chunk = (
getattr(adk_event, 'partial', False) or # Explicitly marked as partial
(not getattr(adk_event, 'turn_complete', True)) or # Live streaming not complete
(not final_response) # Not marked as final by is_final_response()
)

# Prefer LRO routing when a long-running tool call is present
has_lro_function_call = False
try:
lro_ids = set(getattr(adk_event, 'long_running_tool_ids', []) or [])
if lro_ids and adk_event.content and getattr(adk_event.content, 'parts', None):
for part in adk_event.content.parts:
func = getattr(part, 'function_call', None)
func_id = getattr(func, 'id', None) if func else None
if func_id and func_id in lro_ids:
has_lro_function_call = True
break
except Exception:
# Be conservative: if detection fails, do not block streaming path
has_lro_function_call = False

# Process as streaming if it's a chunk OR if it has content but no finish_reason,
# but only when there is no LRO function call present (LRO takes precedence)
if (not has_lro_function_call) and (is_streaming_chunk or (has_content and not getattr(adk_event, 'finish_reason', None))):
# Regular translation path
async for ag_ui_event in event_translator.translate(
adk_event,
input.thread_id,
input.run_id
):

logger.debug(f"Emitting event to queue: {type(ag_ui_event).__name__} (thread {input.thread_id}, queue size before: {event_queue.qsize()})")
await event_queue.put(ag_ui_event)
logger.debug(f"Event queued: {type(ag_ui_event).__name__} (thread {input.thread_id}, queue size after: {event_queue.qsize()})")
else:
# LongRunning Tool events are usually emmitted in final response
# LongRunning Tool events are usually emitted in final response
# Ensure any active streaming text message is closed BEFORE tool calls
async for end_event in event_translator.force_close_streaming_message():
await event_queue.put(end_event)
logger.debug(f"Event queued (forced close): {type(end_event).__name__} (thread {input.thread_id}, queue size after: {event_queue.qsize()})")

async for ag_ui_event in event_translator.translate_lro_function_calls(
adk_event
):
Expand Down Expand Up @@ -994,4 +1023,4 @@ async def close(self):
self._session_lookup_cache.clear()

# Stop session manager cleanup task
await self._session_manager.stop_cleanup_task()
await self._session_manager.stop_cleanup_task()
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,24 @@ async def translate(
if hasattr(adk_event, 'get_function_calls'):
function_calls = adk_event.get_function_calls()
if function_calls:
logger.debug(f"ADK function calls detected: {len(function_calls)} calls")

# CRITICAL FIX: End any active text message stream before starting tool calls
# Per AG-UI protocol: TEXT_MESSAGE_END must be sent before TOOL_CALL_START
async for event in self.force_close_streaming_message():
yield event

# NOW ACTUALLY YIELD THE EVENTS
async for event in self._translate_function_calls(function_calls):
yield event
# Filter out long-running tool calls; those are handled by translate_lro_function_calls
try:
lro_ids = set(getattr(adk_event, 'long_running_tool_ids', []) or [])
except Exception:
lro_ids = set()

non_lro_calls = [fc for fc in function_calls if getattr(fc, 'id', None) not in lro_ids]

if non_lro_calls:
logger.debug(f"ADK function calls detected (non-LRO): {len(non_lro_calls)} of {len(function_calls)} total")
# CRITICAL FIX: End any active text message stream before starting tool calls
# Per AG-UI protocol: TEXT_MESSAGE_END must be sent before TOOL_CALL_START
async for event in self.force_close_streaming_message():
yield event

# Yield only non-LRO function call events
async for event in self._translate_function_calls(non_lro_calls):
yield event

# Handle function responses and yield the tool response event
# this is essential for scenerios when user has to render function response at frontend
Expand Down Expand Up @@ -164,12 +172,17 @@ async def _translate_text_content(
elif hasattr(adk_event, 'is_final_response'):
is_final_response = adk_event.is_final_response

# Handle None values: if is_final_response=True, it means streaming should end
should_send_end = is_final_response and not is_partial

# Handle None values: if a turn is complete or a final chunk arrives, end streaming
has_finish_reason = bool(getattr(adk_event, 'finish_reason', None))
should_send_end = (
(turn_complete and not is_partial)
or (is_final_response and not is_partial)
or (has_finish_reason and self._is_streaming)
)

logger.info(f"📥 Text event - partial={is_partial}, turn_complete={turn_complete}, "
f"is_final_response={is_final_response}, should_send_end={should_send_end}, "
f"currently_streaming={self._is_streaming}")
f"is_final_response={is_final_response}, has_finish_reason={has_finish_reason}, "
f"should_send_end={should_send_end}, currently_streaming={self._is_streaming}")

if is_final_response:

Expand Down Expand Up @@ -464,4 +477,4 @@ def reset(self):
self._streaming_message_id = None
self._is_streaming = False
self.long_running_tool_ids.clear()
logger.debug("Reset EventTranslator state (including streaming state)")
logger.debug("Reset EventTranslator state (including streaming state)")
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Shared pytest fixtures for ADK middleware tests."""

from __future__ import annotations

import pytest

from ag_ui.core import SystemMessage as CoreSystemMessage

import ag_ui_adk.adk_agent as adk_agent_module


@pytest.fixture(autouse=True)
def restore_system_message_class():
"""Ensure every test starts and ends with the real SystemMessage type."""

adk_agent_module.SystemMessage = CoreSystemMessage
try:
yield
finally:
adk_agent_module.SystemMessage = CoreSystemMessage
Loading
Loading