Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f5c1458
fix session decorator resource management
dot-agi May 28, 2025
5fa1cb8
fix api consistency issues in `decorators/__init__.py`
dot-agi May 28, 2025
9d46fba
deprecation warning when using session decorator
dot-agi May 28, 2025
811bffa
make `int` as `Required`
dot-agi May 28, 2025
3b9585d
use method to format trace id
dot-agi May 28, 2025
9f4d307
improved error handling in span processor flushing
dot-agi May 28, 2025
4148fe5
prevent resource leak in `LiveSpanProcessor`
dot-agi May 28, 2025
339b3d1
better method to create span
dot-agi May 28, 2025
bf850af
Revert "fix api consistency issues in `decorators/__init__.py`"
dot-agi May 28, 2025
c9a0532
Revert "better method to create span"
dot-agi May 28, 2025
d7f20df
add some tests
dot-agi May 28, 2025
d4fd257
remove `Annotated`
dot-agi May 28, 2025
f2dcc41
bad GitHub copilot
dot-agi May 28, 2025
3cc4bf8
add compatibility for both Python <3.11 and >=3.11
dot-agi May 28, 2025
9881aa5
Merge branch 'main' into fix/stable-sdk-fixes
Dwij1704 May 29, 2025
de37a13
use `logger` instead of `warnings`
dot-agi May 29, 2025
4829179
purge `LiveSpanProcessor` from codebase
dot-agi May 29, 2025
c184be2
add `dev-llm` to make @Dwij1704 happy :)
dot-agi May 29, 2025
9ecfeb9
Revert "add `dev-llm` to make @Dwij1704 happy :)"
dot-agi May 29, 2025
96629d9
Merge branch 'main' into fix/stable-sdk-fixes
dot-agi Jun 2, 2025
58bbb7a
Merge branch 'main' into fix/stable-sdk-fixes
dot-agi Jun 2, 2025
21bf90d
import `functools`
dot-agi Jun 2, 2025
7d90392
Merge branch 'main' into fix/stable-sdk-fixes
dot-agi Jun 2, 2025
dc35730
@bboynton97 use the new tracer variable pls #1030
dot-agi Jun 4, 2025
c7cd7e3
Merge branch 'main' into fix/stable-sdk-fixes
dot-agi Jun 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions agentops/sdk/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,20 @@ def camel_to_snake(text: str) -> str:

text = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", text).lower()


def format_trace_id(trace_id: int) -> str:
"""
Format trace ID consistently as hex string with error handling.

Args:
trace_id: The trace ID integer to format

Returns:
Formatted trace ID as hex string
"""
try:
return f"{trace_id:x}"
except (TypeError, ValueError):
# Handle case where trace_id is not a valid integer
return str(trace_id)
33 changes: 18 additions & 15 deletions agentops/sdk/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from agentops.sdk.types import TracingConfig
from agentops.semconv import ResourceAttributes, SpanKind, SpanAttributes, CoreAttributes
from agentops.helpers.dashboard import log_trace_url
from agentops.sdk.converters import format_trace_id

# No need to create shortcuts since we're using our own ResourceAttributes class now

Expand Down Expand Up @@ -330,16 +331,26 @@ def shutdown(self) -> None:
logger.debug("Tracing core shut down")

def _flush_span_processors(self) -> None:
"""Helper to force flush all span processors."""
if not self._provider or not hasattr(self._provider, "force_flush"):
logger.debug("No provider or provider cannot force_flush.")
"""Helper to force flush all span processors with comprehensive error handling."""
if not self._provider:
logger.debug("No provider available for force_flush.")
return

if not hasattr(self._provider, "force_flush"):
logger.debug("Provider does not support force_flush.")
return

try:
logger.debug("Attempting to force flush span processors...")
self._provider.force_flush() # type: ignore
logger.debug("Provider force_flush completed.")
logger.debug("Provider force_flush completed successfully.")
except AttributeError as e:
logger.warning(f"Provider force_flush method not available: {e}")
except RuntimeError as e:
logger.warning(f"Runtime error during force_flush (provider may be shutting down): {e}")
except Exception as e:
logger.warning(f"Failed to force flush provider's span processors: {e}", exc_info=True)
logger.error(f"Unexpected error during force_flush: {e}", exc_info=True)
# Continue execution - don't let flush failures break the application

