From 252254e3d2bd5950487a75b15c9ba97cbbb64f4b Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 16 Jul 2025 10:54:10 -0700 Subject: [PATCH 1/6] Fixing tracing issues. Propagate span parentage, remove startActivity --- .../openai_agents/_temporal_trace_provider.py | 1 + .../openai_agents/_trace_interceptor.py | 43 +++++++++++-------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/temporalio/contrib/openai_agents/_temporal_trace_provider.py b/temporalio/contrib/openai_agents/_temporal_trace_provider.py index 1d9b09866..a00c84a1c 100644 --- a/temporalio/contrib/openai_agents/_temporal_trace_provider.py +++ b/temporalio/contrib/openai_agents/_temporal_trace_provider.py @@ -1,5 +1,6 @@ """Provides support for integration with OpenAI Agents SDK tracing across workflows""" +import traceback import uuid from typing import Any, Optional, Union, cast diff --git a/temporalio/contrib/openai_agents/_trace_interceptor.py b/temporalio/contrib/openai_agents/_trace_interceptor.py index 483e01147..c6f3a0569 100644 --- a/temporalio/contrib/openai_agents/_trace_interceptor.py +++ b/temporalio/contrib/openai_agents/_trace_interceptor.py @@ -5,11 +5,13 @@ from contextlib import contextmanager from typing import Any, Mapping, Protocol, Type -from agents import custom_span, get_current_span, trace +from agents import CustomSpanData, custom_span, get_current_span, trace from agents.tracing import ( get_trace_provider, ) -from agents.tracing.spans import NoOpSpan +from agents.tracing.provider import DefaultTraceProvider +from agents.tracing.scope import Scope +from agents.tracing.spans import NoOpSpan, SpanImpl import temporalio.activity import temporalio.api.common.v1 @@ -69,7 +71,8 @@ def context_from_header( if activity.in_activity() else None ) - if get_trace_provider().get_current_trace() is None: + current_trace = get_trace_provider().get_current_trace() + if current_trace is None: metadata = { "temporal:workflowId": activity.info().workflow_id if activity.in_activity() @@ -79,16 +82,21 @@ def context_from_header( else workflow.info().run_id, "temporal:workflowType": workflow_type, } - with trace( + current_trace = trace( span_info["traceName"], trace_id=span_info["traceId"], metadata=metadata, - ) as t: - with custom_span(name=span_name, parent=t, data=data): - yield - else: - with custom_span(name=span_name, parent=None, data=data): - yield + ) + Scope.set_current_trace(current_trace) + current_span = get_trace_provider().get_current_span() + if current_span is None: + current_span = get_trace_provider().create_span( + span_data=CustomSpanData(name="", data={}), span_id=span_info["spanId"] + ) + Scope.set_current_span(current_span) + + with custom_span(name=span_name, parent=current_span, data=data): + yield class OpenAIAgentsTracingInterceptor( @@ -115,7 +123,7 @@ class OpenAIAgentsTracingInterceptor( worker = Worker(client, task_queue="my-task-queue", interceptors=[interceptor]) """ - def __init__( # type: ignore[reportMissingSuperCall] + def __init__( self, payload_converter: temporalio.converter.PayloadConverter = temporalio.converter.default().payload_converter, ) -> None: @@ -188,7 +196,7 @@ async def start_workflow( **({"temporal:workflowId": input.id} if input.id else {}), } data = {"workflowId": input.id} if input.id else None - span_name = "temporal:startWorkflow" + span_name = f"temporal:startWorkflow" if get_trace_provider().get_current_trace() is None: with trace( span_name + ":" + input.workflow, metadata=metadata, group_id=input.id @@ -207,7 +215,7 @@ async def query_workflow(self, input: temporalio.client.QueryWorkflowInput) -> A **({"temporal:workflowId": input.id} if input.id else {}), } data = {"workflowId": input.id, "query": input.query} - span_name = "temporal:queryWorkflow" + span_name = f"temporal:queryWorkflow" if get_trace_provider().get_current_trace() is None: with trace(span_name, metadata=metadata, group_id=input.id): with custom_span(name=span_name, data=data): @@ -226,7 +234,7 @@ async def signal_workflow( **({"temporal:workflowId": input.id} if input.id else {}), } data = {"workflowId": input.id, "signal": input.signal} - span_name = "temporal:signalWorkflow" + span_name = f"temporal:signalWorkflow" if get_trace_provider().get_current_trace() is None: with trace(span_name, metadata=metadata, group_id=input.id): with custom_span(name=span_name, data=data): @@ -337,11 +345,8 @@ async def signal_external_workflow( def start_activity( self, input: temporalio.worker.StartActivityInput ) -> temporalio.workflow.ActivityHandle: - with custom_span( - name=f"temporal:startActivity:{input.activity}", - ): - set_header_from_context(input, temporalio.workflow.payload_converter()) - return self.next.start_activity(input) + set_header_from_context(input, temporalio.workflow.payload_converter()) + return self.next.start_activity(input) async def start_child_workflow( self, input: temporalio.worker.StartChildWorkflowInput From 40602bbec9becbe9e87894591c31fabdc66d8ec0 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 16 Jul 2025 10:56:22 -0700 Subject: [PATCH 2/6] Remove traceback --- temporalio/contrib/openai_agents/_temporal_trace_provider.py | 1 - 1 file changed, 1 deletion(-) diff --git a/temporalio/contrib/openai_agents/_temporal_trace_provider.py b/temporalio/contrib/openai_agents/_temporal_trace_provider.py index a00c84a1c..1d9b09866 100644 --- a/temporalio/contrib/openai_agents/_temporal_trace_provider.py +++ b/temporalio/contrib/openai_agents/_temporal_trace_provider.py @@ -1,6 +1,5 @@ """Provides support for integration with OpenAI Agents SDK tracing across workflows""" -import traceback import uuid from typing import Any, Optional, Union, cast From 8476649f96db0f2a04b4085435c3f9fd5f950f99 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 17 Jul 2025 08:40:30 -0700 Subject: [PATCH 3/6] Add start traces with correct durations --- .../openai_agents/_trace_interceptor.py | 47 +++++++++++++++---- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/temporalio/contrib/openai_agents/_trace_interceptor.py b/temporalio/contrib/openai_agents/_trace_interceptor.py index c6f3a0569..94b063dd3 100644 --- a/temporalio/contrib/openai_agents/_trace_interceptor.py +++ b/temporalio/contrib/openai_agents/_trace_interceptor.py @@ -3,7 +3,7 @@ from __future__ import annotations from contextlib import contextmanager -from typing import Any, Mapping, Protocol, Type +from typing import Any, Mapping, Protocol, Type, cast from agents import CustomSpanData, custom_span, get_current_span, trace from agents.tracing import ( @@ -67,7 +67,10 @@ def context_from_header( else workflow.info().workflow_type ) data = ( - {"activityId": activity.info().activity_id} + { + "activityId": activity.info().activity_id, + "activity": activity.info().activity_type, + } if activity.in_activity() else None ) @@ -333,29 +336,55 @@ class _ContextPropagationWorkflowOutboundInterceptor( async def signal_child_workflow( self, input: temporalio.worker.SignalChildWorkflowInput ) -> None: - set_header_from_context(input, temporalio.workflow.payload_converter()) - return await self.next.signal_child_workflow(input) + with custom_span( + name=f"temporal:signalChildWorkflow", + data={"workflowId": input.child_workflow_id}, + ): + set_header_from_context(input, temporalio.workflow.payload_converter()) + await self.next.signal_child_workflow(input) async def signal_external_workflow( self, input: temporalio.worker.SignalExternalWorkflowInput ) -> None: - set_header_from_context(input, temporalio.workflow.payload_converter()) - return await self.next.signal_external_workflow(input) + with custom_span( + name=f"temporal:signalExternalWorkflow", + data={"workflowId": input.workflow_id}, + ): + set_header_from_context(input, temporalio.workflow.payload_converter()) + await self.next.signal_external_workflow(input) def start_activity( self, input: temporalio.worker.StartActivityInput ) -> temporalio.workflow.ActivityHandle: + span = custom_span( + name=f"temporal:startActivity", data={"activity": input.activity} + ) + span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) - return self.next.start_activity(input) + handle = self.next.start_activity(input) + handle.add_done_callback(lambda _: span.finish()) + return handle async def start_child_workflow( self, input: temporalio.worker.StartChildWorkflowInput ) -> temporalio.workflow.ChildWorkflowHandle: + span = custom_span( + name=f"temporal:startChildWorkflow", data={"workflow": input.workflow} + ) + span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) - return await self.next.start_child_workflow(input) + handle = await self.next.start_child_workflow(input) + handle.add_done_callback(lambda _: span.finish()) + return handle def start_local_activity( self, input: temporalio.worker.StartLocalActivityInput ) -> temporalio.workflow.ActivityHandle: + span = custom_span( + name=f"temporal:startLocalActivity", data={"activity": input.activity} + ) + span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) - return self.next.start_local_activity(input) + handle = self.next.start_local_activity(input) + handle.add_done_callback(lambda _: span.finish()) + return handle From 5fb122f9b64c70ec9918f6ed4a34cb3d5fdda9ff Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 17 Jul 2025 16:56:28 -0700 Subject: [PATCH 4/6] Add tracing test --- .../openai_agents/_trace_interceptor.py | 16 +- .../openai_agents/test_openai_tracing.py | 164 ++++++++++++++++++ 2 files changed, 172 insertions(+), 8 deletions(-) create mode 100644 tests/contrib/openai_agents/test_openai_tracing.py diff --git a/temporalio/contrib/openai_agents/_trace_interceptor.py b/temporalio/contrib/openai_agents/_trace_interceptor.py index 94b063dd3..b96f2f30d 100644 --- a/temporalio/contrib/openai_agents/_trace_interceptor.py +++ b/temporalio/contrib/openai_agents/_trace_interceptor.py @@ -199,7 +199,7 @@ async def start_workflow( **({"temporal:workflowId": input.id} if input.id else {}), } data = {"workflowId": input.id} if input.id else None - span_name = f"temporal:startWorkflow" + span_name = "temporal:startWorkflow" if get_trace_provider().get_current_trace() is None: with trace( span_name + ":" + input.workflow, metadata=metadata, group_id=input.id @@ -218,7 +218,7 @@ async def query_workflow(self, input: temporalio.client.QueryWorkflowInput) -> A **({"temporal:workflowId": input.id} if input.id else {}), } data = {"workflowId": input.id, "query": input.query} - span_name = f"temporal:queryWorkflow" + span_name = "temporal:queryWorkflow" if get_trace_provider().get_current_trace() is None: with trace(span_name, metadata=metadata, group_id=input.id): with custom_span(name=span_name, data=data): @@ -237,7 +237,7 @@ async def signal_workflow( **({"temporal:workflowId": input.id} if input.id else {}), } data = {"workflowId": input.id, "signal": input.signal} - span_name = f"temporal:signalWorkflow" + span_name = "temporal:signalWorkflow" if get_trace_provider().get_current_trace() is None: with trace(span_name, metadata=metadata, group_id=input.id): with custom_span(name=span_name, data=data): @@ -337,7 +337,7 @@ async def signal_child_workflow( self, input: temporalio.worker.SignalChildWorkflowInput ) -> None: with custom_span( - name=f"temporal:signalChildWorkflow", + name="temporal:signalChildWorkflow", data={"workflowId": input.child_workflow_id}, ): set_header_from_context(input, temporalio.workflow.payload_converter()) @@ -347,7 +347,7 @@ async def signal_external_workflow( self, input: temporalio.worker.SignalExternalWorkflowInput ) -> None: with custom_span( - name=f"temporal:signalExternalWorkflow", + name="temporal:signalExternalWorkflow", data={"workflowId": input.workflow_id}, ): set_header_from_context(input, temporalio.workflow.payload_converter()) @@ -357,7 +357,7 @@ def start_activity( self, input: temporalio.worker.StartActivityInput ) -> temporalio.workflow.ActivityHandle: span = custom_span( - name=f"temporal:startActivity", data={"activity": input.activity} + name="temporal:startActivity", data={"activity": input.activity} ) span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) @@ -369,7 +369,7 @@ async def start_child_workflow( self, input: temporalio.worker.StartChildWorkflowInput ) -> temporalio.workflow.ChildWorkflowHandle: span = custom_span( - name=f"temporal:startChildWorkflow", data={"workflow": input.workflow} + name="temporal:startChildWorkflow", data={"workflow": input.workflow} ) span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) @@ -381,7 +381,7 @@ def start_local_activity( self, input: temporalio.worker.StartLocalActivityInput ) -> temporalio.workflow.ActivityHandle: span = custom_span( - name=f"temporal:startLocalActivity", data={"activity": input.activity} + name="temporal:startLocalActivity", data={"activity": input.activity} ) span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) diff --git a/tests/contrib/openai_agents/test_openai_tracing.py b/tests/contrib/openai_agents/test_openai_tracing.py new file mode 100644 index 000000000..f28e3c2b8 --- /dev/null +++ b/tests/contrib/openai_agents/test_openai_tracing.py @@ -0,0 +1,164 @@ +import datetime +import uuid +from datetime import timedelta +from typing import Any, Optional, cast + +from agents import Span, Trace, TracingProcessor +from agents.tracing import get_trace_provider + +from temporalio.client import Client +from temporalio.contrib.openai_agents import ( + ModelActivity, + OpenAIAgentsTracingInterceptor, + TestModelProvider, + set_open_ai_agent_temporal_overrides, +) +from temporalio.contrib.openai_agents._temporal_trace_provider import ( + TemporalTraceProvider, +) +from temporalio.contrib.pydantic import pydantic_data_converter +from tests.contrib.openai_agents.test_openai import ResearchWorkflow, TestResearchModel +from tests.helpers import new_worker + + +class MemoryTracingProcessor(TracingProcessor): + # True for start events, false for end + trace_events: list[tuple[Trace, bool]] = [] + span_events: list[tuple[Span, bool]] = [] + + def on_trace_start(self, trace: Trace) -> None: + self.trace_events.append((trace, True)) + + def on_trace_end(self, trace: Trace) -> None: + self.trace_events.append((trace, False)) + + def on_span_start(self, span: Span[Any]) -> None: + self.span_events.append((span, True)) + + def on_span_end(self, span: Span[Any]) -> None: + self.span_events.append((span, False)) + + def shutdown(self) -> None: + pass + + def force_flush(self) -> None: + pass + + +async def test_tracing(client: Client): + new_config = client.config() + new_config["data_converter"] = pydantic_data_converter + client = Client(**new_config) + + with set_open_ai_agent_temporal_overrides(): + provider = cast(TemporalTraceProvider, get_trace_provider()) + + processor = MemoryTracingProcessor() + provider.set_processors([processor]) + + model_activity = ModelActivity(TestModelProvider(TestResearchModel())) + async with new_worker( + client, + ResearchWorkflow, + activities=[model_activity.invoke_model_activity], + interceptors=[OpenAIAgentsTracingInterceptor()], + ) as worker: + workflow_handle = await client.start_workflow( + ResearchWorkflow.run, + "Caribbean vacation spots in April, optimizing for surfing, hiking and water sports", + id=f"research-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=120), + ) + result = await workflow_handle.result() + + # There is one closed root trace + assert len(processor.trace_events) == 2 + assert ( + processor.trace_events[0][0].trace_id + == processor.trace_events[1][0].trace_id + ) + assert processor.trace_events[0][1] + assert not processor.trace_events[1][1] + + def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None: + assert a[0].trace_id == b[0].trace_id + assert a[1] + assert not b[1] + + # Initial planner spans - There are only 3 because we don't make an actual model call + paired_span(processor.span_events[0], processor.span_events[5]) + assert ( + processor.span_events[0][0].span_data.export().get("name") == "PlannerAgent" + ) + + paired_span(processor.span_events[1], processor.span_events[4]) + assert ( + processor.span_events[1][0].span_data.export().get("name") + == "temporal:startActivity" + ) + + paired_span(processor.span_events[2], processor.span_events[3]) + assert ( + processor.span_events[2][0].span_data.export().get("name") + == "temporal:executeActivity" + ) + + for span, start in processor.span_events[6:-6]: + span_data = span.span_data.export() + + # All spans should be closed + if start: + assert any( + span.span_id == s.span_id and not s_start + for (s, s_start) in processor.span_events + ) + + def to_time(time: Optional[str]) -> datetime.datetime: + assert time is not None + return datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%S.%f%z") + + # Start activity is always parented to an agent + if span_data.get("name") == "temporal:startActivity": + parents = [ + s for (s, _) in processor.span_events if s.span_id == span.parent_id + ] + assert ( + len(parents) == 2 + and parents[0].span_data.export()["type"] == "agent" + ) + + assert to_time(span.started_at) >= to_time(parents[0].started_at) + assert to_time(span.started_at) <= to_time(parents[1].ended_at) + + # Execute is parented to start + if span_data.get("name") == "temporal:executeActivity": + parents = [ + s for (s, _) in processor.span_events if s.span_id == span.parent_id + ] + assert ( + len(parents) == 2 + and parents[0].span_data.export()["name"] + == "temporal:startActivity" + ) + + assert to_time(span.started_at) >= to_time(parents[0].started_at) + assert to_time(span.started_at) <= to_time(parents[1].ended_at) + + # Final writer spans - There are only 3 because we don't make an actual model call + paired_span(processor.span_events[-6], processor.span_events[-1]) + assert ( + processor.span_events[-6][0].span_data.export().get("name") == "WriterAgent" + ) + + paired_span(processor.span_events[-5], processor.span_events[-2]) + assert ( + processor.span_events[-5][0].span_data.export().get("name") + == "temporal:startActivity" + ) + + paired_span(processor.span_events[-4], processor.span_events[-3]) + assert ( + processor.span_events[-4][0].span_data.export().get("name") + == "temporal:executeActivity" + ) From c93069880a9b48a8b1dbcd82f0401dee8dd03060 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 17 Jul 2025 16:58:09 -0700 Subject: [PATCH 5/6] Remove cast --- tests/contrib/openai_agents/test_openai_tracing.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/contrib/openai_agents/test_openai_tracing.py b/tests/contrib/openai_agents/test_openai_tracing.py index f28e3c2b8..86ae9a93c 100644 --- a/tests/contrib/openai_agents/test_openai_tracing.py +++ b/tests/contrib/openai_agents/test_openai_tracing.py @@ -1,7 +1,7 @@ import datetime import uuid from datetime import timedelta -from typing import Any, Optional, cast +from typing import Any, Optional from agents import Span, Trace, TracingProcessor from agents.tracing import get_trace_provider @@ -13,9 +13,6 @@ TestModelProvider, set_open_ai_agent_temporal_overrides, ) -from temporalio.contrib.openai_agents._temporal_trace_provider import ( - TemporalTraceProvider, -) from temporalio.contrib.pydantic import pydantic_data_converter from tests.contrib.openai_agents.test_openai import ResearchWorkflow, TestResearchModel from tests.helpers import new_worker @@ -51,7 +48,7 @@ async def test_tracing(client: Client): client = Client(**new_config) with set_open_ai_agent_temporal_overrides(): - provider = cast(TemporalTraceProvider, get_trace_provider()) + provider = get_trace_provider() processor = MemoryTracingProcessor() provider.set_processors([processor]) From d2c8784ce4ba415bb07299c3c30e941dc41c4931 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Fri, 18 Jul 2025 08:48:17 -0700 Subject: [PATCH 6/6] Remove flaky test check --- tests/contrib/openai_agents/test_openai_tracing.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/contrib/openai_agents/test_openai_tracing.py b/tests/contrib/openai_agents/test_openai_tracing.py index 86ae9a93c..5a7d03785 100644 --- a/tests/contrib/openai_agents/test_openai_tracing.py +++ b/tests/contrib/openai_agents/test_openai_tracing.py @@ -111,10 +111,6 @@ def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None: for (s, s_start) in processor.span_events ) - def to_time(time: Optional[str]) -> datetime.datetime: - assert time is not None - return datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%S.%f%z") - # Start activity is always parented to an agent if span_data.get("name") == "temporal:startActivity": parents = [ @@ -125,9 +121,6 @@ def to_time(time: Optional[str]) -> datetime.datetime: and parents[0].span_data.export()["type"] == "agent" ) - assert to_time(span.started_at) >= to_time(parents[0].started_at) - assert to_time(span.started_at) <= to_time(parents[1].ended_at) - # Execute is parented to start if span_data.get("name") == "temporal:executeActivity": parents = [ @@ -139,9 +132,6 @@ def to_time(time: Optional[str]) -> datetime.datetime: == "temporal:startActivity" ) - assert to_time(span.started_at) >= to_time(parents[0].started_at) - assert to_time(span.started_at) <= to_time(parents[1].ended_at) - # Final writer spans - There are only 3 because we don't make an actual model call paired_span(processor.span_events[-6], processor.span_events[-1]) assert (