Skip to content

Commit 8c45391

Browse files
committed
Add better tracing for sync_provider
1 parent 7046381 commit 8c45391

File tree

1 file changed

+231
-19
lines changed

1 file changed

+231
-19
lines changed

src/agentex/lib/adk/providers/_modules/sync_provider.py

Lines changed: 231 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,86 @@ async def get_response(
109109

110110
response = await self.original_model.get_response(**kwargs)
111111

112-
# Set span output
113-
if span:
112+
# Set span output with structured data
113+
if span and response:
114+
new_items = []
115+
final_output = None
116+
117+
# Extract final output text from response
118+
if hasattr(response, 'final_output') and response.final_output:
119+
final_output = response.final_output
120+
121+
# Extract items from the response output
122+
if hasattr(response, 'output') and response.output:
123+
output_items = response.output if isinstance(response.output, list) else [response.output]
124+
125+
for item in output_items:
126+
# Handle reasoning items
127+
if hasattr(item, 'type') and item.type == 'reasoning':
128+
reasoning_summary = []
129+
if hasattr(item, 'summary') and item.summary:
130+
for summary_part in item.summary:
131+
if hasattr(summary_part, 'text'):
132+
reasoning_summary.append({
133+
"text": summary_part.text,
134+
"type": "summary_text"
135+
})
136+
137+
new_items.append({
138+
"id": getattr(item, 'id', None),
139+
"type": "reasoning",
140+
"status": getattr(item, 'status', None),
141+
"content": None,
142+
"summary": reasoning_summary if reasoning_summary else None,
143+
})
144+
145+
# Handle tool call items
146+
elif hasattr(item, 'type') and item.type == 'function_call':
147+
new_items.append({
148+
"id": getattr(item, 'id', None),
149+
"name": getattr(item, 'name', None),
150+
"type": "function_call",
151+
"status": getattr(item, 'status', 'completed'),
152+
"call_id": getattr(item, 'call_id', None),
153+
"arguments": getattr(item, 'arguments', None),
154+
})
155+
156+
# Handle tool output items
157+
elif hasattr(item, 'type') and item.type == 'function_call_output':
158+
new_items.append({
159+
"type": "function_call_output",
160+
"output": getattr(item, 'output', None),
161+
"call_id": getattr(item, 'call_id', None),
162+
})
163+
164+
# Handle message items
165+
elif hasattr(item, 'type') and item.type == 'message':
166+
content = []
167+
message_text = ""
168+
if hasattr(item, 'content') and item.content:
169+
for content_part in item.content:
170+
if hasattr(content_part, 'text'):
171+
content.append({
172+
"text": content_part.text,
173+
"type": "output_text",
174+
})
175+
message_text = content_part.text
176+
177+
new_items.append({
178+
"id": getattr(item, 'id', None),
179+
"role": getattr(item, 'role', 'assistant'),
180+
"type": "message",
181+
"status": getattr(item, 'status', 'completed'),
182+
"content": content,
183+
})
184+
185+
# Use message text as final output if we have it
186+
if message_text and not final_output:
187+
final_output = message_text
188+
114189
span.output = {
115-
"response": str(response) if response else None,
190+
"new_items": new_items,
191+
"final_output": final_output,
116192
}
117193

118194
return response
@@ -160,7 +236,9 @@ async def stream_response(
160236
# Wrap the streaming in a tracing span if tracer is available
161237
if self.tracer and self.trace_id:
162238
trace = self.tracer.trace(self.trace_id)
163-
async with trace.span(
239+
240+
# Manually start the span instead of using context manager
241+
span = await trace.start_span(
164242
parent_id=self.parent_span_id,
165243
name="run_agent_streamed",
166244
input={
@@ -172,7 +250,9 @@ async def stream_response(
172250
"handoffs": [str(h) for h in handoffs] if handoffs else [],
173251
"previous_response_id": previous_response_id,
174252
},
175-
) as span:
253+
)
254+
255+
try:
176256
# Get the stream from the original model
177257
stream_kwargs = {
178258
"system_instructions": system_instructions,
@@ -193,23 +273,155 @@ async def stream_response(
193273
# Get the stream response from the original model and yield each event
194274
stream_response = self.original_model.stream_response(**stream_kwargs)
195275

196-
# Pass through each event from the original stream
197-
event_count = 0
198-
final_output = None
276+
# Pass through each event from the original stream and track items
277+
new_items = []
278+
final_response_text = ""
279+
current_text_item = None
280+
tool_call_map = {} # Map call_id to tool name
281+
199282
async for event in stream_response:
200-
event_count += 1
201-
# Track the final output if available
202-
if hasattr(event, 'type') and event.type == 'raw_response_event':
203-
if hasattr(event.data, 'output'):
204-
final_output = event.data.output
283+
event_type = getattr(event, 'type', 'no-type')
284+
285+
# Handle response.output_item.done events which contain completed items
286+
if event_type == 'response.output_item.done':
287+
if hasattr(event, 'item'):
288+
item = event.item
289+
290+
# Handle function call (tool request)
291+
if hasattr(item, 'type') and item.type == 'function_call':
292+
new_items.append({
293+
"id": getattr(item, 'id', None),
294+
"name": getattr(item, 'name', None),
295+
"type": "function_call",
296+
"status": getattr(item, 'status', 'completed'),
297+
"call_id": getattr(item, 'call_id', None),
298+
"arguments": getattr(item, 'arguments', None),
299+
})
300+
tool_call_map[item.call_id] = item.name
301+
302+
# Handle completed message items (final text output)
303+
elif hasattr(item, 'type') and item.type == 'message':
304+
content = []
305+
message_text = ""
306+
if hasattr(item, 'content') and item.content:
307+
for content_part in item.content:
308+
if hasattr(content_part, 'text'):
309+
content.append({
310+
"text": content_part.text,
311+
"type": "output_text",
312+
})
313+
# Use the complete text from the message as final_output
314+
message_text = content_part.text
315+
316+
new_items.append({
317+
"id": getattr(item, 'id', None),
318+
"role": getattr(item, 'role', 'assistant'),
319+
"type": "message",
320+
"status": getattr(item, 'status', 'completed'),
321+
"content": content,
322+
})
323+
324+
# Update final_response_text with the complete message text
325+
if message_text:
326+
final_response_text = message_text
327+
328+
# Track reasoning, tool calls, and responses from run_item_stream_event (kept for compatibility)
329+
if hasattr(event, 'type') and event.type == 'run_item_stream_event':
330+
if hasattr(event, 'item'):
331+
item = event.item
332+
333+
# Handle reasoning items
334+
if item.type == 'reasoning_item':
335+
reasoning_summary = []
336+
if hasattr(item, 'raw_item') and hasattr(item.raw_item, 'summary'):
337+
for summary_part in item.raw_item.summary:
338+
if hasattr(summary_part, 'text'):
339+
reasoning_summary.append({
340+
"text": summary_part.text,
341+
"type": "summary_text"
342+
})
343+
344+
new_items.append({
345+
"id": getattr(item.raw_item, 'id', None),
346+
"type": "reasoning",
347+
"status": getattr(item.raw_item, 'status', None),
348+
"content": None,
349+
"summary": reasoning_summary if reasoning_summary else None,
350+
})
351+
352+
# Handle tool call items
353+
elif item.type == 'tool_call_item':
354+
call_id, tool_name, tool_arguments = _extract_tool_call_info(item.raw_item)
355+
tool_call_map[call_id] = tool_name
356+
357+
new_items.append({
358+
"id": getattr(item.raw_item, 'id', None),
359+
"name": tool_name,
360+
"type": "function_call",
361+
"status": getattr(item.raw_item, 'status', 'completed'),
362+
"call_id": call_id,
363+
"arguments": str(tool_arguments) if isinstance(tool_arguments, dict) else tool_arguments,
364+
})
365+
366+
# Handle tool output items
367+
elif item.type == 'tool_call_output_item':
368+
call_id, tool_name, content = _extract_tool_response_info(tool_call_map, item.raw_item)
369+
370+
new_items.append({
371+
"type": "function_call_output",
372+
"output": content,
373+
"call_id": call_id,
374+
})
375+
376+
# Accumulate text deltas to build final response
377+
# Note: OpenAI Agents SDK can emit events in different formats
378+
if hasattr(event, 'type') and event.type == 'response.output_text.delta':
379+
# Direct event type from OpenAI Agents SDK (observed in practice)
380+
if hasattr(event, 'delta'):
381+
final_response_text += event.delta
382+
383+
# Handle raw_response_event wrapper (alternative event format, kept for compatibility)
384+
elif hasattr(event, 'type') and event.type == 'raw_response_event':
385+
if hasattr(event, 'data'):
386+
raw_event = event.data
387+
388+
# Track when output items are added
389+
if isinstance(raw_event, ResponseOutputItemAddedEvent):
390+
if hasattr(raw_event, 'item') and raw_event.item.type == 'message':
391+
current_text_item = {
392+
"id": getattr(raw_event.item, 'id', None),
393+
"role": getattr(raw_event.item, 'role', 'assistant'),
394+
"type": "message",
395+
"status": "in_progress",
396+
"content": []
397+
}
398+
399+
# Check if this is a text delta event
400+
elif isinstance(raw_event, ResponseTextDeltaEvent):
401+
if hasattr(raw_event, 'delta') and raw_event.delta:
402+
final_response_text += raw_event.delta
403+
404+
# Track when output items are done
405+
elif isinstance(raw_event, ResponseOutputItemDoneEvent):
406+
if current_text_item and final_response_text:
407+
current_text_item["status"] = "completed"
408+
current_text_item["content"] = [{
409+
"text": final_response_text,
410+
"type": "output_text",
411+
}]
412+
new_items.append(current_text_item)
413+
current_text_item = None
414+
205415
yield event
206416

207-
# Set span output
208-
if span:
209-
span.output = {
210-
"event_count": event_count,
211-
"final_output": str(final_output) if final_output else None,
212-
}
417+
# Set span output with structured data including tool calls and final response
418+
span.output = {
419+
"new_items": new_items,
420+
"final_output": final_response_text if final_response_text else None,
421+
}
422+
finally:
423+
# End the span after all events have been yielded
424+
await trace.end_span(span)
213425
else:
214426
# No tracing, just stream normally
215427
# Get the stream from the original model

0 commit comments

Comments
 (0)