def get_tracer(self, name: str = "agentops") -> trace.Tracer:
"""
Expand Down Expand Up @@ -444,11 +455,7 @@ def start_trace(

# Track the active trace
with self._traces_lock:
try:
trace_id = f"{span.get_span_context().trace_id:x}"
except (TypeError, ValueError):
# Handle case where span is mocked or trace_id is not a valid integer
trace_id = str(span.get_span_context().trace_id)
trace_id = format_trace_id(span.get_span_context().trace_id)
self._active_traces[trace_id] = trace_context
logger.debug(f"Added trace {trace_id} to active traces. Total active: {len(self._active_traces)}")

Expand Down Expand Up @@ -496,11 +503,7 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None

span = trace_context.span
token = trace_context.token
try:
trace_id = f"{span.get_span_context().trace_id:x}"
except (TypeError, ValueError):
# Handle case where span is mocked or trace_id is not a valid integer
trace_id = str(span.get_span_context().trace_id)
trace_id = format_trace_id(span.get_span_context().trace_id)

logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {end_state}")

Expand Down
15 changes: 10 additions & 5 deletions agentops/sdk/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
Provides @trace for creating trace-level spans (sessions) and other decorators for nested spans.
"""

import functools
from termcolor import colored

from agentops.logging import logger
Expand All @@ -16,15 +15,21 @@
operation_decorator = create_entity_decorator(SpanKind.OPERATION)
workflow = create_entity_decorator(SpanKind.WORKFLOW)
trace = create_entity_decorator(SpanKind.SESSION)
session = create_entity_decorator(SpanKind.SESSION)
tool = create_entity_decorator(SpanKind.TOOL)
operation = task

# For backward compatibility: @session decorator calls @trace decorator
@functools.wraps(trace)

# For backward compatibility: @session decorator calls @trace decorator with deprecation warning
def session(*args, **kwargs):
"""@deprecated Use @agentops.trace instead. Wraps the @trace decorator for backward compatibility."""
import warnings

warnings.warn(
"@agentops.session decorator is deprecated. Please use @agentops.trace instead.",
DeprecationWarning,
stacklevel=2,
)
logger.info(colored("@agentops.session decorator is deprecated. Please use @agentops.trace instead.", "yellow"))

# If called as @session or @session(...)
if not args or not callable(args[0]): # called with kwargs like @session(name=...)
return trace(*args, **kwargs)
Expand Down
172 changes: 110 additions & 62 deletions agentops/sdk/decorators/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,114 @@
)


def _handle_session_trace_sync(
operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any]
) -> Any:
"""Helper function to handle SESSION trace lifecycle for sync functions with proper cleanup"""
trace_context: Optional[TraceContext] = None
trace_ended = False

try:
# Start trace
trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
if not trace_context:
logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.")
return wrapped_func(*args, **kwargs)

Check warning on line 36 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L35-L36

Added lines #L35 - L36 were not covered by tests

# Record input
try:
_record_entity_input(trace_context.span, args, kwargs)
except Exception as e:
logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")

Check warning on line 42 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L41-L42

Added lines #L41 - L42 were not covered by tests

# Execute function
result = wrapped_func(*args, **kwargs)

# Record output
try:
_record_entity_output(trace_context.span, result)
except Exception as e:
logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")

Check warning on line 51 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L50-L51

Added lines #L50 - L51 were not covered by tests

# End trace successfully
TracingCore.get_instance().end_trace(trace_context, "Success")
trace_ended = True
return result

except Exception:
# End trace with failure if not already ended
if trace_context and not trace_ended:
try:
TracingCore.get_instance().end_trace(trace_context, "Failure")
trace_ended = True
except Exception as cleanup_error:
logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}")

Check warning on line 65 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L64-L65

Added lines #L64 - L65 were not covered by tests
raise

finally:
# Safety net - only end if not already ended and still recording
if trace_context and not trace_ended and trace_context.span.is_recording():
try:
TracingCore.get_instance().end_trace(trace_context, "Unknown")
logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.")
except Exception as cleanup_error:
logger.error(f"Failed to end trace in finally block: {cleanup_error}")

Check warning on line 75 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L71-L75

Added lines #L71 - L75 were not covered by tests


async def _handle_session_trace_async(
operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any]
) -> Any:
"""Helper function to handle SESSION trace lifecycle for async functions with proper cleanup"""
trace_context: Optional[TraceContext] = None
trace_ended = False

try:
# Start trace
trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
if not trace_context:
logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.")
return await wrapped_func(*args, **kwargs)

Check warning on line 90 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L89-L90

Added lines #L89 - L90 were not covered by tests

# Record input
try:
_record_entity_input(trace_context.span, args, kwargs)
except Exception as e:
logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")

Check warning on line 96 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L95-L96

Added lines #L95 - L96 were not covered by tests

# Execute function
result = await wrapped_func(*args, **kwargs)

# Record output
try:
_record_entity_output(trace_context.span, result)
except Exception as e:
logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")

Check warning on line 105 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L104-L105

Added lines #L104 - L105 were not covered by tests

# End trace successfully
TracingCore.get_instance().end_trace(trace_context, "Success")
trace_ended = True
return result

except Exception:

Check warning on line 112 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L112

Added line #L112 was not covered by tests
# End trace with failure if not already ended
if trace_context and not trace_ended:
try:
TracingCore.get_instance().end_trace(trace_context, "Failure")
trace_ended = True
except Exception as cleanup_error:
logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}")
raise

