Skip to content

Commit f77d68b

Browse files
Re-addressing issue 400 (#615)
* Filter duplicate final payloads without extra hashing * Refactoring of event_translator.
1 parent 9b266e3 commit f77d68b

File tree

2 files changed

+226
-45
lines changed

2 files changed

+226
-45
lines changed

integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py

Lines changed: 105 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ def __init__(self):
137137
# Track streaming message state
138138
self._streaming_message_id: Optional[str] = None # Current streaming message ID
139139
self._is_streaming: bool = False # Whether we're currently streaming a message
140+
self._current_stream_text: str = "" # Accumulates text for the active stream
141+
self._last_streamed_text: Optional[str] = None # Snapshot of most recently streamed text
142+
self._last_streamed_run_id: Optional[str] = None # Run identifier for the last streamed text
140143
self.long_running_tool_ids: List[str] = [] # Track the long running tool IDs
141144

142145
async def translate(
@@ -179,6 +182,7 @@ async def translate(
179182
return
180183

181184
# Handle text content
185+
# --- THIS IS THE RESTORED LINE ---
182186
if adk_event.content and hasattr(adk_event.content, 'parts') and adk_event.content.parts:
183187
async for event in self._translate_text_content(
184188
adk_event, thread_id, run_id
@@ -253,26 +257,34 @@ async def _translate_text_content(
253257
Yields:
254258
Text message events (START, CONTENT, END)
255259
"""
260+
261+
# Check for is_final_response *before* checking for text.
262+
# An empty final response is a valid stream-closing signal.
263+
is_final_response = False
264+
if hasattr(adk_event, 'is_final_response') and callable(adk_event.is_final_response):
265+
is_final_response = adk_event.is_final_response()
266+
elif hasattr(adk_event, 'is_final_response'):
267+
is_final_response = adk_event.is_final_response
268+
256269
# Extract text from all parts
257270
text_parts = []
271+
# The check for adk_event.content.parts happens in the main translate method
258272
for part in adk_event.content.parts:
259-
if part.text:
273+
if part.text: # Note: part.text == "" is False
260274
text_parts.append(part.text)
261275

262-
if not text_parts:
276+
# If no text AND it's not a final response, we can safely skip.
277+
# Otherwise, we must continue to process the final_response signal.
278+
if not text_parts and not is_final_response:
263279
return
264-
265-
280+
281+
combined_text = "".join(text_parts)
282+
266283
# Use proper ADK streaming detection (handle None values)
267284
is_partial = getattr(adk_event, 'partial', False)
268285
turn_complete = getattr(adk_event, 'turn_complete', False)
269286

270-
# Check if this is the final response (complete message - skip to avoid duplication)
271-
is_final_response = False
272-
if hasattr(adk_event, 'is_final_response') and callable(adk_event.is_final_response):
273-
is_final_response = adk_event.is_final_response()
274-
elif hasattr(adk_event, 'is_final_response'):
275-
is_final_response = adk_event.is_final_response
287+
# (is_final_response is already calculated above)
276288

277289
# Handle None values: if a turn is complete or a final chunk arrives, end streaming
278290
has_finish_reason = bool(getattr(adk_event, 'finish_reason', None))
@@ -287,58 +299,83 @@ async def _translate_text_content(
287299
f"should_send_end={should_send_end}, currently_streaming={self._is_streaming}")
288300

289301
if is_final_response:
302+
# This is the final, complete message event.
290303

291-
# If a final text response wasn't streamed (not generated by an LLM) then deliver it in 3 events
292-
if not self._is_streaming and not adk_event.usage_metadata and should_send_end:
293-
logger.info(f"⏭️ Deliver non-llm response via message events "
294-
f"event_id={adk_event.id}")
304+
# Case 1: A stream is actively running. We must close it.
305+
if self._is_streaming and self._streaming_message_id:
306+
logger.info("⏭️ Final response event received. Closing active stream.")
307+
308+
if self._current_stream_text:
309+
# Save the complete streamed text for de-duplication
310+
self._last_streamed_text = self._current_stream_text
311+
self._last_streamed_run_id = run_id
312+
self._current_stream_text = ""
313+
314+
end_event = TextMessageEndEvent(
315+
type=EventType.TEXT_MESSAGE_END,
316+
message_id=self._streaming_message_id
317+
)
318+
logger.info(f"📤 TEXT_MESSAGE_END (from final response): {end_event.model_dump_json()}")
319+
yield end_event
320+
321+
self._streaming_message_id = None
322+
self._is_streaming = False
323+
logger.info("🏁 Streaming completed via final response")
324+
return # We are done.
325+
326+
# Case 2: No stream is active.
327+
# This event contains the *entire* message.
328+
# We must send it, *unless* it's a duplicate of a stream that *just* finished.
329+
330+
# Check for duplicates from a *previous* stream in this *same run*.
331+
is_duplicate = (
332+
self._last_streamed_run_id == run_id and
333+
self._last_streamed_text is not None and
334+
combined_text == self._last_streamed_text
335+
)
295336

296-
combined_text = "".join(text_parts)
337+
if is_duplicate:
338+
logger.info(
339+
"⏭️ Skipping final response event (duplicate content detected from finished stream)"
340+
)
341+
else:
342+
# Not a duplicate, or no previous stream. Send the full message.
343+
logger.info(
344+
f"⏩ Delivering complete non-streamed message or final content event_id={adk_event.id}"
345+
)
297346
message_events = [
298347
TextMessageStartEvent(
299348
type=EventType.TEXT_MESSAGE_START,
300-
message_id=adk_event.id,
301-
role="assistant"
349+
message_id=adk_event.id, # Use event ID for non-streamed
350+
role="assistant",
302351
),
303352
TextMessageContentEvent(
304353
type=EventType.TEXT_MESSAGE_CONTENT,
305354
message_id=adk_event.id,
306-
delta=combined_text
355+
delta=combined_text,
307356
),
308357
TextMessageEndEvent(
309358
type=EventType.TEXT_MESSAGE_END,
310-
message_id=adk_event.id
311-
)
359+
message_id=adk_event.id,
360+
),
312361
]
313362
for msg in message_events:
314363
yield msg
315364

316-
logger.info("⏭️ Skipping final response event (content already streamed)")
317-
318-
# If we're currently streaming, this final response means we should end the stream
319-
if self._is_streaming and self._streaming_message_id:
320-
end_event = TextMessageEndEvent(
321-
type=EventType.TEXT_MESSAGE_END,
322-
message_id=self._streaming_message_id
323-
)
324-
logger.info(f"📤 TEXT_MESSAGE_END (from final response): {end_event.model_dump_json()}")
325-
yield end_event
326-
327-
# Reset streaming state
328-
self._streaming_message_id = None
329-
self._is_streaming = False
330-
logger.info("🏁 Streaming completed via final response")
331-
365+
# Clean up state regardless, as this is the end of the line for text.
366+
self._current_stream_text = ""
367+
self._last_streamed_text = None
368+
self._last_streamed_run_id = None
332369
return
370+
333371

334-
combined_text = "".join(text_parts) # Don't add newlines for streaming
335-
336-
# Handle streaming logic
372+
# Handle streaming logic (if not is_final_response)
337373
if not self._is_streaming:
338374
# Start of new message - emit START event
339375
self._streaming_message_id = str(uuid.uuid4())
340376
self._is_streaming = True
341-
377+
self._current_stream_text = ""
378+
342379
start_event = TextMessageStartEvent(
343380
type=EventType.TEXT_MESSAGE_START,
344381
message_id=self._streaming_message_id,
@@ -349,6 +386,7 @@ async def _translate_text_content(
349386

350387
# Always emit content (unless empty)
351388
if combined_text:
389+
self._current_stream_text += combined_text
352390
content_event = TextMessageContentEvent(
353391
type=EventType.TEXT_MESSAGE_CONTENT,
354392
message_id=self._streaming_message_id,
@@ -365,8 +403,12 @@ async def _translate_text_content(
365403
)
366404
logger.info(f"📤 TEXT_MESSAGE_END: {end_event.model_dump_json()}")
367405
yield end_event
368-
406+
369407
# Reset streaming state
408+
if self._current_stream_text:
409+
self._last_streamed_text = self._current_stream_text
410+
self._last_streamed_run_id = run_id
411+
self._current_stream_text = ""
370412
self._streaming_message_id = None
371413
self._is_streaming = False
372414
logger.info("🏁 Streaming completed, state reset")
@@ -541,9 +583,23 @@ def _create_state_snapshot_event(
541583
A StateSnapshotEvent
542584
"""
543585

586+
FullSnapShot = {
587+
"context": {
588+
"conversation": [],
589+
"user": {
590+
"name": state_snapshot.get("user_name", ""),
591+
"timezone": state_snapshot.get("timezone", "UTC")
592+
},
593+
"app": {
594+
"version": state_snapshot.get("app_version", "unknown")
595+
}
596+
},
597+
"state": state_snapshot.get("custom_state", {})
598+
}
599+
544600
return StateSnapshotEvent(
545601
type=EventType.STATE_SNAPSHOT,
546-
snapshot=state_snapshot
602+
snapshot=FullSnapShot
547603
)
548604

549605
async def force_close_streaming_message(self) -> AsyncGenerator[BaseEvent, None]:
@@ -556,15 +612,16 @@ async def force_close_streaming_message(self) -> AsyncGenerator[BaseEvent, None]
556612
"""
557613
if self._is_streaming and self._streaming_message_id:
558614
logger.warning(f"🚨 Force-closing unterminated streaming message: {self._streaming_message_id}")
559-
615+
560616
end_event = TextMessageEndEvent(
561617
type=EventType.TEXT_MESSAGE_END,
562618
message_id=self._streaming_message_id
563619
)
564620
logger.info(f"📤 TEXT_MESSAGE_END (forced): {end_event.model_dump_json()}")
565621
yield end_event
566-
622+
567623
# Reset streaming state
624+
self._current_stream_text = ""
568625
self._streaming_message_id = None
569626
self._is_streaming = False
570627
logger.info("🔄 Streaming state reset after force-close")
@@ -578,5 +635,9 @@ def reset(self):
578635
self._active_tool_calls.clear()
579636
self._streaming_message_id = None
580637
self._is_streaming = False
638+
self._current_stream_text = ""
639+
self._last_streamed_text = None
640+
self._last_streamed_run_id = None
581641
self.long_running_tool_ids.clear()
582642
logger.debug("Reset EventTranslator state (including streaming state)")
643+

integrations/adk-middleware/python/tests/test_event_translator_comprehensive.py

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,10 @@ async def test_translate_text_content_final_response_no_streaming(self, translat
341341
async for event in translator.translate(mock_adk_event_with_content, "thread_1", "run_1"):
342342
events.append(event)
343343

344-
assert len(events) == 0 # No events
344+
assert len(events) == 3 # START, CONTENT, END for first final payload
345+
assert isinstance(events[0], TextMessageStartEvent)
346+
assert isinstance(events[1], TextMessageContentEvent)
347+
assert isinstance(events[2], TextMessageEndEvent)
345348

346349
@pytest.mark.asyncio
347350
async def test_translate_text_content_final_response_from_agent_callback(self, translator, mock_adk_event_with_content):
@@ -362,6 +365,123 @@ async def test_translate_text_content_final_response_from_agent_callback(self, t
362365
assert events[1].delta == mock_adk_event_with_content.content.parts[0].text
363366
assert isinstance(events[2], TextMessageEndEvent)
364367

368+
@pytest.mark.asyncio
369+
async def test_translate_text_content_final_response_after_stream_duplicate_suppressed(self, translator):
370+
"""Final LLM payload matching streamed text should be suppressed."""
371+
372+
stream_event = MagicMock(spec=ADKEvent)
373+
stream_event.id = "event-1"
374+
stream_event.author = "model"
375+
stream_event.content = MagicMock()
376+
stream_part = MagicMock()
377+
stream_part.text = "Hello"
378+
stream_event.content.parts = [stream_part]
379+
stream_event.partial = False
380+
stream_event.turn_complete = False
381+
stream_event.is_final_response = False
382+
stream_event.usage_metadata = {"tokens": 1}
383+
384+
events = []
385+
async for event in translator.translate(stream_event, "thread_1", "run_1"):
386+
events.append(event)
387+
388+
assert len(events) == 2 # START + CONTENT
389+
assert isinstance(events[0], TextMessageStartEvent)
390+
assert isinstance(events[1], TextMessageContentEvent)
391+
392+
final_stream_event = MagicMock(spec=ADKEvent)
393+
final_stream_event.id = "event-2"
394+
final_stream_event.author = "model"
395+
final_stream_event.content = MagicMock()
396+
final_stream_part = MagicMock()
397+
final_stream_part.text = ""
398+
final_stream_event.content.parts = [final_stream_part]
399+
final_stream_event.partial = False
400+
final_stream_event.turn_complete = True
401+
final_stream_event.is_final_response = True
402+
final_stream_event.usage_metadata = {"tokens": 1}
403+
404+
events = []
405+
async for event in translator.translate(final_stream_event, "thread_1", "run_1"):
406+
events.append(event)
407+
408+
assert len(events) == 1 # END only
409+
assert isinstance(events[0], TextMessageEndEvent)
410+
411+
final_payload = MagicMock(spec=ADKEvent)
412+
final_payload.id = "event-3"
413+
final_payload.author = "model"
414+
final_payload.content = MagicMock()
415+
final_payload_part = MagicMock()
416+
final_payload_part.text = "Hello"
417+
final_payload.content.parts = [final_payload_part]
418+
final_payload.partial = False
419+
final_payload.turn_complete = True
420+
final_payload.is_final_response = True
421+
final_payload.usage_metadata = {"tokens": 2}
422+
423+
events = []
424+
async for event in translator.translate(final_payload, "thread_1", "run_1"):
425+
events.append(event)
426+
427+
assert events == [] # duplicate suppressed
428+
429+
@pytest.mark.asyncio
430+
async def test_translate_text_content_final_response_after_stream_new_content(self, translator):
431+
"""Final LLM payload with new content should be emitted."""
432+
433+
stream_event = MagicMock(spec=ADKEvent)
434+
stream_event.id = "event-1"
435+
stream_event.author = "model"
436+
stream_event.content = MagicMock()
437+
stream_part = MagicMock()
438+
stream_part.text = "Hello"
439+
stream_event.content.parts = [stream_part]
440+
stream_event.partial = False
441+
stream_event.turn_complete = False
442+
stream_event.is_final_response = False
443+
stream_event.usage_metadata = {"tokens": 1}
444+
445+
async for _ in translator.translate(stream_event, "thread_1", "run_1"):
446+
pass
447+
448+
final_stream_event = MagicMock(spec=ADKEvent)
449+
final_stream_event.id = "event-2"
450+
final_stream_event.author = "model"
451+
final_stream_event.content = MagicMock()
452+
final_stream_part = MagicMock()
453+
final_stream_part.text = ""
454+
final_stream_event.content.parts = [final_stream_part]
455+
final_stream_event.partial = False
456+
final_stream_event.turn_complete = True
457+
final_stream_event.is_final_response = True
458+
final_stream_event.usage_metadata = {"tokens": 1}
459+
460+
async for _ in translator.translate(final_stream_event, "thread_1", "run_1"):
461+
pass
462+
463+
final_payload = MagicMock(spec=ADKEvent)
464+
final_payload.id = "event-3"
465+
final_payload.author = "model"
466+
final_payload.content = MagicMock()
467+
final_payload_part = MagicMock()
468+
final_payload_part.text = "Hello again"
469+
final_payload.content.parts = [final_payload_part]
470+
final_payload.partial = False
471+
final_payload.turn_complete = True
472+
final_payload.is_final_response = True
473+
final_payload.usage_metadata = {"tokens": 2}
474+
475+
events = []
476+
async for event in translator.translate(final_payload, "thread_1", "run_1"):
477+
events.append(event)
478+
479+
assert len(events) == 3
480+
assert isinstance(events[0], TextMessageStartEvent)
481+
assert isinstance(events[1], TextMessageContentEvent)
482+
assert events[1].delta == "Hello again"
483+
assert isinstance(events[2], TextMessageEndEvent)
484+
365485
@pytest.mark.asyncio
366486
async def test_translate_text_content_empty_text(self, translator, mock_adk_event):
367487
"""Test text content with empty text."""

0 commit comments

Comments
 (0)