-
Notifications
You must be signed in to change notification settings - Fork 137
Open
Labels
enhancement: New feature or request
Description
I'd like to add a custom tracing provider for the OpenAI Agents SDK. Importantly, this tracing provider is supported by the OpenAI Agents SDK itself.
The solution I'd like
I'd like to be able to pass the tracing provider to the OpenAI Agents Plugin when initializing the client, e.g.:
from temporalio.client import Client
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
from custom_tracing_provider import CustomTraceProvider
# Wrap the custom provider in the plugin via the proposed `trace_provider`
# argument.
plugin = OpenAIAgentsPlugin(
    trace_provider=CustomTraceProvider(),
)
# These entries are unpacked as keyword arguments into Client.connect(), so
# each key must match a parameter name exactly; the API key parameter is the
# lowercase `api_key`.
client_config = {
    "namespace": NAMESPACE,
    "plugins": [plugin],
    "api_key": os.environ.get("TEMPORAL_API_KEY"),
}
_client_instance = await Client.connect(ADDRESS, **client_config)
Given that the Temporal tracing provider today is implemented on top of the DefaultTraceProvider class provided by OpenAI, as shown below, perhaps it wouldn't be too hard to create a factory that takes any supported tracing provider and returns a Temporal-supported one.
class TemporalTraceProvider(DefaultTraceProvider):
    """A trace provider that integrates with Temporal workflows.

    Inside a workflow, timestamps are taken from ``workflow.now()`` and IDs
    are built from ``_workflow_uuid()``. Outside a workflow every call is
    delegated to the parent ``DefaultTraceProvider``. The instance can also
    be used as a context manager; leaving the ``with`` block shuts down the
    wrapping processor.
    """

    def __init__(self, auto_close_in_workflows: bool = False):
        """Initialize the TemporalTraceProvider.

        Args:
            auto_close_in_workflows: Passed through unchanged to
                ``_TemporalTracingProcessor``.
        """
        super().__init__()
        # ``cast`` only informs the type checker: whatever provider is
        # currently registered is assumed to expose the private
        # ``_multi_processor`` attribute that is read just below.
        self._original_provider = cast(DefaultTraceProvider, get_trace_provider())
        self._multi_processor = _TemporalTracingProcessor(
            self._original_provider._multi_processor,
            auto_close_in_workflows,
        )

    def time_iso(self) -> str:
        """Return the current time in ISO 8601 format.

        Uses the workflow clock (``workflow.now()``) when called from inside
        a workflow, otherwise the parent implementation.
        """
        if not workflow.in_workflow():
            return super().time_iso()
        return workflow.now().isoformat()

    def gen_trace_id(self) -> str:
        """Generate a new trace ID.

        Returns:
            ``trace_<id>``. Inside a workflow the id comes from
            ``_workflow_uuid()``; if that raises ``ReadOnlyContextError`` a
            random ``uuid4`` hex string is used instead. Outside a workflow
            the parent implementation is used.
        """
        if not workflow.in_workflow():
            return super().gen_trace_id()
        try:
            # Preferred path: ID derived from the workflow's own UUID source.
            return f"trace_{_workflow_uuid()}"
        except ReadOnlyContextError:
            # Fall back to a plain random UUID.
            return f"trace_{uuid.uuid4().hex}"

    def gen_span_id(self) -> str:
        """Generate a span ID.

        Returns:
            ``span_<id>``. Inside a workflow the id comes from
            ``_workflow_uuid()``; if that raises ``ReadOnlyContextError`` the
            first 24 hex characters of a random ``uuid4`` are used instead.
            Outside a workflow the parent implementation is used.
        """
        if not workflow.in_workflow():
            return super().gen_span_id()
        try:
            # Preferred path: ID derived from the workflow's own UUID source.
            return f"span_{_workflow_uuid()}"
        except ReadOnlyContextError:
            # Fall back to a truncated random UUID.
            return f"span_{uuid.uuid4().hex[:24]}"

    def gen_group_id(self) -> str:
        """Generate a group ID.

        Returns:
            ``group_<id>``. Inside a workflow the id comes from
            ``_workflow_uuid()``; if that raises ``ReadOnlyContextError`` the
            first 24 hex characters of a random ``uuid4`` are used instead.
            Outside a workflow the parent implementation is used.
        """
        if not workflow.in_workflow():
            return super().gen_group_id()
        try:
            # Preferred path: ID derived from the workflow's own UUID source.
            return f"group_{_workflow_uuid()}"
        except ReadOnlyContextError:
            # Fall back to a truncated random UUID.
            return f"group_{uuid.uuid4().hex[:24]}"

    def __enter__(self):
        """Enter the context of the Temporal trace provider.

        Returns:
            This provider instance.
        """
        return self

    def __exit__(
        self,
        exc_type: type[BaseException],
        exc_val: BaseException,
        exc_tb: TracebackType,
    ):
        """Exit the context of the Temporal trace provider.

        Shuts down the wrapping processor. Nothing is returned, so an
        exception raised inside the ``with`` block is not suppressed.
        """
        self._multi_processor.shutdown()
Metadata
Assignees
Labels
enhancement: New feature or request