Check warning on line 120 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L114-L120

Added lines #L114 - L120 were not covered by tests

finally:
# Safety net - only end if not already ended and still recording
if trace_context and not trace_ended and trace_context.span.is_recording():
try:
TracingCore.get_instance().end_trace(trace_context, "Unknown")
logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.")
except Exception as cleanup_error:
logger.error(f"Failed to end trace in finally block: {cleanup_error}")

Check warning on line 129 in agentops/sdk/decorators/factory.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/factory.py#L125-L129

Added lines #L125 - L129 were not covered by tests


def create_entity_decorator(entity_kind: str) -> Callable[..., Any]:
"""
Factory that creates decorators for instrumenting functions and classes.
Expand Down Expand Up @@ -96,69 +204,9 @@
)
# Fallthrough to existing generator logic which creates a single span.
elif is_async:

async def _wrapped_session_async() -> Any:
trace_context: Optional[TraceContext] = None
try:
trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
if not trace_context:
logger.error(
f"Failed to start trace for @trace '{operation_name}'. Executing without trace."
)
return await wrapped_func(*args, **kwargs)
try:
_record_entity_input(trace_context.span, args, kwargs)
except Exception as e:
logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")
result = await wrapped_func(*args, **kwargs)
try:
_record_entity_output(trace_context.span, result)
except Exception as e:
logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")
TracingCore.get_instance().end_trace(trace_context, "Success")
return result
except Exception:
if trace_context:
TracingCore.get_instance().end_trace(trace_context, "Failure")
raise
finally:
if trace_context and trace_context.span.is_recording():
logger.warning(
f"Trace for @trace '{operation_name}' not explicitly ended. Ending as 'Unknown'."
)
TracingCore.get_instance().end_trace(trace_context, "Unknown")

return _wrapped_session_async()
return _handle_session_trace_async(operation_name, tags, wrapped_func, args, kwargs)
else: # Sync function for SpanKind.SESSION
trace_context: Optional[TraceContext] = None
try:
trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
if not trace_context:
logger.error(
f"Failed to start trace for @trace '{operation_name}'. Executing without trace."
)
return wrapped_func(*args, **kwargs)
try:
_record_entity_input(trace_context.span, args, kwargs)
except Exception as e:
logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")
result = wrapped_func(*args, **kwargs)
try:
_record_entity_output(trace_context.span, result)
except Exception as e:
logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")
TracingCore.get_instance().end_trace(trace_context, "Success")
return result
except Exception:
if trace_context:
TracingCore.get_instance().end_trace(trace_context, "Failure")
raise
finally:
if trace_context and trace_context.span.is_recording():
logger.warning(
f"Trace for @trace '{operation_name}' not explicitly ended. Ending as 'Unknown'."
)
TracingCore.get_instance().end_trace(trace_context, "Unknown")
return _handle_session_trace_sync(operation_name, tags, wrapped_func, args, kwargs)

# Logic for non-SESSION kinds or generators under @trace (as per fallthrough)
elif is_generator:
Expand Down
24 changes: 21 additions & 3 deletions agentops/sdk/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,27 @@ def on_end(self, span: ReadableSpan) -> None:
self.span_exporter.export((span,))

def shutdown(self) -> None:
self._stop_event.set()
self._export_thread.join()
self.span_exporter.shutdown()
"""Shutdown the processor with proper thread lifecycle management."""
try:
# Signal the export thread to stop
self._stop_event.set()

# Wait for the thread to finish with a timeout to prevent hanging
if self._export_thread.is_alive():
self._export_thread.join(timeout=5.0)

# If thread is still alive after timeout, log a warning
if self._export_thread.is_alive():
logger.warning("Export thread did not shut down within timeout, continuing shutdown")

except Exception as e:
logger.error(f"Error during thread shutdown: {e}")

# Always attempt to shutdown the exporter
try:
self.span_exporter.shutdown()
except Exception as e:
logger.error(f"Error shutting down span exporter: {e}")

def force_flush(self, timeout_millis: int = 30000) -> bool:
return True
Expand Down
8 changes: 4 additions & 4 deletions agentops/sdk/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Annotated, Optional, TypedDict
from typing import Annotated, Optional, Required, TypedDict

from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter
Expand All @@ -16,6 +16,6 @@ class TracingConfig(TypedDict, total=False):
metrics_endpoint: Optional[str]
api_key: Optional[str] # API key for authentication with AgentOps services
project_id: Optional[str] # Project ID to include in resource attributes
max_queue_size: int # Required with a default value
max_wait_time: int # Required with a default value
export_flush_interval: int # Time interval between automatic exports
max_queue_size: Required[int] # Required with a default value
max_wait_time: Required[int] # Required with a default value
export_flush_interval: Required[int] # Time interval between automatic exports
Loading
Loading