55 changes: 19 additions & 36 deletions temporalio/contrib/openai_agents/_trace_interceptor.py
@@ -371,77 +371,60 @@ async def signal_child_workflow(
self, input: temporalio.worker.SignalChildWorkflowInput
) -> None:
trace = get_trace_provider().get_current_trace()
set_header_from_context(input, temporalio.workflow.payload_converter())
if trace:
with custom_span(
name="temporal:signalChildWorkflow",
data={"workflowId": input.child_workflow_id},
):
set_header_from_context(input, temporalio.workflow.payload_converter())
@cretz (Member), Oct 21, 2025:

I see this code changed here and in a few other places. IIRC, it was intentional that we called `set_header_from_context` inside the `with custom_span` block, so that the header has the current span serialized (and therefore spans that occur inside the workflow will have it as a parent).

Contributor Author:

It was, but I don't think it was actually correct. Since the signal span ends more or less immediately, I think it is misleading to parent the child workflow execution to the signal span. Rather, the child workflow is parented to the parent workflow's span (or whatever user-defined span is active at that point in the workflow).

Contributor Author:

It's essentially the same change I am making for `startActivity`: the activity execution isn't parented to the `startActivity` span, it's parented to the workflow, because the start has really already completed.

@cretz (Member), Oct 21, 2025:

> I think it is misleading to parent the child workflow execution to the signal span

It's not parenting a child workflow execution, it's parenting the child signal handler in this case (though in some cases it does). Not sure I agree here necessarily. At least in OTel, at `TracingWorkflowInboundInterceptor.handle_signal`, we have chosen to model the signal span as "linked" rather than as the parent, but in the absence of a concept of linking, I think the hierarchy should represent the most targeted span out there regardless of span duration.

It doesn't make sense to me that you have a bunch of orphan outbound spans just on a common workflow span and orphan inbound spans just on a common workflow span that don't relate to each other when they actually do relate to each other. It is totally reasonable in tracing contexts to have a starting span be the parent of the thing it starts even if the start was quick.

Contributor Author:

I would argue that execution just isn't a child of the start in the normal sense. If we wanted to make a new span which covered the entire lifetime of the activity and was the parent of both start and execute, we could, though I don't really see the benefit that would provide. Doing so will also get us back in the detach handling game.

@cretz (Member), Oct 21, 2025:

It is a child in the normal sense: a child when you start a workflow (top-level from a client or a child workflow from a workflow), a child when you start an activity (top-level for standalone activities or a regular activity from a workflow), a child when you start a Nexus operation, etc. This is how OTel tracing is done.

I think in this case, the consistency of matching SDK behavior elsewhere is valuable.

Contributor Author:

Okay. Would you agree with the other portion of the change: that even if `startActivity` is the parent of `executeActivity`, the `startActivity` span should terminate when the scheduling is completed?

Member:

In OTel, since we can't have an in-workflow duration, the span is completed even before scheduling has completed. There is no concept of "start completed", so IMO it makes sense for `startActivity` to span the entire run of the activity including all attempts, while `executeActivity` is actually per attempt (you can have 0 or many `executeActivity` spans in a `startActivity`).

So it's hard to say since OpenAI tracing is unique in that it lets us represent durations accurately. May be worth checking what TypeScript does here in OTel since that's the only other SDK I know of that lets us track span durations accurately. IMO, spans for starting activity, child, nexus op, signaling, etc should last their entire duration if they can, even though it is called "start" in the name to match our OTel span naming conventions. But it's not a strong opinion.
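The two span lifetimes being compared in this thread can be illustrated with a small, self-contained sketch. It uses plain `asyncio` rather than Temporal handles or real spans; `toy_activity` and the recorded event strings are hypothetical. One variant ends the span as soon as scheduling returns (as a `with` block around the start call would), the other ends it from a done-callback on the handle.

```python
import asyncio
from typing import List


async def toy_activity(events: List[str]) -> None:
    # Hypothetical stand-in for the activity's whole run.
    events.append("activity finished")


async def span_as_with_block() -> List[str]:
    """The span ends when the scheduling call returns."""
    events: List[str] = []
    events.append("span start")
    task = asyncio.ensure_future(toy_activity(events))  # scheduled, not yet run
    events.append("span end")  # equivalent to leaving the with-block
    await task
    return events


async def span_finished_by_callback() -> List[str]:
    """The span ends only when the handle completes."""
    events: List[str] = []
    events.append("span start")
    task = asyncio.ensure_future(toy_activity(events))
    task.add_done_callback(lambda _: events.append("span end"))
    await task
    await asyncio.sleep(0)  # give the done-callback a chance to run
    return events


with_block_events = asyncio.run(span_as_with_block())
callback_events = asyncio.run(span_finished_by_callback())
print(with_block_events)  # ['span start', 'span end', 'activity finished']
print(callback_events)    # ['span start', 'activity finished', 'span end']
```

In this toy model the first ordering corresponds to a start span that closes once scheduling is done, and the second to a start span that covers the whole run, which is the trade-off the comments above weigh.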

await self.next.signal_child_workflow(input)
else:
set_header_from_context(input, temporalio.workflow.payload_converter())
await self.next.signal_child_workflow(input)
await self.next.signal_child_workflow(input)

async def signal_external_workflow(
self, input: temporalio.worker.SignalExternalWorkflowInput
) -> None:
trace = get_trace_provider().get_current_trace()
set_header_from_context(input, temporalio.workflow.payload_converter())
if trace:
with custom_span(
name="temporal:signalExternalWorkflow",
data={"workflowId": input.workflow_id},
):
set_header_from_context(input, temporalio.workflow.payload_converter())
await self.next.signal_external_workflow(input)
else:
set_header_from_context(input, temporalio.workflow.payload_converter())
await self.next.signal_external_workflow(input)
await self.next.signal_external_workflow(input)

def start_activity(
self, input: temporalio.worker.StartActivityInput
) -> temporalio.workflow.ActivityHandle:
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
set_header_from_context(input, temporalio.workflow.payload_converter())
if trace:
span = custom_span(
with custom_span(
name="temporal:startActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)

set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_activity(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
):
return self.next.start_activity(input)
return self.next.start_activity(input)

async def start_child_workflow(
self, input: temporalio.worker.StartChildWorkflowInput
) -> temporalio.workflow.ChildWorkflowHandle:
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
set_header_from_context(input, temporalio.workflow.payload_converter())
if trace:
span = custom_span(
with custom_span(
name="temporal:startChildWorkflow", data={"workflow": input.workflow}
)
span.start(mark_as_current=True)
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = await self.next.start_child_workflow(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
):
return await self.next.start_child_workflow(input)
return await self.next.start_child_workflow(input)

def start_local_activity(
self, input: temporalio.worker.StartLocalActivityInput
) -> temporalio.workflow.ActivityHandle:
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
set_header_from_context(input, temporalio.workflow.payload_converter())
if trace:
span = custom_span(
with custom_span(
name="temporal:startLocalActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_local_activity(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
):
return self.next.start_local_activity(input)
return self.next.start_local_activity(input)
21 changes: 12 additions & 9 deletions tests/contrib/openai_agents/test_openai_tracing.py
@@ -81,18 +81,22 @@ def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None:
# Initial planner spans - There are only 3 because we don't make an actual model call
paired_span(processor.span_events[0], processor.span_events[5])
assert processor.span_events[0][0].span_data.export().get("name") == "PlannerAgent"
print("Planner span:", processor.span_events[0][0].span_id)

paired_span(processor.span_events[1], processor.span_events[4])
# startActivity happens before executeActivity
paired_span(processor.span_events[1], processor.span_events[2])
assert (
processor.span_events[1][0].span_data.export().get("name")
== "temporal:startActivity"
)
print("StartActivity span:", processor.span_events[1][0].span_id)

paired_span(processor.span_events[2], processor.span_events[3])
paired_span(processor.span_events[3], processor.span_events[4])
assert (
processor.span_events[2][0].span_data.export().get("name")
processor.span_events[3][0].span_data.export().get("name")
== "temporal:executeActivity"
)
print("Execute Span parent:", processor.span_events[3][0].parent_id)

for span, start in processor.span_events[6:-6]:
span_data = span.span_data.export()
@@ -113,28 +117,27 @@ def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None:
len(parents) == 2 and parents[0].span_data.export()["type"] == "agent"
)

# Execute is parented to start
# Execute is parented to the agent as well
if span_data.get("name") == "temporal:executeActivity":
parents = [
s for (s, _) in processor.span_events if s.span_id == span.parent_id
]
assert (
len(parents) == 2
and parents[0].span_data.export()["name"] == "temporal:startActivity"
len(parents) == 2 and parents[0].span_data.export()["type"] == "agent"
)

# Final writer spans - There are only 3 because we don't make an actual model call
paired_span(processor.span_events[-6], processor.span_events[-1])
assert processor.span_events[-6][0].span_data.export().get("name") == "WriterAgent"

paired_span(processor.span_events[-5], processor.span_events[-2])
paired_span(processor.span_events[-5], processor.span_events[-4])
assert (
processor.span_events[-5][0].span_data.export().get("name")
== "temporal:startActivity"
)

paired_span(processor.span_events[-4], processor.span_events[-3])
paired_span(processor.span_events[-3], processor.span_events[-2])
assert (
processor.span_events[-4][0].span_data.export().get("name")
processor.span_events[-3][0].span_data.export().get("name")
== "temporal:executeActivity"
)