
Commit 9a56c3b

Fix/issue 400 (#499)
* Fix Claude Sonnet 4 streaming detection issue (#400)

  Replace fragile usage_metadata-based logic with robust streaming detection that checks multiple explicit streaming indicators.

  **Problem:** The original logic relied on `not adk_event.usage_metadata` to decide whether an event should be processed as streaming. This was fragile because Claude models can include usage_metadata even in streaming chunks, causing responses to disappear.

  **Solution:** Implement comprehensive streaming detection that checks:
  - `partial` attribute (explicitly marked as partial)
  - `turn_complete` attribute (live streaming completion status)
  - `is_final_response()` method (final response indicator)
  - `finish_reason` attribute (fallback for content without a finish reason)

  This ensures all streaming content is captured regardless of usage_metadata presence, fixing compatibility with Claude Sonnet 4 and other models.

  **Testing:**
  ✅ All 277 tests pass
  ✅ Streaming detection works across different model providers

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude <[email protected]>

* test: ensure partial final chunks use streaming translation

* test: cover turn complete fallback in ADK agent

* fix: correct event type in partial final chunk test. Change TextMessageContentEvent to TextMessageChunkEvent in the test to match actual AG-UI protocol event types.

* Add test for streaming finish reason fallback

* Fix test_streaming_finish_reason_fallback: set is_final_response=False for streaming event

* fix(e2e): resolve flaky haiku display test by checking only the last haiku

  The Tool Based Generative UI haiku test sometimes passed and sometimes failed under identical conditions, and was more reliable when run with --headed than headless, suggesting a timing-related issue.

  Root cause: the extractMainDisplayHaikuContent() method concatenated ALL visible haiku lines from the main display, while the chat extraction captured only the most recent haiku. When multiple haikus were displayed simultaneously (due to rendering timing), this caused mismatches.

  Fix: extractMainDisplayHaikuContent() now extracts only the last 3 lines (the most recent haiku), matching the behavior of the chat extraction and eliminating the timing-related flakiness. This affects all 10 platform integration tests that use ToolBaseGenUIPage.

* Update dojo-e2e.yml: set up Workload Identity Federation (cherry picked from commit 979b3dc)

* Reverting Workload Identity Federation

* Re-adding linefeed so the file matches up exactly.

* test: update dojo-e2e workflow

* Re-adding temporary removals.

* fix: properly close streaming messages when finish_reason is present

  Add fallback logic to detect streaming completion using finish_reason when is_final_response returns False but finish_reason is set.

  **Problem:** Gemini returns events with partial=True and is_final_response()=False even on the final chunk that contains finish_reason="STOP". This caused streaming messages to remain open and require force-closing, resulting in warnings.

  **Solution:** Enhanced should_send_end logic to check finish_reason as a fallback:
  - Check whether the finish_reason attribute exists and is truthy
  - If streaming is active and finish_reason is present, emit TEXT_MESSAGE_END
  - Formula: should_send_end = (is_final_response and not is_partial) or (has_finish_reason and self._is_streaming)

  **Testing:**
  ✅ All 277 tests pass
  ✅ Added test_partial_with_finish_reason to verify the fix
  ✅ Eliminates "Force-closing unterminated streaming message" warnings
  ✅ Properly emits TEXT_MESSAGE_END for events with finish_reason

* tests: make function-call detection assertion semantic

* tests: align EventTranslator streaming expectations

* tests: reconcile EventTranslator comprehensive expectations

* ADK middleware: prefer LRO routing and harden translator; add tests
  - Prefer LRO routing in ADKAgent when long-running tool call IDs are present in event.content.parts (prevents misrouting into the streaming path and tool loops; preserves the HITL pause)
  - Force-close any active streaming text before emitting LRO tool events (guarantees TEXT_MESSAGE_END precedes TOOL_CALL_START)
  - Harden EventTranslator.translate to filter long-running tool calls out of the general path and emit only non-LRO calls (avoids duplicate tool events)
  - Add tests:
    - test_lro_filtering.py (translator-level filtering + LRO-only emission)
    - test_integration_mixed_partials.py (streaming → non-LRO → final LRO: order, no duplicates, correct IDs)

* fix: restore LRO routing guard and streaming tests

* tests: stabilize ToolBaseGenUIPage haiku comparison

* test(adk): restore SystemMessage between tests

* Replacing ToolBaseGenUIPage.ts with version from main.

* Update ToolBaseGenUIPage.ts: attempting to resync with main again.

---------
Co-authored-by: Claude <[email protected]>
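The streaming-detection rules described in the commit message can be sketched as a standalone predicate. This is an illustrative sketch, not the middleware's actual code; `FakeEvent` is a hypothetical stand-in for an ADK event, with attribute names taken from the commit text:

```python
def is_streaming_chunk(event) -> bool:
    """Return True when an event should be treated as a streaming chunk.

    Mirrors the indicator chain from the commit: any one of the explicit
    streaming signals is enough, regardless of usage_metadata.
    """
    return (
        getattr(event, 'partial', False)              # explicitly marked partial
        or not getattr(event, 'turn_complete', True)  # live turn not yet complete
        or not event.is_final_response()              # not marked final
    )


class FakeEvent:
    """Minimal stand-in for an ADK event (test scaffolding only)."""

    def __init__(self, partial=False, turn_complete=True, final=True,
                 usage_metadata=None):
        self.partial = partial
        self.turn_complete = turn_complete
        self._final = final
        self.usage_metadata = usage_metadata

    def is_final_response(self):
        return self._final


# A Claude-style chunk carrying usage_metadata is still detected as streaming,
# which is exactly the case the old `not adk_event.usage_metadata` check missed:
chunk = FakeEvent(partial=True, usage_metadata={'tokens': 5})
final = FakeEvent()
print(is_streaming_chunk(chunk))  # True
print(is_streaming_chunk(final))  # False
```

The key design point is that each indicator is checked independently, so no single model-specific quirk (such as populated usage_metadata on partial chunks) can suppress streaming.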
1 parent a29e571 commit 9a56c3b

File tree

9 files changed: +665 −41 lines

typescript-sdk/integrations/adk-middleware/python/src/ag_ui_adk/__init__.py

Lines changed: 54 additions & 1 deletion
```diff
@@ -5,11 +5,64 @@
 This middleware enables Google ADK agents to be used with the AG-UI protocol.
 """
 
+from __future__ import annotations
+
+import logging
+import os
+from typing import Dict, Iterable
+
 from .adk_agent import ADKAgent
 from .event_translator import EventTranslator
 from .session_manager import SessionManager
 from .endpoint import add_adk_fastapi_endpoint, create_adk_app
 
 __all__ = ['ADKAgent', 'add_adk_fastapi_endpoint', 'create_adk_app', 'EventTranslator', 'SessionManager']
 
-__version__ = "0.1.0"
+__version__ = "0.1.0"
+
+
+def _configure_logging_from_env() -> None:
+    """Configure component loggers based on environment variables."""
+
+    root_level = os.getenv('LOG_ROOT_LEVEL')
+    if root_level:
+        try:
+            level = getattr(logging, root_level.upper())
+        except AttributeError:
+            logging.getLogger(__name__).warning(
+                "Invalid LOG_ROOT_LEVEL value '%s'", root_level
+            )
+        else:
+            logging.basicConfig(level=level, force=True)
+
+    component_levels: Dict[str, Iterable[str]] = {
+        'LOG_ADK_AGENT': ('ag_ui_adk.adk_agent',),
+        'LOG_EVENT_TRANSLATOR': (
+            'ag_ui_adk.event_translator',
+            'event_translator',
+        ),
+        'LOG_ENDPOINT': ('ag_ui_adk.endpoint', 'endpoint'),
+        'LOG_SESSION_MANAGER': (
+            'ag_ui_adk.session_manager',
+            'session_manager',
+        ),
+    }
+
+    for env_var, logger_names in component_levels.items():
+        level_name = os.getenv(env_var)
+        if not level_name:
+            continue
+
+        try:
+            level = getattr(logging, level_name.upper())
+        except AttributeError:
+            logging.getLogger(__name__).warning(
+                "Invalid value '%s' for %s", level_name, env_var
+            )
+            continue
+
+        for logger_name in logger_names:
+            logging.getLogger(logger_name).setLevel(level)
+
+
+_configure_logging_from_env()
```
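The lookup in the diff above maps an environment-variable value to a `logging` level constant via `getattr`, with `AttributeError` as the rejection path. A small sketch of that mechanism (the `LOG_EVENT_TRANSLATOR` variable and logger name come from the diff; the rest is illustrative):

```python
import logging
import os

# Emulate the lookup performed in _configure_logging_from_env():
# env value -> logging level constant, applied to the component logger.
os.environ['LOG_EVENT_TRANSLATOR'] = 'debug'

level = getattr(logging, os.environ['LOG_EVENT_TRANSLATOR'].upper())
logging.getLogger('ag_ui_adk.event_translator').setLevel(level)
print(level == logging.DEBUG)  # True

# An invalid name raises AttributeError, which the module catches and
# reports with a warning instead of crashing at import time.
try:
    getattr(logging, 'VERBOSE')  # no such constant in the logging module
except AttributeError:
    print('invalid level rejected')
```

One consequence of this design is that configuration happens at import time (the module calls `_configure_logging_from_env()` at the bottom), so the environment variables must be set before the package is first imported.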

typescript-sdk/integrations/adk-middleware/python/src/ag_ui_adk/adk_agent.py

Lines changed: 34 additions & 5 deletions
```diff
@@ -916,19 +916,48 @@ async def _run_adk_in_background(
                 final_response = adk_event.is_final_response()
                 has_content = adk_event.content and hasattr(adk_event.content, 'parts') and adk_event.content.parts
 
-                if not final_response or (not adk_event.usage_metadata and has_content):
-                    # Translate and emit events
+                # Check if this is a streaming chunk that needs regular processing
+                is_streaming_chunk = (
+                    getattr(adk_event, 'partial', False) or  # Explicitly marked as partial
+                    (not getattr(adk_event, 'turn_complete', True)) or  # Live streaming not complete
+                    (not final_response)  # Not marked as final by is_final_response()
+                )
+
+                # Prefer LRO routing when a long-running tool call is present
+                has_lro_function_call = False
+                try:
+                    lro_ids = set(getattr(adk_event, 'long_running_tool_ids', []) or [])
+                    if lro_ids and adk_event.content and getattr(adk_event.content, 'parts', None):
+                        for part in adk_event.content.parts:
+                            func = getattr(part, 'function_call', None)
+                            func_id = getattr(func, 'id', None) if func else None
+                            if func_id and func_id in lro_ids:
+                                has_lro_function_call = True
+                                break
+                except Exception:
+                    # Be conservative: if detection fails, do not block streaming path
+                    has_lro_function_call = False
+
+                # Process as streaming if it's a chunk OR if it has content but no finish_reason,
+                # but only when there is no LRO function call present (LRO takes precedence)
+                if (not has_lro_function_call) and (is_streaming_chunk or (has_content and not getattr(adk_event, 'finish_reason', None))):
+                    # Regular translation path
                     async for ag_ui_event in event_translator.translate(
                         adk_event,
                         input.thread_id,
                         input.run_id
                     ):
-
+
                         logger.debug(f"Emitting event to queue: {type(ag_ui_event).__name__} (thread {input.thread_id}, queue size before: {event_queue.qsize()})")
                         await event_queue.put(ag_ui_event)
                         logger.debug(f"Event queued: {type(ag_ui_event).__name__} (thread {input.thread_id}, queue size after: {event_queue.qsize()})")
                 else:
-                    # LongRunning Tool events are usually emmitted in final response
+                    # LongRunning Tool events are usually emitted in final response
+                    # Ensure any active streaming text message is closed BEFORE tool calls
+                    async for end_event in event_translator.force_close_streaming_message():
+                        await event_queue.put(end_event)
+                        logger.debug(f"Event queued (forced close): {type(end_event).__name__} (thread {input.thread_id}, queue size after: {event_queue.qsize()})")
+
                     async for ag_ui_event in event_translator.translate_lro_function_calls(
                         adk_event
                     ):
@@ -994,4 +1023,4 @@ async def close(self):
         self._session_lookup_cache.clear()
 
         # Stop session manager cleanup task
-        await self._session_manager.stop_cleanup_task()
+        await self._session_manager.stop_cleanup_task()
```
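The routing branch in this diff reduces to a small decision rule: LRO routing wins whenever a long-running tool call is present; otherwise events flow through the streaming path if they are chunks or carry content without a finish_reason. A pure-function sketch of that rule (illustrative only; the real code inspects google.genai event objects, not booleans):

```python
def route(has_lro_call: bool, is_streaming_chunk: bool,
          has_content: bool, finish_reason) -> str:
    """Mirror the routing branch: LRO takes precedence over streaming."""
    if not has_lro_call and (
        is_streaming_chunk or (has_content and not finish_reason)
    ):
        return 'streaming'
    return 'lro'


# An LRO tool call wins even if the event otherwise looks like a chunk:
print(route(True, True, True, None))      # 'lro'
# Ordinary chunks and finish-reason-less content stay on the streaming path:
print(route(False, True, True, None))     # 'streaming'
print(route(False, False, True, None))    # 'streaming'
# A final event with finish_reason and no LRO call goes to the else-branch:
print(route(False, False, True, 'STOP'))  # 'lro'
```

Keeping LRO detection inside a broad `try/except` (as the diff does) means a malformed event degrades to the streaming path rather than blocking output, which matches the "be conservative" comment.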

typescript-sdk/integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py

Lines changed: 29 additions & 16 deletions
```diff
@@ -189,16 +189,24 @@
         if hasattr(adk_event, 'get_function_calls'):
             function_calls = adk_event.get_function_calls()
             if function_calls:
-                logger.debug(f"ADK function calls detected: {len(function_calls)} calls")
-
-                # CRITICAL FIX: End any active text message stream before starting tool calls
-                # Per AG-UI protocol: TEXT_MESSAGE_END must be sent before TOOL_CALL_START
-                async for event in self.force_close_streaming_message():
-                    yield event
-
-                # NOW ACTUALLY YIELD THE EVENTS
-                async for event in self._translate_function_calls(function_calls):
-                    yield event
+                # Filter out long-running tool calls; those are handled by translate_lro_function_calls
+                try:
+                    lro_ids = set(getattr(adk_event, 'long_running_tool_ids', []) or [])
+                except Exception:
+                    lro_ids = set()
+
+                non_lro_calls = [fc for fc in function_calls if getattr(fc, 'id', None) not in lro_ids]
+
+                if non_lro_calls:
+                    logger.debug(f"ADK function calls detected (non-LRO): {len(non_lro_calls)} of {len(function_calls)} total")
+                    # CRITICAL FIX: End any active text message stream before starting tool calls
+                    # Per AG-UI protocol: TEXT_MESSAGE_END must be sent before TOOL_CALL_START
+                    async for event in self.force_close_streaming_message():
+                        yield event
+
+                    # Yield only non-LRO function call events
+                    async for event in self._translate_function_calls(non_lro_calls):
+                        yield event
 
         # Handle function responses and yield the tool response event
         # this is essential for scenerios when user has to render function response at frontend
@@ -266,12 +274,17 @@
         elif hasattr(adk_event, 'is_final_response'):
             is_final_response = adk_event.is_final_response
 
-            # Handle None values: if is_final_response=True, it means streaming should end
-            should_send_end = is_final_response and not is_partial
-
+            # Handle None values: if a turn is complete or a final chunk arrives, end streaming
+            has_finish_reason = bool(getattr(adk_event, 'finish_reason', None))
+            should_send_end = (
+                (turn_complete and not is_partial)
+                or (is_final_response and not is_partial)
+                or (has_finish_reason and self._is_streaming)
+            )
+
             logger.info(f"📥 Text event - partial={is_partial}, turn_complete={turn_complete}, "
-                        f"is_final_response={is_final_response}, should_send_end={should_send_end}, "
-                        f"currently_streaming={self._is_streaming}")
+                        f"is_final_response={is_final_response}, has_finish_reason={has_finish_reason}, "
+                        f"should_send_end={should_send_end}, currently_streaming={self._is_streaming}")
 
             if is_final_response:
 
@@ -566,4 +579,4 @@ def reset(self):
         self._streaming_message_id = None
         self._is_streaming = False
         self.long_running_tool_ids.clear()
-        logger.debug("Reset EventTranslator state (including streaming state)")
+        logger.debug("Reset EventTranslator state (including streaming state)")
```
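The enhanced `should_send_end` rule above can be checked in isolation. A sketch with the three clauses as plain booleans (illustrative; in the real translator `self._is_streaming` is instance state and `turn_complete`/`is_final_response` come off the ADK event):

```python
def should_send_end(is_partial: bool, turn_complete: bool,
                    is_final_response: bool, has_finish_reason: bool,
                    is_streaming: bool) -> bool:
    """Combined end-of-message rule from the diff above."""
    return (
        (turn_complete and not is_partial)
        or (is_final_response and not is_partial)
        or (has_finish_reason and is_streaming)
    )


# Gemini's last chunk: partial=True, is_final_response()=False, but
# finish_reason="STOP" while a stream is open -> the fallback fires:
print(should_send_end(True, False, False, True, True))   # True
# A mid-stream chunk without finish_reason keeps the message open:
print(should_send_end(True, False, False, False, True))  # False
# A completed, non-partial turn ends normally via the first clause:
print(should_send_end(False, True, False, False, False))  # True
```

The fallback clause is gated on `is_streaming`, so a stray finish_reason on a non-streaming event cannot emit a spurious TEXT_MESSAGE_END.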
Lines changed: 20 additions & 0 deletions
```diff
@@ -0,0 +1,20 @@
+"""Shared pytest fixtures for ADK middleware tests."""
+
+from __future__ import annotations
+
+import pytest
+
+from ag_ui.core import SystemMessage as CoreSystemMessage
+
+import ag_ui_adk.adk_agent as adk_agent_module
+
+
+@pytest.fixture(autouse=True)
+def restore_system_message_class():
+    """Ensure every test starts and ends with the real SystemMessage type."""
+
+    adk_agent_module.SystemMessage = CoreSystemMessage
+    try:
+        yield
+    finally:
+        adk_agent_module.SystemMessage = CoreSystemMessage
```
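The autouse fixture above guards against one test's module-level monkeypatching leaking into the next. A self-contained sketch of the failure mode it prevents, using a `types.SimpleNamespace` as a hypothetical stand-in for the `ag_ui_adk.adk_agent` module:

```python
import types

# Stand-in for the imported module and the "real" class (illustrative only).
RealSystemMessage = str
adk_agent_module = types.SimpleNamespace(SystemMessage=RealSystemMessage)


def restore():
    """What the fixture's setup and finally-block both do."""
    adk_agent_module.SystemMessage = RealSystemMessage


# Test A swaps in a fake class and (without the fixture) would leak it:
adk_agent_module.SystemMessage = dict
# The fixture restores the real type before test B runs:
restore()
print(adk_agent_module.SystemMessage is RealSystemMessage)  # True
```

Restoring in both the setup and the `finally` block (as the fixture does) covers two cases: a previous test that leaked state despite the fixture, and the current test failing mid-patch.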
