Skip to content

Commit 09eeba6

Browse files
Refactoring of event_translator.
1 parent 50c7a68 commit 09eeba6

File tree

1 file changed

+80
-75
lines changed

1 file changed

+80
-75
lines changed

integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py

Lines changed: 80 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ async def translate(
182182
return
183183

184184
# Handle text content
185+
# --- THIS IS THE RESTORED LINE ---
185186
if adk_event.content and hasattr(adk_event.content, 'parts') and adk_event.content.parts:
186187
async for event in self._translate_text_content(
187188
adk_event, thread_id, run_id
@@ -256,13 +257,25 @@ async def _translate_text_content(
256257
Yields:
257258
Text message events (START, CONTENT, END)
258259
"""
260+
261+
# Check for is_final_response *before* checking for text.
262+
# An empty final response is a valid stream-closing signal.
263+
is_final_response = False
264+
if hasattr(adk_event, 'is_final_response') and callable(adk_event.is_final_response):
265+
is_final_response = adk_event.is_final_response()
266+
elif hasattr(adk_event, 'is_final_response'):
267+
is_final_response = adk_event.is_final_response
268+
259269
# Extract text from all parts
260270
text_parts = []
271+
# The check for adk_event.content.parts happens in the main translate method
261272
for part in adk_event.content.parts:
262-
if part.text:
273+
if part.text: # Note: part.text == "" is False
263274
text_parts.append(part.text)
264275

265-
if not text_parts:
276+
# If no text AND it's not a final response, we can safely skip.
277+
# Otherwise, we must continue to process the final_response signal.
278+
if not text_parts and not is_final_response:
266279
return
267280

268281
combined_text = "".join(text_parts)
@@ -271,12 +284,7 @@ async def _translate_text_content(
271284
is_partial = getattr(adk_event, 'partial', False)
272285
turn_complete = getattr(adk_event, 'turn_complete', False)
273286

274-
# Check if this is the final response (complete message - skip to avoid duplication)
275-
is_final_response = False
276-
if hasattr(adk_event, 'is_final_response') and callable(adk_event.is_final_response):
277-
is_final_response = adk_event.is_final_response()
278-
elif hasattr(adk_event, 'is_final_response'):
279-
is_final_response = adk_event.is_final_response
287+
# (is_final_response is already calculated above)
280288

281289
# Handle None values: if a turn is complete or a final chunk arrives, end streaming
282290
has_finish_reason = bool(getattr(adk_event, 'finish_reason', None))
@@ -291,16 +299,54 @@ async def _translate_text_content(
291299
f"should_send_end={should_send_end}, currently_streaming={self._is_streaming}")
292300

293301
if is_final_response:
302+
# This is the final, complete message event.
294303

295-
if not self._is_streaming and not adk_event.usage_metadata and should_send_end:
296-
logger.info(
297-
f"⏭️ Deliver non-llm response via message events event_id={adk_event.id}"
304+
# Case 1: A stream is actively running. We must close it.
305+
if self._is_streaming and self._streaming_message_id:
306+
logger.info("⏭️ Final response event received. Closing active stream.")
307+
308+
if self._current_stream_text:
309+
# Save the complete streamed text for de-duplication
310+
self._last_streamed_text = self._current_stream_text
311+
self._last_streamed_run_id = run_id
312+
self._current_stream_text = ""
313+
314+
end_event = TextMessageEndEvent(
315+
type=EventType.TEXT_MESSAGE_END,
316+
message_id=self._streaming_message_id
298317
)
318+
logger.info(f"📤 TEXT_MESSAGE_END (from final response): {end_event.model_dump_json()}")
319+
yield end_event
299320

321+
self._streaming_message_id = None
322+
self._is_streaming = False
323+
logger.info("🏁 Streaming completed via final response")
324+
return # We are done.
325+
326+
# Case 2: No stream is active.
327+
# This event contains the *entire* message.
328+
# We must send it, *unless* it's a duplicate of a stream that *just* finished.
329+
330+
# Check for duplicates from a *previous* stream in this *same run*.
331+
is_duplicate = (
332+
self._last_streamed_run_id == run_id and
333+
self._last_streamed_text is not None and
334+
combined_text == self._last_streamed_text
335+
)
336+
337+
if is_duplicate:
338+
logger.info(
339+
"⏭️ Skipping final response event (duplicate content detected from finished stream)"
340+
)
341+
else:
342+
# Not a duplicate, or no previous stream. Send the full message.
343+
logger.info(
344+
f"⏩ Delivering complete non-streamed message or final content event_id={adk_event.id}"
345+
)
300346
message_events = [
301347
TextMessageStartEvent(
302348
type=EventType.TEXT_MESSAGE_START,
303-
message_id=adk_event.id,
349+
message_id=adk_event.id, # Use event ID for non-streamed
304350
role="assistant",
305351
),
306352
TextMessageContentEvent(
@@ -316,70 +362,14 @@ async def _translate_text_content(
316362
for msg in message_events:
317363
yield msg
318364

319-
self._current_stream_text = ""
320-
self._last_streamed_text = None
321-
self._last_streamed_run_id = None
322-
return
323-
324-
if not self._is_streaming and adk_event.usage_metadata:
325-
if (
326-
self._last_streamed_text is None
327-
or self._last_streamed_run_id != run_id
328-
or combined_text != self._last_streamed_text
329-
):
330-
logger.info(
331-
f"⏩ Deliver final response after stream due to new content event_id={adk_event.id}"
332-
)
333-
message_events = [
334-
TextMessageStartEvent(
335-
type=EventType.TEXT_MESSAGE_START,
336-
message_id=adk_event.id,
337-
role="assistant",
338-
),
339-
TextMessageContentEvent(
340-
type=EventType.TEXT_MESSAGE_CONTENT,
341-
message_id=adk_event.id,
342-
delta=combined_text,
343-
),
344-
TextMessageEndEvent(
345-
type=EventType.TEXT_MESSAGE_END,
346-
message_id=adk_event.id,
347-
),
348-
]
349-
for msg in message_events:
350-
yield msg
351-
else:
352-
logger.info(
353-
"⏭️ Skipping final response event (duplicate content detected)"
354-
)
355-
356-
self._current_stream_text = ""
357-
self._last_streamed_text = None
358-
self._last_streamed_run_id = None
359-
return
360-
361-
logger.info("⏭️ Skipping final response event (content already streamed)")
362-
363-
if self._is_streaming and self._streaming_message_id:
364-
if self._current_stream_text:
365-
self._last_streamed_text = self._current_stream_text
366-
self._last_streamed_run_id = run_id
367-
self._current_stream_text = ""
368-
369-
end_event = TextMessageEndEvent(
370-
type=EventType.TEXT_MESSAGE_END,
371-
message_id=self._streaming_message_id
372-
)
373-
logger.info(f"📤 TEXT_MESSAGE_END (from final response): {end_event.model_dump_json()}")
374-
yield end_event
375-
376-
self._streaming_message_id = None
377-
self._is_streaming = False
378-
logger.info("🏁 Streaming completed via final response")
379-
365+
# Clean up state regardless, as this is the end of the line for text.
366+
self._current_stream_text = ""
367+
self._last_streamed_text = None
368+
self._last_streamed_run_id = None
380369
return
370+
381371

382-
# Handle streaming logic
372+
# Handle streaming logic (if not is_final_response)
383373
if not self._is_streaming:
384374
# Start of new message - emit START event
385375
self._streaming_message_id = str(uuid.uuid4())
@@ -593,9 +583,23 @@ def _create_state_snapshot_event(
593583
A StateSnapshotEvent
594584
"""
595585

586+
FullSnapShot = {
587+
"context": {
588+
"conversation": [],
589+
"user": {
590+
"name": state_snapshot.get("user_name", ""),
591+
"timezone": state_snapshot.get("timezone", "UTC")
592+
},
593+
"app": {
594+
"version": state_snapshot.get("app_version", "unknown")
595+
}
596+
},
597+
"state": state_snapshot.get("custom_state", {})
598+
}
599+
596600
return StateSnapshotEvent(
597601
type=EventType.STATE_SNAPSHOT,
598-
snapshot=state_snapshot
602+
snapshot=FullSnapShot
599603
)
600604

601605
async def force_close_streaming_message(self) -> AsyncGenerator[BaseEvent, None]:
@@ -636,3 +640,4 @@ def reset(self):
636640
self._last_streamed_run_id = None
637641
self.long_running_tool_ids.clear()
638642
logger.debug("Reset EventTranslator state (including streaming state)")
643+

0 commit comments

Comments
 (0)