Skip to content

Commit 50c7a68

Browse files
Filter duplicate final payloads without extra hashing
1 parent a71fbb7 commit 50c7a68

File tree

2 files changed

+198
-22
lines changed

2 files changed

+198
-22
lines changed

integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py

Lines changed: 77 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ def __init__(self):
137137
# Track streaming message state
138138
self._streaming_message_id: Optional[str] = None # Current streaming message ID
139139
self._is_streaming: bool = False # Whether we're currently streaming a message
140+
self._current_stream_text: str = "" # Accumulates text for the active stream
141+
self._last_streamed_text: Optional[str] = None # Snapshot of most recently streamed text
142+
self._last_streamed_run_id: Optional[str] = None # Run identifier for the last streamed text
140143
self.long_running_tool_ids: List[str] = [] # Track the long running tool IDs
141144

142145
async def translate(
@@ -261,8 +264,9 @@ async def _translate_text_content(
261264

262265
if not text_parts:
263266
return
264-
265-
267+
268+
combined_text = "".join(text_parts)
269+
266270
# Use proper ADK streaming detection (handle None values)
267271
is_partial = getattr(adk_event, 'partial', False)
268272
turn_complete = getattr(adk_event, 'turn_complete', False)
@@ -288,57 +292,100 @@ async def _translate_text_content(
288292

289293
if is_final_response:
290294

291-
# If a final text response wasn't streamed (not generated by an LLM) then deliver it in 3 events
292295
if not self._is_streaming and not adk_event.usage_metadata and should_send_end:
293-
logger.info(f"⏭️ Deliver non-llm response via message events "
294-
f"event_id={adk_event.id}")
296+
logger.info(
297+
f"⏭️ Deliver non-llm response via message events event_id={adk_event.id}"
298+
)
295299

296-
combined_text = "".join(text_parts)
297300
message_events = [
298301
TextMessageStartEvent(
299302
type=EventType.TEXT_MESSAGE_START,
300303
message_id=adk_event.id,
301-
role="assistant"
304+
role="assistant",
302305
),
303306
TextMessageContentEvent(
304307
type=EventType.TEXT_MESSAGE_CONTENT,
305308
message_id=adk_event.id,
306-
delta=combined_text
309+
delta=combined_text,
307310
),
308311
TextMessageEndEvent(
309312
type=EventType.TEXT_MESSAGE_END,
310-
message_id=adk_event.id
311-
)
313+
message_id=adk_event.id,
314+
),
312315
]
313316
for msg in message_events:
314317
yield msg
315318

319+
self._current_stream_text = ""
320+
self._last_streamed_text = None
321+
self._last_streamed_run_id = None
322+
return
323+
324+
if not self._is_streaming and adk_event.usage_metadata:
325+
if (
326+
self._last_streamed_text is None
327+
or self._last_streamed_run_id != run_id
328+
or combined_text != self._last_streamed_text
329+
):
330+
logger.info(
331+
f"⏩ Deliver final response after stream due to new content event_id={adk_event.id}"
332+
)
333+
message_events = [
334+
TextMessageStartEvent(
335+
type=EventType.TEXT_MESSAGE_START,
336+
message_id=adk_event.id,
337+
role="assistant",
338+
),
339+
TextMessageContentEvent(
340+
type=EventType.TEXT_MESSAGE_CONTENT,
341+
message_id=adk_event.id,
342+
delta=combined_text,
343+
),
344+
TextMessageEndEvent(
345+
type=EventType.TEXT_MESSAGE_END,
346+
message_id=adk_event.id,
347+
),
348+
]
349+
for msg in message_events:
350+
yield msg
351+
else:
352+
logger.info(
353+
"⏭️ Skipping final response event (duplicate content detected)"
354+
)
355+
356+
self._current_stream_text = ""
357+
self._last_streamed_text = None
358+
self._last_streamed_run_id = None
359+
return
360+
316361
logger.info("⏭️ Skipping final response event (content already streamed)")
317-
318-
# If we're currently streaming, this final response means we should end the stream
362+
319363
if self._is_streaming and self._streaming_message_id:
364+
if self._current_stream_text:
365+
self._last_streamed_text = self._current_stream_text
366+
self._last_streamed_run_id = run_id
367+
self._current_stream_text = ""
368+
320369
end_event = TextMessageEndEvent(
321370
type=EventType.TEXT_MESSAGE_END,
322371
message_id=self._streaming_message_id
323372
)
324373
logger.info(f"📤 TEXT_MESSAGE_END (from final response): {end_event.model_dump_json()}")
325374
yield end_event
326-
327-
# Reset streaming state
375+
328376
self._streaming_message_id = None
329377
self._is_streaming = False
330378
logger.info("🏁 Streaming completed via final response")
331-
379+
332380
return
333381

334-
combined_text = "".join(text_parts) # Don't add newlines for streaming
335-
336382
# Handle streaming logic
337383
if not self._is_streaming:
338384
# Start of new message - emit START event
339385
self._streaming_message_id = str(uuid.uuid4())
340386
self._is_streaming = True
341-
387+
self._current_stream_text = ""
388+
342389
start_event = TextMessageStartEvent(
343390
type=EventType.TEXT_MESSAGE_START,
344391
message_id=self._streaming_message_id,
@@ -349,6 +396,7 @@ async def _translate_text_content(
349396

350397
# Always emit content (unless empty)
351398
if combined_text:
399+
self._current_stream_text += combined_text
352400
content_event = TextMessageContentEvent(
353401
type=EventType.TEXT_MESSAGE_CONTENT,
354402
message_id=self._streaming_message_id,
@@ -365,8 +413,12 @@ async def _translate_text_content(
365413
)
366414
logger.info(f"📤 TEXT_MESSAGE_END: {end_event.model_dump_json()}")
367415
yield end_event
368-
416+
369417
# Reset streaming state
418+
if self._current_stream_text:
419+
self._last_streamed_text = self._current_stream_text
420+
self._last_streamed_run_id = run_id
421+
self._current_stream_text = ""
370422
self._streaming_message_id = None
371423
self._is_streaming = False
372424
logger.info("🏁 Streaming completed, state reset")
@@ -556,15 +608,16 @@ async def force_close_streaming_message(self) -> AsyncGenerator[BaseEvent, None]
556608
"""
557609
if self._is_streaming and self._streaming_message_id:
558610
logger.warning(f"🚨 Force-closing unterminated streaming message: {self._streaming_message_id}")
559-
611+
560612
end_event = TextMessageEndEvent(
561613
type=EventType.TEXT_MESSAGE_END,
562614
message_id=self._streaming_message_id
563615
)
564616
logger.info(f"📤 TEXT_MESSAGE_END (forced): {end_event.model_dump_json()}")
565617
yield end_event
566-
618+
567619
# Reset streaming state
620+
self._current_stream_text = ""
568621
self._streaming_message_id = None
569622
self._is_streaming = False
570623
logger.info("🔄 Streaming state reset after force-close")
@@ -578,5 +631,8 @@ def reset(self):
578631
self._active_tool_calls.clear()
579632
self._streaming_message_id = None
580633
self._is_streaming = False
634+
self._current_stream_text = ""
635+
self._last_streamed_text = None
636+
self._last_streamed_run_id = None
581637
self.long_running_tool_ids.clear()
582638
logger.debug("Reset EventTranslator state (including streaming state)")

integrations/adk-middleware/python/tests/test_event_translator_comprehensive.py

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,10 @@ async def test_translate_text_content_final_response_no_streaming(self, translat
341341
async for event in translator.translate(mock_adk_event_with_content, "thread_1", "run_1"):
342342
events.append(event)
343343

344-
assert len(events) == 0 # No events
344+
assert len(events) == 3 # START, CONTENT, END for first final payload
345+
assert isinstance(events[0], TextMessageStartEvent)
346+
assert isinstance(events[1], TextMessageContentEvent)
347+
assert isinstance(events[2], TextMessageEndEvent)
345348

346349
@pytest.mark.asyncio
347350
async def test_translate_text_content_final_response_from_agent_callback(self, translator, mock_adk_event_with_content):
@@ -362,6 +365,123 @@ async def test_translate_text_content_final_response_from_agent_callback(self, t
362365
assert events[1].delta == mock_adk_event_with_content.content.parts[0].text
363366
assert isinstance(events[2], TextMessageEndEvent)
364367

368+
@pytest.mark.asyncio
369+
async def test_translate_text_content_final_response_after_stream_duplicate_suppressed(self, translator):
370+
"""Final LLM payload matching streamed text should be suppressed."""
371+
372+
stream_event = MagicMock(spec=ADKEvent)
373+
stream_event.id = "event-1"
374+
stream_event.author = "model"
375+
stream_event.content = MagicMock()
376+
stream_part = MagicMock()
377+
stream_part.text = "Hello"
378+
stream_event.content.parts = [stream_part]
379+
stream_event.partial = False
380+
stream_event.turn_complete = False
381+
stream_event.is_final_response = False
382+
stream_event.usage_metadata = {"tokens": 1}
383+
384+
events = []
385+
async for event in translator.translate(stream_event, "thread_1", "run_1"):
386+
events.append(event)
387+
388+
assert len(events) == 2 # START + CONTENT
389+
assert isinstance(events[0], TextMessageStartEvent)
390+
assert isinstance(events[1], TextMessageContentEvent)
391+
392+
final_stream_event = MagicMock(spec=ADKEvent)
393+
final_stream_event.id = "event-2"
394+
final_stream_event.author = "model"
395+
final_stream_event.content = MagicMock()
396+
final_stream_part = MagicMock()
397+
final_stream_part.text = ""
398+
final_stream_event.content.parts = [final_stream_part]
399+
final_stream_event.partial = False
400+
final_stream_event.turn_complete = True
401+
final_stream_event.is_final_response = True
402+
final_stream_event.usage_metadata = {"tokens": 1}
403+
404+
events = []
405+
async for event in translator.translate(final_stream_event, "thread_1", "run_1"):
406+
events.append(event)
407+
408+
assert len(events) == 1 # END only
409+
assert isinstance(events[0], TextMessageEndEvent)
410+
411+
final_payload = MagicMock(spec=ADKEvent)
412+
final_payload.id = "event-3"
413+
final_payload.author = "model"
414+
final_payload.content = MagicMock()
415+
final_payload_part = MagicMock()
416+
final_payload_part.text = "Hello"
417+
final_payload.content.parts = [final_payload_part]
418+
final_payload.partial = False
419+
final_payload.turn_complete = True
420+
final_payload.is_final_response = True
421+
final_payload.usage_metadata = {"tokens": 2}
422+
423+
events = []
424+
async for event in translator.translate(final_payload, "thread_1", "run_1"):
425+
events.append(event)
426+
427+
assert events == [] # duplicate suppressed
428+
429+
@pytest.mark.asyncio
430+
async def test_translate_text_content_final_response_after_stream_new_content(self, translator):
431+
"""Final LLM payload with new content should be emitted."""
432+
433+
stream_event = MagicMock(spec=ADKEvent)
434+
stream_event.id = "event-1"
435+
stream_event.author = "model"
436+
stream_event.content = MagicMock()
437+
stream_part = MagicMock()
438+
stream_part.text = "Hello"
439+
stream_event.content.parts = [stream_part]
440+
stream_event.partial = False
441+
stream_event.turn_complete = False
442+
stream_event.is_final_response = False
443+
stream_event.usage_metadata = {"tokens": 1}
444+
445+
async for _ in translator.translate(stream_event, "thread_1", "run_1"):
446+
pass
447+
448+
final_stream_event = MagicMock(spec=ADKEvent)
449+
final_stream_event.id = "event-2"
450+
final_stream_event.author = "model"
451+
final_stream_event.content = MagicMock()
452+
final_stream_part = MagicMock()
453+
final_stream_part.text = ""
454+
final_stream_event.content.parts = [final_stream_part]
455+
final_stream_event.partial = False
456+
final_stream_event.turn_complete = True
457+
final_stream_event.is_final_response = True
458+
final_stream_event.usage_metadata = {"tokens": 1}
459+
460+
async for _ in translator.translate(final_stream_event, "thread_1", "run_1"):
461+
pass
462+
463+
final_payload = MagicMock(spec=ADKEvent)
464+
final_payload.id = "event-3"
465+
final_payload.author = "model"
466+
final_payload.content = MagicMock()
467+
final_payload_part = MagicMock()
468+
final_payload_part.text = "Hello again"
469+
final_payload.content.parts = [final_payload_part]
470+
final_payload.partial = False
471+
final_payload.turn_complete = True
472+
final_payload.is_final_response = True
473+
final_payload.usage_metadata = {"tokens": 2}
474+
475+
events = []
476+
async for event in translator.translate(final_payload, "thread_1", "run_1"):
477+
events.append(event)
478+
479+
assert len(events) == 3
480+
assert isinstance(events[0], TextMessageStartEvent)
481+
assert isinstance(events[1], TextMessageContentEvent)
482+
assert events[1].delta == "Hello again"
483+
assert isinstance(events[2], TextMessageEndEvent)
484+
365485
@pytest.mark.asyncio
366486
async def test_translate_text_content_empty_text(self, translator, mock_adk_event):
367487
"""Test text content with empty text."""

0 commit comments

Comments
 (0)