Skip to content

Commit 8b67663

Browse files
Reasoning is printed out in dev notebook, need to find out why final response is not printed
1 parent be754ab commit 8b67663

File tree

6 files changed

+173
-72
lines changed

6 files changed

+173
-72
lines changed

examples/tutorials/10_agentic/10_temporal/010_agent_chat/project/workflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,12 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
119119
You have access to sequential thinking and web search capabilities through MCP servers.
120120
Use these tools when appropriate to provide accurate and well-reasoned responses.""",
121121
parent_span_id=span.id if span else None,
122-
model="o4-mini",
122+
model="gpt-5-mini",
123123
model_settings=ModelSettings(
124124
# Include reasoning items in the response (IDs, summaries)
125125
# response_include=["reasoning.encrypted_content"],
126126
# Ask the model to include a short reasoning summary
127-
reasoning=Reasoning(effort="medium", summary="auto"),
127+
reasoning=Reasoning(effort="high", summary="detailed"),
128128
)
129129
)
130130
self._state.input_list = run_result.final_input_list

src/agentex/lib/core/services/adk/providers/litellm.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ async def chat_completion_auto_send(
112112
update=StreamTaskMessageFull(
113113
parent_task_message=streaming_context.task_message,
114114
content=final_content,
115+
type="full",
115116
),
116117
)
117118
else:
@@ -221,7 +222,8 @@ async def chat_completion_stream_auto_send(
221222
await streaming_context.stream_update(
222223
update=StreamTaskMessageDelta(
223224
parent_task_message=streaming_context.task_message,
224-
delta=TextDelta(text_delta=delta),
225+
delta=TextDelta(text_delta=delta, type="text"),
226+
type="delta",
225227
),
226228
)
227229
heartbeat_if_in_workflow("content chunk streamed")
@@ -244,6 +246,7 @@ async def chat_completion_stream_auto_send(
244246
update=StreamTaskMessageFull(
245247
parent_task_message=streaming_context.task_message,
246248
content=final_content,
249+
type="full",
247250
),
248251
)
249252

src/agentex/lib/core/services/adk/providers/openai.py

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ async def run_agent_auto_send(
325325
update=StreamTaskMessageFull(
326326
parent_task_message=streaming_context.task_message,
327327
content=text_content,
328+
type="full",
328329
),
329330
)
330331

@@ -349,6 +350,7 @@ async def run_agent_auto_send(
349350
update=StreamTaskMessageFull(
350351
parent_task_message=streaming_context.task_message,
351352
content=tool_request_content,
353+
type="full",
352354
),
353355
)
354356

@@ -372,6 +374,7 @@ async def run_agent_auto_send(
372374
update=StreamTaskMessageFull(
373375
parent_task_message=streaming_context.task_message,
374376
content=tool_response_content,
377+
type="full",
375378
),
376379
)
377380

@@ -657,37 +660,61 @@ async def run_agent_streamed_auto_send(
657660
),
658661
)
659662

663+
elif event.item.type == "message":
664+
# Handle message items - these are the actual text responses
665+
# Skip for now, will be handled by streaming deltas
666+
logger.info(f"Received message item with id: {event.item.raw_item.id if hasattr(event.item, 'raw_item') and hasattr(event.item.raw_item, 'id') else 'unknown'}")
667+
pass
668+
660669
elif event.item.type == "reasoning_item":
661-
# Handle reasoning items
670+
# Handle reasoning items
662671
reasoning_item = event.item.raw_item
663672

664-
reasoning_content = ReasoningContent(
665-
author="agent",
666-
style="static",
667-
summary=[summary.text for summary in reasoning_item.summary],
668-
content=[content.text for content in reasoning_item.content] if hasattr(reasoning_item, "content") and reasoning_item.content else None,
669-
type="reasoning",
670-
)
671-
672-
# Create reasoning content using streaming context (immediate completion)
673-
async with (
674-
self.streaming_service.streaming_task_message_context(
675-
task_id=task_id,
676-
initial_content=reasoning_content,
677-
) as streaming_context
678-
):
679-
# The message has already been persisted, but we still need to send an update
680-
await streaming_context.stream_update(
681-
update=StreamTaskMessageFull(
682-
parent_task_message=streaming_context.task_message,
683-
content=reasoning_content,
684-
),
673+
# Extract summaries
674+
summaries = []
675+
if hasattr(reasoning_item, "summary") and reasoning_item.summary:
676+
for summary in reasoning_item.summary:
677+
if hasattr(summary, "text") and summary.text:
678+
summaries.append(summary.text)
679+
680+
# Extract content
681+
contents = []
682+
if hasattr(reasoning_item, "content") and reasoning_item.content:
683+
for content in reasoning_item.content:
684+
if hasattr(content, "text") and content.text:
685+
contents.append(content.text)
686+
687+
# Only create reasoning message if we have content
688+
if summaries or contents:
689+
reasoning_content = ReasoningContent(
690+
author="agent",
691+
style="static",
692+
summary=summaries,
693+
content=contents if contents else None,
694+
type="reasoning",
685695
)
686696

697+
# Create reasoning content using streaming context (immediate completion)
698+
async with (
699+
self.streaming_service.streaming_task_message_context(
700+
task_id=task_id,
701+
initial_content=reasoning_content,
702+
) as streaming_context
703+
):
704+
# The message has already been persisted, but we still need to send an update
705+
await streaming_context.stream_update(
706+
update=StreamTaskMessageFull(
707+
parent_task_message=streaming_context.task_message,
708+
content=reasoning_content,
709+
type="full",
710+
),
711+
)
712+
687713
elif event.type == "raw_response_event":
688714
if isinstance(event.data, ResponseTextDeltaEvent):
689715
# Handle text delta
690716
item_id = event.data.item_id
717+
logger.debug(f"Text delta for item_id={item_id}, delta_len={len(event.data.delta) if event.data.delta else 0}")
691718

692719
# Check if we already have a streaming context for this item
693720
if item_id not in item_id_to_streaming_context:
@@ -713,7 +740,8 @@ async def run_agent_streamed_auto_send(
713740
await streaming_context.stream_update(
714741
update=StreamTaskMessageDelta(
715742
parent_task_message=streaming_context.task_message,
716-
delta=TextDelta(text_delta=event.data.delta),
743+
delta=TextDelta(text_delta=event.data.delta, type="text"),
744+
type="delta",
717745
),
718746
)
719747

@@ -731,6 +759,8 @@ async def run_agent_streamed_auto_send(
731759
author="agent",
732760
summary=[],
733761
content=[],
762+
type="reasoning",
763+
style="active",
734764
),
735765
)
736766
# Open the streaming context
@@ -750,7 +780,9 @@ async def run_agent_streamed_auto_send(
750780
delta=ReasoningSummaryDelta(
751781
summary_index=summary_index,
752782
summary_delta=event.data.delta,
783+
type="reasoning_summary",
753784
),
785+
type="delta",
754786
),
755787
)
756788

@@ -768,6 +800,8 @@ async def run_agent_streamed_auto_send(
768800
author="agent",
769801
summary=[],
770802
content=[],
803+
type="reasoning",
804+
style="active",
771805
),
772806
)
773807
# Open the streaming context
@@ -787,7 +821,9 @@ async def run_agent_streamed_auto_send(
787821
delta=ReasoningContentDelta(
788822
content_index=content_index,
789823
content_delta=event.data.delta,
824+
type="reasoning_content",
790825
),
826+
type="delta",
791827
),
792828
)
793829

src/agentex/lib/core/services/adk/streaming.py

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
from typing import Literal, cast
2+
from typing import Literal
33

44
from agentex import AsyncAgentex
55
from agentex.lib.core.adapters.streams.port import StreamRepository
@@ -16,6 +16,8 @@
1616
DataDelta,
1717
ToolRequestDelta,
1818
ToolResponseDelta,
19+
ReasoningSummaryDelta,
20+
ReasoningContentDelta,
1921
)
2022
from agentex.lib.utils.logging import make_logger
2123
from agentex.types.data_content import DataContent
@@ -26,6 +28,7 @@
2628
from agentex.types.text_content import TextContent
2729
from agentex.types.tool_request_content import ToolRequestContent
2830
from agentex.types.tool_response_content import ToolResponseContent
31+
from agentex.types.reasoning_content import ReasoningContent
2932

3033
logger = make_logger(__name__)
3134

@@ -37,7 +40,10 @@ def _get_stream_topic(task_id: str) -> str:
3740
class DeltaAccumulator:
3841
def __init__(self):
3942
self._accumulated_deltas: list[TaskMessageDelta] = []
40-
self._delta_type: Literal["text", "data", "tool_request", "tool_response"] | None = None
43+
self._delta_type: Literal["text", "data", "tool_request", "tool_response", "reasoning"] | None = None
44+
# For reasoning, we need to track both summary and content deltas
45+
self._reasoning_summaries: dict[int, str] = {}
46+
self._reasoning_contents: dict[int, str] = {}
4147

4248
def add_delta(self, delta: TaskMessageDelta):
4349
if self._delta_type is None:
@@ -49,15 +55,35 @@ def add_delta(self, delta: TaskMessageDelta):
4955
self._delta_type = "tool_request"
5056
elif delta.type == "tool_response":
5157
self._delta_type = "tool_response"
58+
elif delta.type in ["reasoning_summary", "reasoning_content"]:
59+
self._delta_type = "reasoning"
5260
else:
5361
raise ValueError(f"Unknown delta type: {delta.type}")
5462
else:
55-
if self._delta_type != delta.type:
63+
# For reasoning, we allow both summary and content deltas
64+
if self._delta_type == "reasoning":
65+
if delta.type not in ["reasoning_summary", "reasoning_content"]:
66+
raise ValueError(
67+
f"Expected reasoning delta but got: {delta.type}"
68+
)
69+
elif self._delta_type != delta.type:
5670
raise ValueError(
5771
f"Delta type mismatch: {self._delta_type} != {delta.type}"
5872
)
5973

60-
self._accumulated_deltas.append(delta)
74+
# Handle reasoning deltas specially
75+
if delta.type == "reasoning_summary":
76+
if isinstance(delta, ReasoningSummaryDelta):
77+
if delta.summary_index not in self._reasoning_summaries:
78+
self._reasoning_summaries[delta.summary_index] = ""
79+
self._reasoning_summaries[delta.summary_index] += delta.summary_delta or ""
80+
elif delta.type == "reasoning_content":
81+
if isinstance(delta, ReasoningContentDelta):
82+
if delta.content_index not in self._reasoning_contents:
83+
self._reasoning_contents[delta.content_index] = ""
84+
self._reasoning_contents[delta.content_index] += delta.content_delta or ""
85+
else:
86+
self._accumulated_deltas.append(delta)
6187

6288
def convert_to_content(self) -> TaskMessageContent:
6389
if self._delta_type == "text":
@@ -66,6 +92,7 @@ def convert_to_content(self) -> TaskMessageContent:
6692
text_content_str = "".join(
6793
[delta.text_delta or "" for delta in text_deltas]
6894
)
95+
logger.info(f"Converting text deltas: {len(text_deltas)} deltas, total length: {len(text_content_str)}")
6996
return TextContent(
7097
author="agent",
7198
content=text_content_str,
@@ -108,14 +135,34 @@ def convert_to_content(self) -> TaskMessageContent:
108135
# Type assertion: we know all deltas are ToolResponseDelta when _delta_type is TOOL_RESPONSE
109136
tool_response_deltas = [delta for delta in self._accumulated_deltas if isinstance(delta, ToolResponseDelta)]
110137
tool_response_content_str = "".join(
111-
[delta.tool_response_delta or "" for delta in tool_response_deltas]
138+
[delta.content_delta or "" for delta in tool_response_deltas]
112139
)
113140
return ToolResponseContent(
114141
author="agent",
115142
tool_call_id=tool_response_deltas[0].tool_call_id,
116143
name=tool_response_deltas[0].name,
117144
content=tool_response_content_str,
118145
)
146+
elif self._delta_type == "reasoning":
147+
# Convert accumulated reasoning deltas to ReasoningContent
148+
# Sort by index to maintain order
149+
summary_list = [self._reasoning_summaries[i] for i in sorted(self._reasoning_summaries.keys()) if self._reasoning_summaries[i]]
150+
content_list = [self._reasoning_contents[i] for i in sorted(self._reasoning_contents.keys()) if self._reasoning_contents[i]]
151+
152+
# Only return reasoning content if we have non-empty summaries or content
153+
if summary_list or content_list:
154+
return ReasoningContent(
155+
author="agent",
156+
summary=summary_list,
157+
content=content_list if content_list else None,
158+
type="reasoning",
159+
)
160+
else:
161+
# Return empty text content instead of empty reasoning
162+
return TextContent(
163+
author="agent",
164+
content="",
165+
)
119166
else:
120167
raise ValueError(f"Unknown delta type: {self._delta_type}")
121168

@@ -155,6 +202,7 @@ async def open(self) -> "StreamingTaskMessageContext":
155202
start_event = StreamTaskMessageStart(
156203
parent_task_message=self.task_message,
157204
content=self.initial_content,
205+
type="start",
158206
)
159207
await self._streaming_service.stream_update(start_event)
160208

@@ -169,11 +217,19 @@ async def close(self) -> TaskMessage:
169217
return self.task_message # Already done
170218

171219
# Send the DONE event
172-
done_event = StreamTaskMessageDone(parent_task_message=self.task_message)
220+
done_event = StreamTaskMessageDone(
221+
parent_task_message=self.task_message,
222+
type="done",
223+
)
173224
await self._streaming_service.stream_update(done_event)
174225

175226
# Update the task message with the final content
176-
if self._delta_accumulator._accumulated_deltas:
227+
has_deltas = (
228+
self._delta_accumulator._accumulated_deltas or
229+
self._delta_accumulator._reasoning_summaries or
230+
self._delta_accumulator._reasoning_contents
231+
)
232+
if has_deltas:
177233
self.task_message.content = self._delta_accumulator.convert_to_content()
178234

179235
await self._agentex_client.messages.update(

0 commit comments

Comments
 (0)