Skip to content

Commit 1500161

Browse files
committed
Add better tracing for sync_provider
1 parent a08a0a5 commit 1500161

File tree

1 file changed

+196
-19
lines changed

1 file changed

+196
-19
lines changed

src/agentex/lib/adk/providers/_modules/sync_provider.py

Lines changed: 196 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,12 @@ async def get_response(
 
         response = await self.original_model.get_response(**kwargs)
 
-        # Set span output
-        if span:
+        # Set span output with structured data
+        if span and response:
+            new_items, final_output = _extract_response_items(response)
             span.output = {
-                "response": str(response) if response else None,
+                "new_items": new_items,
+                "final_output": final_output if final_output else None,
             }
 
         return response
@@ -160,7 +162,9 @@ async def stream_response(
         # Wrap the streaming in a tracing span if tracer is available
         if self.tracer and self.trace_id:
             trace = self.tracer.trace(self.trace_id)
-            async with trace.span(
+
+            # Manually start the span instead of using context manager
+            span = await trace.start_span(
                 parent_id=self.parent_span_id,
                 name="run_agent_streamed",
                 input={
@@ -172,7 +176,9 @@ async def stream_response(
                     "handoffs": [str(h) for h in handoffs] if handoffs else [],
                     "previous_response_id": previous_response_id,
                 },
-            ) as span:
+            )
+
+            try:
                 # Get the stream from the original model
                 stream_kwargs = {
                     "system_instructions": system_instructions,
@@ -193,23 +199,110 @@ async def stream_response(
                 # Get the stream response from the original model and yield each event
                 stream_response = self.original_model.stream_response(**stream_kwargs)
 
-                # Pass through each event from the original stream
-                event_count = 0
-                final_output = None
+                # Pass through each event from the original stream and track items
+                new_items = []
+                final_response_text = ""
+                current_text_item = None
+                tool_call_map = {}  # Map call_id to tool name
+
                 async for event in stream_response:
-                    event_count += 1
-                    # Track the final output if available
-                    if hasattr(event, 'type') and event.type == 'raw_response_event':
-                        if hasattr(event.data, 'output'):
-                            final_output = event.data.output
+                    # Track reasoning, tool calls, and responses from run_item_stream_event
+                    if hasattr(event, 'type') and event.type == 'run_item_stream_event':
+                        if hasattr(event, 'item'):
+                            item = event.item
+
+                            # Handle reasoning items
+                            if item.type == 'reasoning_item':
+                                reasoning_summary = []
+                                if hasattr(item, 'raw_item') and hasattr(item.raw_item, 'summary'):
+                                    for summary_part in item.raw_item.summary:
+                                        if hasattr(summary_part, 'text'):
+                                            reasoning_summary.append({
+                                                "text": summary_part.text,
+                                                "type": "summary_text"
+                                            })
+
+                                new_items.append({
+                                    "id": getattr(item.raw_item, 'id', None),
+                                    "type": "reasoning",
+                                    "status": getattr(item.raw_item, 'status', None),
+                                    "content": None,
+                                    "summary": reasoning_summary if reasoning_summary else None,
+                                })
+
+                            # Handle tool call items
+                            elif item.type == 'tool_call_item':
+                                call_id, tool_name, tool_arguments = _extract_tool_call_info(item.raw_item)
+                                tool_call_map[call_id] = tool_name
+
+                                new_items.append({
+                                    "id": getattr(item.raw_item, 'id', None),
+                                    "name": tool_name,
+                                    "type": "function_call",
+                                    "status": getattr(item.raw_item, 'status', 'completed'),
+                                    "call_id": call_id,
+                                    "arguments": str(tool_arguments) if isinstance(tool_arguments, dict) else tool_arguments,
+                                })
+
+                            # Handle tool output items
+                            elif item.type == 'tool_call_output_item':
+                                call_id, tool_name, content = _extract_tool_response_info(tool_call_map, item.raw_item)
+
+                                new_items.append({
+                                    "type": "function_call_output",
+                                    "output": content,
+                                    "call_id": call_id,
+                                })
+
+                    # Accumulate text deltas to build final response
+                    # Note: OpenAI Agents SDK can emit events in different formats
+                    if hasattr(event, 'type') and event.type == 'response.output_text.delta':
+                        # Direct event type from OpenAI Agents SDK (observed in practice)
+                        if hasattr(event, 'delta'):
+                            final_response_text += event.delta
+
+                    # Handle raw_response_event wrapper (alternative event format, kept for compatibility)
+                    elif hasattr(event, 'type') and event.type == 'raw_response_event':
+                        if hasattr(event, 'data'):
+                            raw_event = event.data
+
+                            # Track when output items are added
+                            if isinstance(raw_event, ResponseOutputItemAddedEvent):
+                                if hasattr(raw_event, 'item') and raw_event.item.type == 'message':
+                                    current_text_item = {
+                                        "id": getattr(raw_event.item, 'id', None),
+                                        "role": getattr(raw_event.item, 'role', 'assistant'),
+                                        "type": "message",
+                                        "status": "in_progress",
+                                        "content": []
+                                    }
+
+                            # Check if this is a text delta event
+                            elif isinstance(raw_event, ResponseTextDeltaEvent):
+                                if hasattr(raw_event, 'delta') and raw_event.delta:
+                                    final_response_text += raw_event.delta
+
+                            # Track when output items are done
+                            elif isinstance(raw_event, ResponseOutputItemDoneEvent):
+                                if current_text_item and final_response_text:
+                                    current_text_item["status"] = "completed"
+                                    current_text_item["content"] = [{
+                                        "text": final_response_text,
+                                        "type": "output_text",
+                                    }]
+                                    new_items.append(current_text_item)
+                                    current_text_item = None
+
                     yield event
 
-                # Set span output
-                if span:
-                    span.output = {
-                        "event_count": event_count,
-                        "final_output": str(final_output) if final_output else None,
-                    }
+                # Set span output with structured data including tool calls and final response
+                span.output = {
+                    "new_items": new_items,
+                    "final_output": final_response_text if final_response_text else None,
+                }
+            finally:
+                # End the span after all events have been yielded
+                await trace.end_span(span)
         else:
             # No tracing, just stream normally
             # Get the stream from the original model
@@ -275,6 +368,90 @@ def get_model(self, model_name: Optional[str] = None) -> Model:
         return wrapped_model
 
 
371+
def _extract_response_items(response: Any) -> tuple[list[dict[str, Any]], str]:
372+
"""
373+
Extract new_items and final_output from a ModelResponse object.
374+
375+
Args:
376+
response: The ModelResponse object to extract from
377+
378+
Returns:
379+
A tuple of (new_items, final_output)
380+
"""
381+
new_items = []
382+
final_output = ""
383+
384+
# Extract final output text first - try multiple sources
385+
if hasattr(response, 'final_output') and response.final_output:
386+
final_output = response.final_output
387+
elif hasattr(response, 'text') and response.text:
388+
final_output = response.text
389+
elif hasattr(response, 'content') and response.content:
390+
final_output = response.content
391+
392+
# Extract items from the response
393+
if hasattr(response, 'new_items') and response.new_items:
394+
for item in response.new_items:
395+
# Handle reasoning items
396+
if hasattr(item, 'type') and item.type == 'reasoning':
397+
reasoning_summary = []
398+
if hasattr(item, 'summary') and item.summary:
399+
for summary_part in item.summary:
400+
if hasattr(summary_part, 'text'):
401+
reasoning_summary.append({
402+
"text": summary_part.text,
403+
"type": "summary_text"
404+
})
405+
406+
new_items.append({
407+
"id": getattr(item, 'id', None),
408+
"type": "reasoning",
409+
"status": getattr(item, 'status', None),
410+
"content": None,
411+
"summary": reasoning_summary if reasoning_summary else None,
412+
})
413+
414+
# Handle tool call items
415+
elif hasattr(item, 'type') and item.type == 'function_call':
416+
new_items.append({
417+
"id": getattr(item, 'id', None),
418+
"name": getattr(item, 'name', None),
419+
"type": "function_call",
420+
"status": getattr(item, 'status', 'completed'),
421+
"call_id": getattr(item, 'call_id', None),
422+
"arguments": getattr(item, 'arguments', None),
423+
})
424+
425+
# Handle tool output items
426+
elif hasattr(item, 'type') and item.type == 'function_call_output':
427+
new_items.append({
428+
"type": "function_call_output",
429+
"output": getattr(item, 'output', None),
430+
"call_id": getattr(item, 'call_id', None),
431+
})
432+
433+
# Handle message items
434+
elif hasattr(item, 'type') and item.type == 'message':
435+
content = []
436+
if hasattr(item, 'content') and item.content:
437+
for content_part in item.content:
438+
if hasattr(content_part, 'text'):
439+
content.append({
440+
"text": content_part.text,
441+
"type": "output_text",
442+
})
443+
444+
new_items.append({
445+
"id": getattr(item, 'id', None),
446+
"role": getattr(item, 'role', 'assistant'),
447+
"type": "message",
448+
"status": getattr(item, 'status', 'completed'),
449+
"content": content,
450+
})
451+
452+
return new_items, final_output
453+
454+
 def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, Any]]:
     """
     Extract call_id, tool_name, and tool_arguments from a tool call item.

0 commit comments

Comments
 (0)