Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 84 additions & 11 deletions src/agents/tracing/processor_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,125 @@


class TracingProcessor(abc.ABC):
"""Interface for processing spans."""
"""Interface for processing and monitoring traces and spans in the OpenAI Agents system.

This abstract class defines the interface that all tracing processors must implement.
Processors receive notifications when traces and spans start and end, allowing them
to collect, process, and export tracing data.

Example:
```python
class CustomProcessor(TracingProcessor):
def __init__(self):
self.active_traces = {}
self.active_spans = {}

def on_trace_start(self, trace):
self.active_traces[trace.trace_id] = trace

def on_trace_end(self, trace):
# Process completed trace
del self.active_traces[trace.trace_id]

def on_span_start(self, span):
self.active_spans[span.span_id] = span

def on_span_end(self, span):
# Process completed span
del self.active_spans[span.span_id]

def shutdown(self):
# Clean up resources
self.active_traces.clear()
self.active_spans.clear()

def force_flush(self):
# Force processing of any queued items
pass
```

Notes:
- All methods should be thread-safe
- Methods should not block for long periods
- Handle errors gracefully to prevent disrupting agent execution
"""

@abc.abstractmethod
def on_trace_start(self, trace: "Trace") -> None:
"""Called when a trace is started.
"""Called when a new trace begins execution.

Args:
trace: The trace that started.
trace: The trace that started. Contains workflow name and metadata.

Notes:
- Called synchronously on trace start
- Should return quickly to avoid blocking execution
- Any errors should be caught and handled internally
"""
pass

@abc.abstractmethod
def on_trace_end(self, trace: "Trace") -> None:
"""Called when a trace is finished.
"""Called when a trace completes execution.

Args:
trace: The trace that finished.
trace: The completed trace containing all spans and results.

Notes:
- Called synchronously when trace finishes
- Good time to export/process the complete trace
- Should handle cleanup of any trace-specific resources
"""
pass

@abc.abstractmethod
def on_span_start(self, span: "Span[Any]") -> None:
"""Called when a span is started.
"""Called when a new span begins execution.

Args:
span: The span that started.
span: The span that started. Contains operation details and context.

Notes:
- Called synchronously on span start
- Should return quickly to avoid blocking execution
- Spans are automatically nested under current trace/span
"""
pass

@abc.abstractmethod
def on_span_end(self, span: "Span[Any]") -> None:
"""Called when a span is finished. Should not block or raise exceptions.
"""Called when a span completes execution.

Args:
span: The span that finished.
span: The completed span containing execution results.

Notes:
- Called synchronously when span finishes
- Should not block or raise exceptions
- Good time to export/process the individual span
"""
pass

@abc.abstractmethod
def shutdown(self) -> None:
"""Called when the application stops."""
"""Called when the application stops to clean up resources.

Should perform any necessary cleanup like:
- Flushing queued traces/spans
- Closing connections
- Releasing resources
"""
pass

@abc.abstractmethod
def force_flush(self) -> None:
"""Forces an immediate flush of all queued spans/traces."""
"""Forces immediate processing of any queued traces/spans.

Notes:
- Should process all queued items before returning
- Useful before shutdown or when immediate processing is needed
- May block while processing completes
"""
pass


Expand Down
87 changes: 87 additions & 0 deletions src/agents/tracing/spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,83 @@


class SpanError(TypedDict):
"""Represents an error that occurred during span execution.

Attributes:
message: A human-readable error description
data: Optional dictionary containing additional error context
"""
message: str
data: dict[str, Any] | None


class Span(abc.ABC, Generic[TSpanData]):
"""Base class for representing traceable operations with timing and context.

A span represents a single operation within a trace (e.g., an LLM call, tool execution,
or agent run). Spans track timing, relationships between operations, and operation-specific
data.

Type Args:
TSpanData: The type of span-specific data this span contains.

Example:
```python
# Creating a custom span
with custom_span("database_query", {
"operation": "SELECT",
"table": "users"
}) as span:
results = await db.query("SELECT * FROM users")
span.set_output({"count": len(results)})

# Handling errors in spans
with custom_span("risky_operation") as span:
try:
result = perform_risky_operation()
except Exception as e:
span.set_error({
"message": str(e),
"data": {"operation": "risky_operation"}
})
raise
```

Notes:
- Spans automatically nest under the current trace
- Use context managers for reliable start/finish
- Include relevant data but avoid sensitive information
- Handle errors properly using set_error()
"""

@property
@abc.abstractmethod
def trace_id(self) -> str:
"""The ID of the trace this span belongs to.

Returns:
str: Unique identifier of the parent trace.
"""
pass

@property
@abc.abstractmethod
def span_id(self) -> str:
"""Unique identifier for this span.

Returns:
str: The span's unique ID within its trace.
"""
pass

@property
@abc.abstractmethod
def span_data(self) -> TSpanData:
"""Operation-specific data for this span.

Returns:
TSpanData: Data specific to this type of span (e.g., LLM generation data).
"""
pass

@abc.abstractmethod
Expand Down Expand Up @@ -67,6 +126,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
@property
@abc.abstractmethod
def parent_id(self) -> str | None:
"""ID of the parent span, if any.

Returns:
str | None: The parent span's ID, or None if this is a root span.
"""
pass

@abc.abstractmethod
Expand All @@ -76,6 +140,11 @@ def set_error(self, error: SpanError) -> None:
@property
@abc.abstractmethod
def error(self) -> SpanError | None:
"""Any error that occurred during span execution.

Returns:
SpanError | None: Error details if an error occurred, None otherwise.
"""
pass

@abc.abstractmethod
Expand All @@ -85,15 +154,33 @@ def export(self) -> dict[str, Any] | None:
@property
@abc.abstractmethod
def started_at(self) -> str | None:
"""When the span started execution.

Returns:
str | None: ISO format timestamp of span start, None if not started.
"""
pass

@property
@abc.abstractmethod
def ended_at(self) -> str | None:
"""When the span finished execution.

Returns:
str | None: ISO format timestamp of span end, None if not finished.
"""
pass


class NoOpSpan(Span[TSpanData]):
"""A no-op implementation of Span that doesn't record any data.

Used when tracing is disabled but span operations still need to work.

Args:
span_data: The operation-specific data for this span.
"""

__slots__ = ("_span_data", "_prev_span_token")

def __init__(self, span_data: TSpanData):
Expand Down
Loading