diff --git a/README.md b/README.md index af2832d30..927da0b99 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ pip install agentops ``` -#### Session replays in 2 lines of code +#### Trace replays in 2 lines of code Initialize the AgentOps client and automatically get analytics on all your LLM calls. @@ -97,10 +97,10 @@ agentops.init( < INSERT YOUR API KEY HERE >) ... # End of program -agentops.end_session('Success') +agentops.end_trace('Success') ``` -All your sessions can be viewed on the [AgentOps dashboard](https://app.agentops.ai?ref=gh) +All your traces can be viewed on the [AgentOps dashboard](https://app.agentops.ai?ref=gh)
@@ -117,9 +117,9 @@ All your sessions can be viewed on the [AgentOps dashboard](https://app.agentops
- Session Replays + Trace Replays - Session Replays + Trace Replays
@@ -140,12 +140,12 @@ Add powerful observability to your agents, tools, and functions with as little c Refer to our [documentation](http://docs.agentops.ai) ```python -# Create a session span (root for all other spans) -from agentops.sdk.decorators import session +# Create a trace span (root for all other spans) +from agentops.sdk.decorators import trace -@session +@trace def my_workflow(): - # Your session code here + # Your trace code here return result ``` @@ -183,7 +183,7 @@ def my_workflow(data): ```python # Nest decorators for proper span hierarchy -from agentops.sdk.decorators import session, agent, operation +from agentops.sdk.decorators import trace, agent, operation @agent class MyAgent: @@ -196,8 +196,8 @@ class MyAgent: result = self.nested_operation("test message") return result -@session -def my_session(): +@trace +def my_trace(): agent = MyAgent() return agent.main_operation() ``` diff --git a/agentops/__init__.py b/agentops/__init__.py index 25e28dcf3..45a8cbe2e 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -7,21 +7,22 @@ # Client global instance; one per process runtime _client = Client() + def record(event): """ Legacy function to record an event. This is kept for backward compatibility. - + In the current version, this simply sets the end_timestamp on the event. - + Args: event: The event to record """ from agentops.helpers.time import get_ISO_time - + # TODO: Manual timestamp assignment is a temporary fix; should use proper event lifecycle - if event and hasattr(event, 'end_timestamp'): + if event and hasattr(event, "end_timestamp"): event.end_timestamp = get_ISO_time() - + return event @@ -141,6 +142,7 @@ def configure(**kwargs): _client.configure(**kwargs) + # For backwards compatibility and testing @@ -149,8 +151,7 @@ def get_client() -> Client: return _client - -from agentops.legacy import * # type: ignore +from agentops.legacy import * # type: ignore __all__ = [ "init", diff --git a/agentops/client/api/base.py b/agentops/client/api/base.py index c7654154e..92a162293 100644 --- a/agentops/client/api/base.py +++ b/agentops/client/api/base.py @@ -15,8 +15,7 @@ class TokenFetcher(Protocol): """Protocol for token fetching functions""" - def __call__(self, api_key: str) -> str: - ... + def __call__(self, api_key: str) -> str: ... class BaseApiClient: diff --git a/agentops/client/client.py b/agentops/client/client.py index bce0423ff..1113737ec 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -2,7 +2,7 @@ from agentops.client.api import ApiClient from agentops.config import Config -from agentops.exceptions import AgentOpsClientNotInitializedException, NoApiKeyException, NoSessionException +from agentops.exceptions import AgentOpsClientNotInitializedException, NoApiKeyException, NoTraceException from agentops.instrumentation import instrument_all from agentops.logging import logger from agentops.logging.config import configure_logging, intercept_opentelemetry_logging @@ -58,18 +58,18 @@ def init(self, **kwargs): self.initialized = True - # Start a session if auto_start_session is True - session = None - if self.config.auto_start_session: - from agentops.legacy import start_session + # Start a trace if auto_start_trace is True + trace = None + if self.config.auto_start_trace: + from agentops.legacy import start_trace # Pass default_tags if they exist if self.config.default_tags: - session = start_session(tags=list(self.config.default_tags)) + trace = start_trace(tags=list(self.config.default_tags)) else: - session = start_session() - - return session + trace = start_trace() + + return trace def configure(self, **kwargs): """Update client configuration""" diff --git a/agentops/config.py b/agentops/config.py index 8ee08db22..72202fa60 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -24,9 +24,9 @@ class ConfigDict(TypedDict): max_queue_size: Optional[int] default_tags: Optional[List[str]] instrument_llm_calls: Optional[bool] - auto_start_session: Optional[bool] + auto_start_trace: Optional[bool] auto_init: Optional[bool] - skip_auto_end_session: Optional[bool] + skip_auto_end_trace: Optional[bool] env_data_opt_out: Optional[bool] log_level: Optional[Union[str, int]] fail_safe: Optional[bool] @@ -49,7 +49,7 @@ class Config: default_factory=lambda: get_env_int("AGENTOPS_MAX_WAIT_TIME", 5000), metadata={"description": "Maximum time in milliseconds to wait for API responses"}, ) - + export_flush_interval: int = field( default_factory=lambda: get_env_int("AGENTOPS_EXPORT_FLUSH_INTERVAL", 1000), metadata={"description": "Time interval in milliseconds between automatic exports of telemetry data"}, @@ -70,9 +70,9 @@ class Config: metadata={"description": "Whether to automatically instrument and track LLM API calls"}, ) - auto_start_session: bool = field( - default_factory=lambda: get_env_bool("AGENTOPS_AUTO_START_SESSION", True), - metadata={"description": "Whether to automatically start a session when initializing"}, + auto_start_trace: bool = field( + default_factory=lambda: get_env_bool("AGENTOPS_AUTO_START_TRACE", True), + metadata={"description": "Whether to automatically start a trace when initializing"}, ) auto_init: bool = field( @@ -80,9 +80,9 @@ class Config: metadata={"description": "Whether to automatically initialize the client on import"}, ) - skip_auto_end_session: bool = field( - default_factory=lambda: get_env_bool("AGENTOPS_SKIP_AUTO_END_SESSION", False), - metadata={"description": "Whether to skip automatically ending sessions on program exit"}, + skip_auto_end_trace: bool = field( + default_factory=lambda: get_env_bool("AGENTOPS_SKIP_AUTO_END_TRACE", False), + metadata={"description": "Whether to skip automatically ending traces on program exit"}, ) env_data_opt_out: bool = field( @@ -129,9 +129,9 @@ def configure( max_queue_size: Optional[int] = None, default_tags: Optional[List[str]] = None, instrument_llm_calls: Optional[bool] = None, - auto_start_session: Optional[bool] = None, + auto_start_trace: Optional[bool] = None, auto_init: Optional[bool] = None, - skip_auto_end_session: Optional[bool] = None, + skip_auto_end_trace: Optional[bool] = None, env_data_opt_out: Optional[bool] = None, log_level: Optional[Union[str, int]] = None, fail_safe: Optional[bool] = None, @@ -154,7 +154,7 @@ def configure( if max_wait_time is not None: self.max_wait_time = max_wait_time - + if export_flush_interval is not None: self.export_flush_interval = export_flush_interval @@ -167,14 +167,14 @@ def configure( if instrument_llm_calls is not None: self.instrument_llm_calls = instrument_llm_calls - if auto_start_session is not None: - self.auto_start_session = auto_start_session + if auto_start_trace is not None: + self.auto_start_trace = auto_start_trace if auto_init is not None: self.auto_init = auto_init - if skip_auto_end_session is not None: - self.skip_auto_end_session = skip_auto_end_session + if skip_auto_end_trace is not None: + self.skip_auto_end_trace = skip_auto_end_trace if env_data_opt_out is not None: self.env_data_opt_out = env_data_opt_out @@ -216,9 +216,9 @@ def dict(self): "max_queue_size": self.max_queue_size, "default_tags": self.default_tags, "instrument_llm_calls": self.instrument_llm_calls, - "auto_start_session": self.auto_start_session, + "auto_start_trace": self.auto_start_trace, "auto_init": self.auto_init, - "skip_auto_end_session": self.skip_auto_end_session, + "skip_auto_end_trace": self.skip_auto_end_trace, "env_data_opt_out": self.env_data_opt_out, "log_level": self.log_level, "fail_safe": self.fail_safe, diff --git a/agentops/exceptions.py b/agentops/exceptions.py index 98f4cd6e9..a9bb9e076 100644 --- a/agentops/exceptions.py +++ b/agentops/exceptions.py @@ -1,12 +1,26 @@ from agentops.logging import logger +class MultiTraceException(Exception): + def __init__(self, message): + super().__init__(message) + + +class NoTraceException(Exception): + def __init__(self, message="No trace found"): + super().__init__(message) + + class MultiSessionException(Exception): + """@deprecated Use MultiTraceException instead.""" + def __init__(self, message): super().__init__(message) class NoSessionException(Exception): + """@deprecated Use NoTraceException instead.""" + def __init__(self, message="No session found"): super().__init__(message) diff --git a/agentops/helpers/validation.py b/agentops/helpers/validation.py index 2a0c219cf..78c5d2008 100644 --- a/agentops/helpers/validation.py +++ b/agentops/helpers/validation.py @@ -4,4 +4,5 @@ def is_coroutine_or_generator(fn: Any) -> bool: """Check if a function is asynchronous (coroutine or async generator)""" import inspect + return inspect.iscoroutinefunction(fn) or inspect.isasyncgenfunction(fn) diff --git a/agentops/legacy/__init__.py b/agentops/legacy/__init__.py index ea097dec1..034962086 100644 --- a/agentops/legacy/__init__.py +++ b/agentops/legacy/__init__.py @@ -16,17 +16,17 @@ from agentops.semconv.span_kinds import SpanKind from agentops.exceptions import AgentOpsClientNotInitializedException -_current_session: Optional["Session"] = None +_current_trace: Optional["Trace"] = None -class Session: +class Trace: """ This class provides compatibility with CrewAI >= 0.105.0, which uses an event-based - integration pattern where it calls methods directly on the Session object: - + integration pattern where it calls methods directly on the Trace object: + - create_agent(): Called when a CrewAI agent is created - record(): Called when a CrewAI tool is used - - end_session(): Called when a CrewAI run completes + - end_trace(): Called when a CrewAI run completes """ def __init__(self, span: Any, token: Any): @@ -42,7 +42,7 @@ def __del__(self): def create_agent(self, name: Optional[str] = None, agent_id: Optional[str] = None, **kwargs): """ Method to create an agent for CrewAI >= 0.105.0 compatibility. - + CrewAI >= 0.105.0 calls this with: - name=agent.role - agent_id=str(agent.id) @@ -52,30 +52,40 @@ def create_agent(self, name: Optional[str] = None, agent_id: Optional[str] = Non def record(self, event=None): """ Method to record events for CrewAI >= 0.105.0 compatibility. - + CrewAI >= 0.105.0 calls this with a tool event when a tool is used. """ pass - def end_session(self, **kwargs): + def end_trace(self, **kwargs): """ - Method to end the session for CrewAI >= 0.105.0 compatibility. - + Method to end the trace for CrewAI >= 0.105.0 compatibility. + CrewAI >= 0.105.0 calls this with: - end_state="Success" - end_state_reason="Finished Execution" - + forces a flush to ensure the span is exported immediately. """ _set_span_attributes(self.span, kwargs) self.span.end() _flush_span_processors() + def end_session(self, **kwargs): + """ + @deprecated + Use end_trace instead. + + Method to end the session for CrewAI >= 0.105.0 compatibility. + Maintained for backward compatibility. + """ + return self.end_trace(**kwargs) + def _create_session_span(tags: Union[Dict[str, Any], List[str], None] = None) -> tuple: """ Helper function to create a session span with tags. - + This is an internal function used by start_session() to create the from the SDK to create a span with kind=SpanKind.SESSION. @@ -96,59 +106,60 @@ def _create_session_span(tags: Union[Dict[str, Any], List[str], None] = None) -> return _make_span("session", span_kind=SpanKind.SESSION, attributes=attributes) -def start_session( +def start_trace( tags: Union[Dict[str, Any], List[str], None] = None, -) -> Session: +) -> Trace: """ @deprecated - Start a new AgentOps session manually. + Start a new AgentOps trace manually. + + This function creates and starts a new trace span, which can be used to group + related operations together. The trace will remain active until end_trace + is called either with the Trace object or with kwargs. - This function creates and starts a new session span, which can be used to group - related operations together. The session will remain active until end_session - is called either with the Session object or with kwargs. - Usage patterns: - 1. Standard pattern: session = start_session(); end_session(session) - 2. CrewAI < 0.105.0: start_session(); end_session(end_state="Success", ...) - 3. CrewAI >= 0.105.0: session = start_session(); session.end_session(end_state="Success", ...) - - This function stores the session in a global variable to support the CrewAI - < 0.105.0 pattern where end_session is called without the session object. + 1. Standard pattern: trace = start_trace(); end_trace(trace) + 2. CrewAI < 0.105.0: start_trace(); end_trace(end_state="Success", ...) + 3. CrewAI >= 0.105.0: trace = start_trace(); trace.end_trace(end_state="Success", ...) + + This function stores the trace in a global variable to support the CrewAI + < 0.105.0 pattern where end_trace is called without the trace object. Args: - tags: Optional tags to attach to the session, useful for filtering in the dashboard. + tags: Optional tags to attach to the trace, useful for filtering in the dashboard. Can be a list of strings or a dict of key-value pairs. Returns: - A Session object that should be passed to end_session (except in the - CrewAI < 0.105.0 pattern where end_session is called with kwargs only) + A Trace object that should be passed to end_trace (except in the + CrewAI < 0.105.0 pattern where end_trace is called with kwargs only) Raises: AgentOpsClientNotInitializedException: If the client is not initialized """ - global _current_session - + global _current_trace + if not TracingCore.get_instance().initialized: from agentops import Client + Client().init() - + span, context, token = _create_session_span(tags) - session = Session(span, token) - _current_session = session - return session + trace = Trace(span, token) + _current_trace = trace + return trace def _set_span_attributes(span: Any, attributes: Dict[str, Any]) -> None: """ Helper to set attributes on a span. - + Args: span: The span to set attributes on attributes: The attributes to set as a dictionary """ if not attributes or not hasattr(span, "set_attribute"): return - + for key, value in attributes.items(): span.set_attribute(f"agentops.status.{key}", str(value)) @@ -159,82 +170,106 @@ def _flush_span_processors() -> None: """ try: from opentelemetry.trace import get_tracer_provider + tracer_provider = get_tracer_provider() tracer_provider.force_flush() # type: ignore except Exception as e: logger.warning(f"Failed to force flush span processor: {e}") - -def end_session(session_or_status: Any = None, **kwargs) -> None: + +def end_trace(trace_or_status: Any = None, **kwargs) -> None: """ @deprecated - End a previously started AgentOps session. + End a previously started AgentOps trace. - This function ends the session span and detaches the context token, - completing the session lifecycle. + This function ends the trace span and detaches the context token, + completing the trace lifecycle. This function supports multiple calling patterns for backward compatibility: - 1. With a Session object: Used by most code and CrewAI >= 0.105.0 event system + 1. With a Trace object: Used by most code and CrewAI >= 0.105.0 event system 2. With named parameters only: Used by CrewAI < 0.105.0 direct integration 3. With a string status: Used by some older code Args: - session_or_status: The session object returned by start_session, + trace_or_status: The trace object returned by start_trace, or a string representing the status (for backwards compatibility) - **kwargs: Additional arguments for CrewAI < 0.105.0 compatibility. + **kwargs: Additional arguments for CrewAI < 0.105.0 compatibility. CrewAI < 0.105.0 passes these named arguments: - end_state="Success" - end_state_reason="Finished Execution" - is_auto_end=True - + When called this way, the function will use the most recently - created session via start_session(). + created trace via start_trace(). """ from agentops.sdk.decorators.utility import _finalize_span - + from agentops.sdk.core import TracingCore + if not TracingCore.get_instance().initialized: - logger.debug("Ignoring end_session call - TracingCore not initialized") + logger.debug("Ignoring end_trace call - TracingCore not initialized") return - # In some old implementations, and in crew < 0.10.5 `end_session` will be - # called with a single string as a positional argument like: "Success" + # In some old implementations, and in crew < 0.10.5 `end_trace` will be + # called with a single string as a positional argument like: "Success" - # Handle the CrewAI < 0.105.0 integration pattern where end_session is called + # Handle the CrewAI < 0.105.0 integration pattern where end_trace is called # with only named parameters. In this pattern, CrewAI does not keep a reference - # to the Session object, instead it calls: + # to the Trace object, instead it calls: # - # agentops.end_session( + # agentops.end_trace( # end_state="Success", # end_state_reason="Finished Execution", # is_auto_end=True # ) - if session_or_status is None and kwargs: - global _current_session - - if _current_session is not None: - _set_span_attributes(_current_session.span, kwargs) - _finalize_span(_current_session.span, _current_session.token) + if trace_or_status is None and kwargs: + global _current_trace + + if _current_trace is not None: + _set_span_attributes(_current_trace.span, kwargs) + _finalize_span(_current_trace.span, _current_trace.token) _flush_span_processors() - _current_session = None + _current_trace = None return - - # Handle the standard pattern and CrewAI >= 0.105.0 pattern where a Session object is passed. - # In both cases, we call _finalize_span with the span and token from the Session. - # This is the most direct and precise way to end a specific session. - if hasattr(session_or_status, 'span') and hasattr(session_or_status, 'token'): - _set_span_attributes(session_or_status.span, kwargs) - _finalize_span(session_or_status.span, session_or_status.token) + + # Handle the standard pattern and CrewAI >= 0.105.0 pattern where a Trace object is passed. + # In both cases, we call _finalize_span with the span and token from the Trace. + # This is the most direct and precise way to end a specific trace. + if hasattr(trace_or_status, "span") and hasattr(trace_or_status, "token"): + _set_span_attributes(trace_or_status.span, kwargs) + _finalize_span(trace_or_status.span, trace_or_status.token) _flush_span_processors() +def end_session(session_or_status: Any = None, **kwargs) -> None: + """ + @deprecated + Use end_trace instead. + + End a previously started AgentOps session. + This function is maintained for backward compatibility. + """ + return end_trace(session_or_status, **kwargs) + + +def end_all_traces(): + """ + @deprecated + We don't automatically track more than one trace, so just end the trace + that we are tracking. + """ + end_trace() + + def end_all_sessions(): """ @deprecated - We don't automatically track more than one session, so just end the session - that we are tracking. + Use end_all_traces instead. + + We don't automatically track more than one session, so just end the session + that we are tracking. """ - end_session() + end_all_traces() def ToolEvent(*args, **kwargs) -> None: @@ -249,17 +284,17 @@ def ErrorEvent(*args, **kwargs): """ @deprecated Use tracing instead. - + For backward compatibility with tests, this returns a minimal object with the required attributes. """ from agentops.helpers.time import get_ISO_time - + class LegacyErrorEvent: def __init__(self): self.init_timestamp = get_ISO_time() self.end_timestamp = None - + return LegacyErrorEvent() @@ -267,17 +302,17 @@ def ActionEvent(*args, **kwargs): """ @deprecated Use tracing instead. - + For backward compatibility with tests, this returns a minimal object with the required attributes. """ from agentops.helpers.time import get_ISO_time - + class LegacyActionEvent: def __init__(self): self.init_timestamp = get_ISO_time() self.end_timestamp = None - + return LegacyActionEvent() @@ -294,28 +329,35 @@ def track_agent(*args, **kwargs): @deprecated Decorator for marking agents in legacy projects. """ + def noop(f): return f + return noop def track_tool(*args, **kwargs): """ @deprecated - Decorator for marking tools and legacy projects. + Decorator for marking tools and legacy projects. """ + def noop(f): return f + return noop __all__ = [ - "start_session", - "end_session", - "ToolEvent", - "ErrorEvent", - "ActionEvent", - "track_agent", + "start_trace", + "end_trace", + "end_all_traces", + "start_session", # For backward compatibility + "end_session", # For backward compatibility + "end_all_sessions", # For backward compatibility + "ToolEvent", + "ErrorEvent", + "ActionEvent", + "track_agent", "track_tool", - "end_all_sessions" ] diff --git a/agentops/sdk/__init__.py b/agentops/sdk/__init__.py index 1b0779dd5..f1be1f718 100644 --- a/agentops/sdk/__init__.py +++ b/agentops/sdk/__init__.py @@ -7,8 +7,10 @@ # Import core components from agentops.sdk.core import TracingCore + # Import decorators from agentops.sdk.decorators import agent, operation, session, task, workflow + # from agentops.sdk.traced import TracedObject # Merged into TracedObject from agentops.sdk.types import TracingConfig diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 540089e8c..887e17d35 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -5,10 +5,8 @@ from typing import List, Optional from opentelemetry import metrics, trace -from opentelemetry.exporter.otlp.proto.http.metric_exporter import \ - OTLPMetricExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import \ - OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -66,10 +64,7 @@ def setup_telemetry( trace.set_tracer_provider(provider) # Create exporter with authentication - exporter = OTLPSpanExporter( - endpoint=exporter_endpoint, - headers={"Authorization": f"Bearer {jwt}"} if jwt else {} - ) + exporter = OTLPSpanExporter(endpoint=exporter_endpoint, headers={"Authorization": f"Bearer {jwt}"} if jwt else {}) # Regular processor for normal spans and immediate export processor = BatchSpanProcessor( @@ -82,10 +77,7 @@ def setup_telemetry( # Setup metrics metric_reader = PeriodicExportingMetricReader( - OTLPMetricExporter( - endpoint=metrics_endpoint, - headers={"Authorization": f"Bearer {jwt}"} if jwt else {} - ) + OTLPMetricExporter(endpoint=metrics_endpoint, headers={"Authorization": f"Bearer {jwt}"} if jwt else {}) ) meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) metrics.set_meter_provider(meter_provider) @@ -201,7 +193,7 @@ def shutdown(self) -> None: # Perform a single flush on the SynchronousSpanProcessor (which takes care of all processors' shutdown) if not self._initialized: return - self._provider._active_span_processor.force_flush(self.config['max_wait_time']) # type: ignore + self._provider._active_span_processor.force_flush(self.config["max_wait_time"]) # type: ignore # Shutdown provider if self._provider: diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index a7ffad8a9..706bd4624 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -16,12 +16,6 @@ session = create_entity_decorator(SpanKind.SESSION) operation = task -__all__ = [ - 'agent', - 'task', - 'workflow', - 'session', - 'operation' -] +__all__ = ["agent", "task", "workflow", "session", "operation"] # Create decorators task, workflow, session, agent diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py index b29ade4d6..1852ed314 100644 --- a/agentops/sdk/decorators/factory.py +++ b/agentops/sdk/decorators/factory.py @@ -8,26 +8,33 @@ from agentops.logging import logger from agentops.sdk.core import TracingCore -from .utility import (_create_as_current_span, _finalize_span, _make_span, - _process_async_generator, _process_sync_generator, - _record_entity_input, _record_entity_output) +from .utility import ( + _create_as_current_span, + _finalize_span, + _make_span, + _process_async_generator, + _process_sync_generator, + _record_entity_input, + _record_entity_output, +) def create_entity_decorator(entity_kind: str): """ Factory function that creates decorators for specific entity kinds. - + Args: entity_kind: The type of operation being performed (SpanKind.*) - + Returns: A decorator with optional arguments for name and version """ + def decorator(wrapped=None, *, name=None, version=None): # Handle case where decorator is called with parameters if wrapped is None: return functools.partial(decorator, name=name, version=version) - + # Handle class decoration if inspect.isclass(wrapped): # Create a proxy class that wraps the original class @@ -37,33 +44,33 @@ def __init__(self, *args, **kwargs): operation_name = name or wrapped.__name__ self._agentops_span_context_manager = _create_as_current_span(operation_name, entity_kind, version) self._agentops_active_span = self._agentops_span_context_manager.__enter__() - + try: _record_entity_input(self._agentops_active_span, args, kwargs) except Exception as e: logger.warning(f"Failed to record entity input: {e}") - + # Call the original __init__ super().__init__(*args, **kwargs) - + def __del__(self): # End span when instance is destroyed - if hasattr(self, '_agentops_active_span') and hasattr(self, '_agentops_span_context_manager'): + if hasattr(self, "_agentops_active_span") and hasattr(self, "_agentops_span_context_manager"): try: _record_entity_output(self._agentops_active_span, self) except Exception as e: logger.warning(f"Failed to record entity output: {e}") - + self._agentops_span_context_manager.__exit__(None, None, None) - + # Preserve metadata of the original class WrappedClass.__name__ = wrapped.__name__ WrappedClass.__qualname__ = wrapped.__qualname__ WrappedClass.__module__ = wrapped.__module__ WrappedClass.__doc__ = wrapped.__doc__ - + return WrappedClass - + # Create the actual decorator wrapper function for functions @wrapt.decorator def wrapper(wrapped, instance, args, kwargs): @@ -87,10 +94,10 @@ def wrapper(wrapped, instance, args, kwargs): _record_entity_input(span, args, kwargs) except Exception as e: logger.warning(f"Failed to record entity input: {e}") - + result = wrapped(*args, **kwargs) return _process_sync_generator(span, result) - + # Handle async generator functions elif is_async_generator: # Use the old approach for async generators @@ -99,19 +106,20 @@ def wrapper(wrapped, instance, args, kwargs): _record_entity_input(span, args, kwargs) except Exception as e: logger.warning(f"Failed to record entity input: {e}") - + result = wrapped(*args, **kwargs) return _process_async_generator(span, token, result) - + # Handle async functions elif is_async: + async def _wrapped_async(): with _create_as_current_span(operation_name, entity_kind, version) as span: try: _record_entity_input(span, args, kwargs) except Exception as e: logger.warning(f"Failed to record entity input: {e}") - + try: result = await wrapped(*args, **kwargs) try: @@ -122,9 +130,9 @@ async def _wrapped_async(): except Exception as e: span.record_exception(e) raise - + return _wrapped_async() - + # Handle sync functions else: with _create_as_current_span(operation_name, entity_kind, version) as span: @@ -132,7 +140,7 @@ async def _wrapped_async(): _record_entity_input(span, args, kwargs) except Exception as e: logger.warning(f"Failed to record entity input: {e}") - + try: result = wrapped(*args, **kwargs) try: @@ -145,8 +153,6 @@ async def _wrapped_async(): raise # Return the wrapper for functions, we already returned WrappedClass for classes - return wrapper(wrapped) # type: ignore - - return decorator - + return wrapper(wrapped) # type: ignore + return decorator diff --git a/agentops/sdk/decorators/utility.py b/agentops/sdk/decorators/utility.py index a55c54056..66e304c60 100644 --- a/agentops/sdk/decorators/utility.py +++ b/agentops/sdk/decorators/utility.py @@ -31,6 +31,7 @@ def set_workflow_name(workflow_name: str) -> None: def set_entity_path(entity_path: str) -> None: attach(set_value("entity_path", entity_path)) + # Helper functions for content management @@ -73,17 +74,14 @@ def _get_current_span_info(): "span_id": f"{ctx.span_id:x}" if hasattr(ctx, "span_id") else "None", "trace_id": f"{ctx.trace_id:x}" if hasattr(ctx, "trace_id") else "None", "name": getattr(current_span, "name", "Unknown"), - "is_recording": getattr(current_span, "is_recording", False) + "is_recording": getattr(current_span, "is_recording", False), } return {"name": "No current span"} @contextmanager def _create_as_current_span( - operation_name: str, - span_kind: str, - version: Optional[int] = None, - attributes: Optional[Dict[str, Any]] = None + operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None ) -> Generator[Span, None, None]: """ Create and yield an instrumentation span as the current span using proper context management. @@ -104,7 +102,7 @@ def _create_as_current_span( # Log before we do anything before_span = _get_current_span_info() logger.debug(f"[DEBUG] BEFORE {operation_name}.{span_kind} - Current context: {before_span}") - + # Create span with proper naming convention span_name = f"{operation_name}.{span_kind}" @@ -125,26 +123,25 @@ def _create_as_current_span( # Get current context explicitly to debug it current_context = context_api.get_current() - + # Use OpenTelemetry's context manager to properly handle span lifecycle with tracer.start_as_current_span(span_name, attributes=attributes, context=current_context) as span: # Log after span creation if hasattr(span, "get_span_context"): span_ctx = span.get_span_context() - logger.debug(f"[DEBUG] CREATED {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}") - + logger.debug( + f"[DEBUG] CREATED {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}" + ) + yield span - + # Log after we're done after_span = _get_current_span_info() logger.debug(f"[DEBUG] AFTER {operation_name}.{span_kind} - Returned to context: {after_span}") def _make_span( - operation_name: str, - span_kind: str, - version: Optional[int] = None, - attributes: Optional[Dict[str, Any]] = None + operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None ) -> tuple: """ Create a span without context management for manual span lifecycle control. @@ -167,7 +164,7 @@ def _make_span( # Log before we do anything before_span = _get_current_span_info() logger.debug(f"[DEBUG] BEFORE _make_span {operation_name}.{span_kind} - Current context: {before_span}") - + # Create span with proper naming convention span_name = f"{operation_name}.{span_kind}" @@ -188,18 +185,20 @@ def _make_span( # Get current context explicitly current_context = context_api.get_current() - + # Create the span with explicit context span = tracer.start_span(span_name, context=current_context, attributes=attributes) # Set as current context and get token for later detachment ctx = trace.set_span_in_context(span) token = context_api.attach(ctx) - + # Log after span creation if hasattr(span, "get_span_context"): span_ctx = span.get_span_context() - logger.debug(f"[DEBUG] CREATED _make_span {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}") + logger.debug( + f"[DEBUG] CREATED _make_span {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}" + ) return span, ctx, token @@ -236,15 +235,15 @@ def _finalize_span(span: trace.Span, token: Any) -> None: if hasattr(span, "get_span_context") and hasattr(span.get_span_context(), "span_id"): span_id = f"{span.get_span_context().span_id:x}" logger.debug(f"[DEBUG] ENDING span {getattr(span, 'name', 'unknown')} - span_id: {span_id}") - + span.end() - + # Debug info before detaching current_after_end = _get_current_span_info() logger.debug(f"[DEBUG] AFTER span.end() - Current context: {current_after_end}") - + context_api.detach(token) - + # Debug info after detaching final_context = _get_current_span_info() logger.debug(f"[DEBUG] AFTER detach - Final context: {final_context}") diff --git a/agentops/semconv/span_kinds.py b/agentops/semconv/span_kinds.py index 190afacbe..3315e04b4 100644 --- a/agentops/semconv/span_kinds.py +++ b/agentops/semconv/span_kinds.py @@ -16,7 +16,7 @@ class SpanKind: # Workflow kinds WORKFLOW_STEP = "workflow.step" # Step in a workflow - WORKFLOW = 'workflow' + WORKFLOW = "workflow" SESSION = "session" TASK = "task" OPERATION = "operation" diff --git a/docs/v1/concepts/sessions.mdx b/docs/v1/concepts/sessions.mdx deleted file mode 100644 index ec98cb7f5..000000000 --- a/docs/v1/concepts/sessions.mdx +++ /dev/null @@ -1,188 +0,0 @@ ---- -title: Sessions -description: Detailed breakdown of initializing AgentOps and managing sessions. ---- - -A **Session** encapsulates a singular execution instance of your workflow, bringing together all agents, LLMs, -actions, etc., under one umbrella. Consequently, it is imperative for each event to be associated with a session. -The AgentOps dashboard provides detailed insights at the session level, including costs, token counts, errors, and more. - -**There must be an active session in order to use AgentOps.** - ---- - -## `Session` - -### Properties - -Sessions possess the following attributes: -- **ID**: A unique identifier for the session. -- **Project ID**: Identifies the project associated with the session, determined by the API Key used. -- **Starting Timestamp**: Marks the beginning of the session. -- **Ending Timestamp**: Indicates when the session concludes. -- **End State**: Signifies the success or failure of the session. - -Optionally, sessions may include: -- **End State Reason**: Explains why the session ended, whether due to an error or a user-triggered interrupt (SIGINT). -- **Tags**: Tags allow for the categorization and later retrieval of sessions. -- **Host Environment**: Automatically gathers basic information about the system on which the session ran. -- **Video**: If applicable, an optional video recording of the session. - -### Methods -#### `end_session` -**Params** -- **end_state** (str, enum): Success|Failure|Indeterminate -- **end_state_reason** (optional, str): additional notes on end state - -**Returns** (str): Total cost of session in USD - -#### `record` -**Params** -- **event** ([Event](/v1/concepts/events#event-class)): The Event to record as part of the session - - -#### `add_tags` -**Params** -- **tags** (List[str]): a list of tags to assign to append to the current tags - -#### `set_tags` -**Params** -- **tags** (List[str]): a list of tags to assign to append to set - -_Note: Overrides any current tags_ - -#### `get_analytics` -**Returns** (dict): A dictionary containing various analytics metrics for the session. - - -## Starting a Session -When you call `agentops.init()`, a session is automatically started. -Calling `agentops.init(auto_start_session=False)` will initialize the AgentOps SDK but not start a session. - -To start a session later, call `agentops.start_session()` [(reference)](/v1/usage/sdk-reference/#start-session) - -Both `agentops.init()` and `agentops.start_session()` work as a factory pattern and return a `Session` object. The above methods can all be called on this session object. - -## Ending a Session -If a process ends without any call to agentops, it will show in the dashboard as `Indeterminate`. -To end with a state, call either `agentops.end_session(...)` [(reference)](/v1/usage/sdk-reference/#end-session) if only one session is in use. Otherwise use `session.end_session(...)`. - -## Inherited Sessions -When working with multiple agents running in different processes, it's possible to initialize AgentOps or start a session -with an existing session_id. - -`agentops.init(inherited_session_id=)` -`agentops.start_session(inherited_session_id=)` - -You can retrieve the current `session_id` by assigning the returned value from `init()` or `start_session()`. - - - -```python -import agentops -session = agentops.init() -# pass session.session_id to the other process -``` - -```python -# -- other process -- -session_id = retrieve_session_id() # <-- your function -agentops.init(inherited_session_id=) -``` - - - -Both processes will now contribute data to the same session. - -## Session Data Export -AgentOps provides REST endpoints to export your session data and statistics. These endpoints allow you to retrieve detailed information about your sessions programmatically. - -### Authentication -All data export requests require a single header: -- `X-Agentops-Api-Key`: Your AgentOps API key - -### Available Endpoints - -#### Get Session Statistics -```http -GET /v2/sessions//stats -``` - -Returns statistics for the specified session including: -- Event counts -- Duration -- Costs -- Token usage -- Other session metrics - -#### Export Complete Session Data -```http -GET /v2/sessions//export -``` - -Returns comprehensive session data including: -- Session metadata -- Statistics -- All recorded events: - - Actions - - LLM calls - - Tool usage - - Errors - -### Example Usage -```python -import requests - -# Your AgentOps API key -api_key = "your-api-key" -session_id = "your-session-id" - -headers = { - "X-Agentops-Api-Key": api_key -} - -# Get session stats -stats_url = f"https://api.agentops.ai/v2/sessions/{session_id}/stats" -stats_response = requests.get(stats_url, headers=headers) -stats = stats_response.json() - -# Export complete session data -export_url = f"https://api.agentops.ai/v2/sessions/{session_id}/export" -export_response = requests.get(export_url, headers=headers) -session_data = export_response.json() -``` - -## Session Analytics -You can retrieve the analytics for a session by calling `session.get_analytics()`. - -The example below shows how to record events and retrieve analytics. - - - -```python -import agentops -session = agentops.init() -session.record(ActionEvent("llms")) -session.record(ActionEvent("tools")) -analytics = session.get_analytics() -print(analytics) -session.end_session("Success") -``` - -The output will look like this - - -```bash -{'LLM calls': 0, 'Tool calls': 0, 'Actions': 0, 'Errors': 0, 'Duration': '0.9s', 'Cost': '0.00'} -``` - - - -## The AgentOps SDK Client -_More info for the curious_ - -Under the hood, `agentops.init()` creates a `Client` object with various configuration options. Whenever you start a new session, these configuration options will automatically -be applied. You can also apply different configuration options when you start a new session by passing in a -[Configuration](/v1/usage/sdk-reference/#configuration) object. - - - diff --git a/docs/v1/concepts/traces.mdx b/docs/v1/concepts/traces.mdx new file mode 100644 index 000000000..d1662baf2 --- /dev/null +++ b/docs/v1/concepts/traces.mdx @@ -0,0 +1,188 @@ +--- +title: Traces +description: Detailed breakdown of initializing AgentOps and managing traces. +--- + +A **Trace** encapsulates a singular execution instance of your workflow, bringing together all agents, LLMs, +actions, etc., under one umbrella. Consequently, it is imperative for each event to be associated with a trace. +The AgentOps dashboard provides detailed insights at the trace level, including costs, token counts, errors, and more. + +**There must be an active trace in order to use AgentOps.** + +--- + +## `Trace` + +### Properties + +Traces possess the following attributes: +- **ID**: A unique identifier for the trace. +- **Project ID**: Identifies the project associated with the trace, determined by the API Key used. +- **Starting Timestamp**: Marks the beginning of the trace. +- **Ending Timestamp**: Indicates when the trace concludes. +- **End State**: Signifies the success or failure of the trace. + +Optionally, traces may include: +- **End State Reason**: Explains why the trace ended, whether due to an error or a user-triggered interrupt (SIGINT). +- **Tags**: Tags allow for the categorization and later retrieval of traces. +- **Host Environment**: Automatically gathers basic information about the system on which the trace ran. +- **Video**: If applicable, an optional video recording of the trace. + +### Methods +#### `end_trace` +**Params** +- **end_state** (str, enum): Success|Failure|Indeterminate +- **end_state_reason** (optional, str): additional notes on end state + +**Returns** (str): Total cost of trace in USD + +#### `record` +**Params** +- **event** ([Event](/v1/concepts/events#event-class)): The Event to record as part of the trace + + +#### `add_tags` +**Params** +- **tags** (List[str]): a list of tags to assign to append to the current tags + +#### `set_tags` +**Params** +- **tags** (List[str]): a list of tags to assign to append to set + +_Note: Overrides any current tags_ + +#### `get_analytics` +**Returns** (dict): A dictionary containing various analytics metrics for the session. + + +## Starting a Trace +When you call `agentops.init()`, a trace is automatically started. +Calling `agentops.init(auto_start_trace=False)` will initialize the AgentOps SDK but not start a trace. + +To start a trace later, call `agentops.start_trace()` [(reference)](/v1/usage/sdk-reference/#start-trace) + +Both `agentops.init()` and `agentops.start_trace()` work as a factory pattern and return a `Trace` object. The above methods can all be called on this trace object. + +## Ending a Trace +If a process ends without any call to agentops, it will show in the dashboard as `Indeterminate`. +To end with a state, call either `agentops.end_trace(...)` [(reference)](/v1/usage/sdk-reference/#end-trace) if only one trace is in use. Otherwise use `trace.end_trace(...)`. + +## Inherited Traces +When working with multiple agents running in different processes, it's possible to initialize AgentOps or start a trace +with an existing trace_id. + +`agentops.init(inherited_trace_id=)` +`agentops.start_trace(inherited_trace_id=)` + +You can retrieve the current `trace_id` by assigning the returned value from `init()` or `start_trace()`. + + + +```python +import agentops +trace = agentops.init() +# pass trace.trace_id to the other process +``` + +```python +# -- other process -- +trace_id = retrieve_trace_id() # <-- your function +agentops.init(inherited_trace_id=) +``` + + + +Both processes will now contribute data to the same trace. + +## Trace Data Export +AgentOps provides REST endpoints to export your trace data and statistics. These endpoints allow you to retrieve detailed information about your traces programmatically. + +### Authentication +All data export requests require a single header: +- `X-Agentops-Api-Key`: Your AgentOps API key + +### Available Endpoints + +#### Get Trace Statistics +```http +GET /v2/traces//stats +``` + +Returns statistics for the specified trace including: +- Event counts +- Duration +- Costs +- Token usage +- Other trace metrics + +#### Export Complete Trace Data +```http +GET /v2/traces//export +``` + +Returns comprehensive trace data including: +- Trace metadata +- Statistics +- All recorded events: + - Actions + - LLM calls + - Tool usage + - Errors + +### Example Usage +```python +import requests + +# Your AgentOps API key +api_key = "your-api-key" +trace_id = "your-trace-id" + +headers = { + "X-Agentops-Api-Key": api_key +} + +# Get trace stats +stats_url = f"https://api.agentops.ai/v2/traces/{trace_id}/stats" +stats_response = requests.get(stats_url, headers=headers) +stats = stats_response.json() + +# Export complete trace data +export_url = f"https://api.agentops.ai/v2/traces/{trace_id}/export" +export_response = requests.get(export_url, headers=headers) +trace_data = export_response.json() +``` + +## Trace Analytics +You can retrieve the analytics for a trace by calling `trace.get_analytics()`. + +The example below shows how to record events and retrieve analytics. + + + +```python +import agentops +trace = agentops.init() +trace.record(ActionEvent("llms")) +trace.record(ActionEvent("tools")) +analytics = trace.get_analytics() +print(analytics) +trace.end_trace("Success") +``` + +The output will look like this - + +```bash +{'LLM calls': 0, 'Tool calls': 0, 'Actions': 0, 'Errors': 0, 'Duration': '0.9s', 'Cost': '0.00'} +``` + + + +## The AgentOps SDK Client +_More info for the curious_ + +Under the hood, `agentops.init()` creates a `Client` object with various configuration options. Whenever you start a new trace, these configuration options will automatically +be applied. You can also apply different configuration options when you start a new trace by passing in a +[Configuration](/v1/usage/sdk-reference/#configuration) object. + + + diff --git a/examples/agents-examples/basic/hello_world.py b/examples/agents-examples/basic/hello_world.py index d4f2264c2..5831fd46b 100644 --- a/examples/agents-examples/basic/hello_world.py +++ b/examples/agents-examples/basic/hello_world.py @@ -9,6 +9,7 @@ agentops.init() + async def main(): agent = Agent( name="Assistant", diff --git a/examples/crewai-basic.py b/examples/crewai-basic.py index 56532b3c3..667d517e8 100644 --- a/examples/crewai-basic.py +++ b/examples/crewai-basic.py @@ -1,4 +1,5 @@ from dotenv import load_dotenv + load_dotenv() import agentops diff --git a/examples/opentelemetry/token_importance.py b/examples/opentelemetry/token_importance.py index fd3c71159..654e5aa64 100644 --- a/examples/opentelemetry/token_importance.py +++ b/examples/opentelemetry/token_importance.py @@ -7,18 +7,20 @@ import json from typing import Dict, Any, List, Optional, Sequence + # Create a no-op exporter to prevent spans from being printed class NoopExporter(SpanExporter): """A span exporter that doesn't export spans anywhere.""" - + def export(self, spans: Sequence) -> None: """Do nothing with the spans.""" pass - + def shutdown(self) -> None: """Shutdown the exporter.""" pass + # Set up basic tracing provider = TracerProvider() # Use the NoopExporter instead of ConsoleSpanExporter @@ -32,16 +34,19 @@ def shutdown(self) -> None: # ======== Visualization Helpers ======== + def print_header(title): """Print a formatted header""" print("\n" + "=" * 80) print(f" {title}") print("=" * 80) + def print_step(step_num, description): """Print a step in the process""" print(f"\n[Step {step_num}] {description}") + def print_span_tree(spans, indent=0): """Print a visual representation of the span tree""" for i, span in enumerate(spans): @@ -49,12 +54,13 @@ def print_span_tree(spans, indent=0): prefix = "└── " if is_last else "├── " print("│ " * indent + prefix + span) + def print_context_state(active_span_name, context_stack=None, baggage_items=None): """Print the current context state with visualization""" print("\n Current Context State:") print(" --------------------") print(f" Active span: {active_span_name}") - + if context_stack: print("\n Context Stack (top to bottom):") for i, span in enumerate(context_stack): @@ -63,26 +69,27 @@ def print_context_state(active_span_name, context_stack=None, baggage_items=None else: print(f" │ {span}") print(" └─────────────") - + if baggage_items: print("\n Baggage Items:") print(" -------------") for key, value in baggage_items.items(): print(f" 🔷 {key}: {value}") + def print_span_details(span, title="Span Details"): """Print detailed information about a span""" if not hasattr(span, "get_span_context"): print(" No span details available") return - + ctx = span.get_span_context() print(f"\n {title}:") print(" " + "-" * len(title)) print(f" Name: {getattr(span, 'name', 'Unknown')}") print(f" Trace ID: {ctx.trace_id:x}") print(f" Span ID: {ctx.span_id:x}") - + # Try to get attributes if possible attributes = getattr(span, "_attributes", {}) if attributes: @@ -90,11 +97,13 @@ def print_span_details(span, title="Span Details"): for key, value in attributes.items(): print(f" 📎 {key}: {str(value)}") + def get_current_span_name(): """Get the name of the current span or 'None' if no span is active""" current = trace.get_current_span() return getattr(current, "name", "None") + def get_current_baggage() -> Dict[str, str]: """Get all baggage items in the current context""" items = {} @@ -105,80 +114,82 @@ def get_current_baggage() -> Dict[str, str]: items[key] = value return items + # ======== Simulated Application Functions ======== + def simulate_database_query(query: str) -> Dict[str, Any]: """Simulate a database query with proper context propagation""" with tracer.start_as_current_span("database.query") as span: span.set_attribute("db.statement", query) span.set_attribute("db.system", "postgresql") - + # Simulate query execution time time.sleep(0.01) - + # Add current baggage to demonstrate propagation user_id = baggage.get_baggage("user.id") if user_id: span.set_attribute("user.id", str(user_id)) - + # Return simulated data return {"id": 1234, "name": "Sample Data", "status": "active"} + def call_external_api(endpoint: str) -> Dict[str, Any]: """Simulate an external API call with a different tracer""" with llm_tracer.start_as_current_span("http.request") as span: span.set_attribute("http.url", f"https://api.example.com/{endpoint}") span.set_attribute("http.method", "GET") - + # Simulate API call latency time.sleep(0.02) - + # Add baggage to simulate cross-service propagation tenant_id = baggage.get_baggage("tenant.id") if tenant_id: span.set_attribute("tenant.id", str(tenant_id)) - + # Sometimes operations fail if endpoint == "error": span.set_status(Status(StatusCode.ERROR)) span.set_attribute("error.message", "API returned 500 status code") return {"error": "Internal Server Error"} - + return {"status": "success", "data": {"key": "value"}} + def process_user_request(user_id: str, action: str) -> Dict[str, Any]: """Process a user request with nested spans and context propagation""" # Set baggage for the entire operation ctx = baggage.set_baggage("user.id", user_id) ctx = baggage.set_baggage("tenant.id", "tenant-1234", context=ctx) ctx = baggage.set_baggage("request.id", f"req-{int(time.time())}", context=ctx) - + # Attach the context with baggage token = context.attach(ctx) - + try: with tracer.start_as_current_span("process_request") as span: span.set_attribute("user.id", user_id) span.set_attribute("request.action", action) - + # Query the database (creates a child span) db_result = simulate_database_query(f"SELECT * FROM users WHERE id = '{user_id}'") - + # Call an external API (creates a child span with a different tracer) api_result = call_external_api("users/profile") - + # Combine results - return { - "user": db_result, - "profile": api_result, - "processed_at": time.time() - } + return {"user": db_result, "profile": api_result, "processed_at": time.time()} finally: # Always detach the context to clean up context.detach(token) + # ======== Scenarios ======== + def run_basic_scenarios(): """Run the original basic scenarios to demonstrate token importance""" # Scenario 1: Proper token management @@ -191,26 +202,26 @@ def run_basic_scenarios(): parent_name = get_current_span_name() print_context_state(parent_name, ["parent"]) print_span_tree(["parent"]) - + print_step(2, "Creating child span and attaching to context") # Manually create a child span and save the token child = tracer.start_span("child") ctx = trace.set_span_in_context(child) token = context.attach(ctx) - + child_name = get_current_span_name() print_context_state(child_name, ["child", "parent"]) print_span_tree(["parent", "child"]) - + print_step(3, "Ending child span AND detaching token (proper cleanup)") # End the child span and detach the token child.end() context.detach(token) - + restored_name = get_current_span_name() print_context_state(restored_name, ["parent"]) print_span_tree(["parent"]) - + print("\n✅ Result: Context properly restored to parent after child span ended") # Scenario 2: Missing token detachment @@ -223,26 +234,26 @@ def run_basic_scenarios(): parent_name = get_current_span_name() print_context_state(parent_name, ["parent2"]) print_span_tree(["parent2"]) - + print_step(2, "Creating child2 span and attaching to context") # Manually create a child span but don't save the token child = tracer.start_span("child2") ctx = trace.set_span_in_context(child) token = context.attach(ctx) # Token saved but not used later - + child_name = get_current_span_name() print_context_state(child_name, ["child2", "parent2"]) print_span_tree(["parent2", "child2"]) - + print_step(3, "Ending child2 span WITHOUT detaching token (improper cleanup)") # End the child span but don't detach the token child.end() # No context.detach(token) call! - + leaked_name = get_current_span_name() print_context_state(leaked_name, ["child2 (ended but context still active)", "parent2"]) print_span_tree(["parent2", "child2 (ended)"]) - + print("\n⚠️ Result: Context LEAK! Still showing child2 as current context even though span ended") print(" Any new spans created here would incorrectly use child2 as parent instead of parent2") @@ -256,44 +267,44 @@ def run_basic_scenarios(): outer_name = get_current_span_name() print_context_state(outer_name, ["outer"]) print_span_tree(["outer"]) - + print_step(2, "Creating middle1 span and attaching to context") # First middle span middle1 = tracer.start_span("middle1") ctx1 = trace.set_span_in_context(middle1) token1 = context.attach(ctx1) - + middle1_name = get_current_span_name() print_context_state(middle1_name, ["middle1", "outer"]) print_span_tree(["outer", "middle1"]) - + print_step(3, "Creating middle2 span and attaching to context") # Second middle span middle2 = tracer.start_span("middle2") ctx2 = trace.set_span_in_context(middle2) token2 = context.attach(ctx2) - + middle2_name = get_current_span_name() print_context_state(middle2_name, ["middle2", "middle1", "outer"]) print_span_tree(["outer", "middle1", "middle2"]) - + print_step(4, "Ending middle2 span and detaching token2") # End spans in reverse order with proper token management middle2.end() context.detach(token2) - + restored_middle1_name = get_current_span_name() print_context_state(restored_middle1_name, ["middle1", "outer"]) print_span_tree(["outer", "middle1", "middle2 (ended)"]) - + print_step(5, "Ending middle1 span and detaching token1") middle1.end() context.detach(token1) - + restored_outer_name = get_current_span_name() print_context_state(restored_outer_name, ["outer"]) print_span_tree(["outer", "middle1 (ended)", "middle2 (ended)"]) - + print("\n✅ Result: Context properly restored through multiple levels") # Scenario 4: What happens if we create new spans after a context leak @@ -306,62 +317,63 @@ def run_basic_scenarios(): root_name = get_current_span_name() print_context_state(root_name, ["root"]) print_span_tree(["root"]) - + print_step(2, "Creating leaky_child span and attaching to context") # Create a child span but don't save the token leaky = tracer.start_span("leaky_child") ctx = trace.set_span_in_context(leaky) context.attach(ctx) # Token not saved - + leaky_name = get_current_span_name() print_context_state(leaky_name, ["leaky_child", "root"]) print_span_tree(["root", "leaky_child"]) - + print_step(3, "Ending leaky_child span WITHOUT detaching token") # End the child span but don't detach the token leaky.end() # No context.detach() call! - + print_step(4, "Creating new_child span after context leak") # This span will be created with leaky_child as parent, not root! with tracer.start_as_current_span("new_child") as new_child: new_child_name = get_current_span_name() print_context_state(new_child_name, ["new_child", "leaky_child (ended but context active)", "root"]) print_span_tree(["root", "leaky_child (ended)", "new_child"]) - + print("\n⚠️ Problem: new_child is incorrectly parented to leaky_child instead of root") print(" This creates an incorrect trace hierarchy that doesn't match execution flow") + def run_advanced_scenarios(): """Run the new advanced scenarios demonstrating more complex context patterns""" - + # Scenario 5: Cross-function context propagation print_header("Scenario 5: Cross-Function Context Propagation") print("This scenario demonstrates how context and baggage propagate across function boundaries.") print("We'll create a request processing flow with multiple nested functions and spans.") - + print_step(1, "Starting user request processing with baggage") # Process a simulated request that will create nested spans across functions result = process_user_request("user-5678", "update_profile") - + print_step(2, "Request processing completed") print("\n Request processing result:") print(f" User data: {result['user']['name']}") print(f" Profile status: {result['profile']['status']}") - + print("\n✅ Result: Context and baggage successfully propagated across multiple function calls") print(" Each function created properly nested spans that maintained the baggage context") - + # Scenario 6: Using different tracers with the same context print_header("Scenario 6: Multiple Tracers with Shared Context") print("This scenario demonstrates using multiple tracers while maintaining a consistent context.") - + print_step(1, "Creating context with baggage") # Set up a context with baggage ctx = baggage.set_baggage("environment", "production") ctx = baggage.set_baggage("tenant.id", "tenant-9876", context=ctx) token = context.attach(ctx) - + try: print_step(2, "Starting span with main tracer") with tracer.start_as_current_span("main_operation") as main_span: @@ -369,31 +381,33 @@ def run_advanced_scenarios(): baggage_items = get_current_baggage() print_context_state(main_span_name, ["main_operation"], baggage_items) print_span_details(main_span) - + print_step(3, "Creating span with LLM tracer (different tracer)") with llm_tracer.start_as_current_span("llm_inference") as llm_span: llm_span.set_attribute("model", "gpt-4") llm_span.set_attribute("tokens", 150) - + llm_span_name = get_current_span_name() print_context_state(llm_span_name, ["llm_inference", "main_operation"], baggage_items) print_span_details(llm_span, "LLM Span Details") - + print_step(4, "Back to main tracer") # Create another span with the first tracer with tracer.start_as_current_span("post_processing") as post_span: post_span_name = get_current_span_name() - print_context_state(post_span_name, ["post_processing", "llm_inference", "main_operation"], baggage_items) + print_context_state( + post_span_name, ["post_processing", "llm_inference", "main_operation"], baggage_items + ) finally: context.detach(token) - + print("\n✅ Result: Multiple tracers successfully shared the same context") print(" Baggage was accessible to spans from both tracers") - + # Scenario 7: Handling errors in spans print_header("Scenario 7: Error Handling in Spans") print("This scenario demonstrates proper error handling with spans.") - + print_step(1, "Starting operation that will encounter an error") with tracer.start_as_current_span("error_prone_operation") as error_span: try: @@ -405,30 +419,33 @@ def run_advanced_scenarios(): error_span.record_exception(e) error_span.set_status(Status(StatusCode.ERROR)) print(f" Recorded exception: {str(e)}") - + print("\n✅ Result: Properly recorded error in span without breaking execution flow") print(" Errors should be visible in the trace visualization") - + # Scenario 8: Manual context saving and restoring print_header("Scenario 8: Manual Context Saving and Restoring") print("This scenario demonstrates saving a context and restoring it later.") - + print_step(1, "Creating initial context") with tracer.start_as_current_span("initial_operation") as initial_span: # Set some baggage ctx = baggage.set_baggage("checkpoint", "saved_point") - + # Save the current context for later use saved_context = context.get_current() print_context_state("initial_operation", ["initial_operation"], {"checkpoint": "saved_point"}) - + print_step(2, "Creating a different context") with tracer.start_as_current_span("intermediate_operation") as intermediate_span: # Change the baggage ctx = baggage.set_baggage("checkpoint", "intermediate_point") - print_context_state("intermediate_operation", ["intermediate_operation", "initial_operation"], - {"checkpoint": "intermediate_point"}) - + print_context_state( + "intermediate_operation", + ["intermediate_operation", "initial_operation"], + {"checkpoint": "intermediate_point"}, + ) + print_step(3, "Restoring saved context") # Restore the saved context token = context.attach(saved_context) @@ -437,15 +454,19 @@ def run_advanced_scenarios(): current_name = getattr(current_span, "name", "Unknown") checkpoint = baggage.get_baggage("checkpoint") print_context_state(current_name, ["initial_operation"], {"checkpoint": checkpoint}) - + print("\n✅ Result: Successfully restored previous context") finally: context.detach(token) - + print_step(4, "Back to intermediate context") - print_context_state("intermediate_operation", ["intermediate_operation", "initial_operation"], - {"checkpoint": "intermediate_point"}) - + print_context_state( + "intermediate_operation", + ["intermediate_operation", "initial_operation"], + {"checkpoint": "intermediate_point"}, + ) + + print_header("OpenTelemetry Context Management Demonstration") print("This example illustrates the importance of proper context management in OpenTelemetry.") print("It covers basic and advanced scenarios showing how context affects span relationships.") @@ -457,7 +478,7 @@ def run_advanced_scenarios(): while True: choice = input("\nEnter your choice (1-4): ") - + if choice == "1": run_basic_scenarios() elif choice == "2": diff --git a/examples/opentelemetry/token_importance_2.py b/examples/opentelemetry/token_importance_2.py index aea2fe1f4..024c4a30c 100644 --- a/examples/opentelemetry/token_importance_2.py +++ b/examples/opentelemetry/token_importance_2.py @@ -9,9 +9,11 @@ trace.set_tracer_provider(provider) tracer = trace.get_tracer("demo") + def get_current_span_name(): return getattr(trace.get_current_span(), "name", "None") + print("\n=== Scenario: Multiple contexts with the same span ===") print("This demonstrates why coupling spans and tokens can be problematic") diff --git a/examples/sdk/basic.py b/examples/sdk/basic.py index c8dcb879b..6c008d4f9 100644 --- a/examples/sdk/basic.py +++ b/examples/sdk/basic.py @@ -6,7 +6,6 @@ @agent class Agent: - @operation def nested_operation(self): print("Hello, world!") diff --git a/tests/benchmark/benchmark_init.py b/tests/benchmark/benchmark_init.py index 674062a0f..93432d2b5 100644 --- a/tests/benchmark/benchmark_init.py +++ b/tests/benchmark/benchmark_init.py @@ -7,10 +7,11 @@ Benchmark script for measuring TracingCore initialization time. """ + def run_benchmark(): """ Run a benchmark of TracingCore initialization. - + Returns: Dictionary with timing results """ @@ -24,19 +25,19 @@ def run_benchmark(): return { "init": init_time, - "total": init_time # Total time is just init time now + "total": init_time, # Total time is just init time now } def print_results(results): """ Print benchmark results in a formatted way. - + Args: results: Dictionary with timing results """ print("\n=== BENCHMARK RESULTS ===") - + print(f"\nINIT TIME: {results['init']:.6f}s") print(f"TOTAL TIME: {results['total']:.6f}s") @@ -44,4 +45,4 @@ def print_results(results): if __name__ == "__main__": print("Running TracingCore benchmark...") results = run_benchmark() - print_results(results) + print_results(results) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 742c1c89f..90ebecd64 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -32,8 +32,10 @@ def mock_req(endpoint, api_key): """ with requests_mock.Mocker(real_http=False) as m: # Map session IDs to their JWTs - m.post(endpoint + "/v3/auth/token", json={"token": str(uuid.uuid4()), - "project_id": "test-project-id", "api_key": api_key}) + m.post( + endpoint + "/v3/auth/token", + json={"token": str(uuid.uuid4()), "project_id": "test-project-id", "api_key": api_key}, + ) yield m diff --git a/tests/unit/sdk/instrumentation_tester.py b/tests/unit/sdk/instrumentation_tester.py index 9e5dc80d5..a3f868805 100644 --- a/tests/unit/sdk/instrumentation_tester.py +++ b/tests/unit/sdk/instrumentation_tester.py @@ -5,8 +5,7 @@ from opentelemetry import trace as trace_api from opentelemetry.sdk.trace import ReadableSpan, Span, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.sdk.trace.export.in_memory_span_exporter import \ - InMemorySpanExporter +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.util.types import Attributes from agentops.sdk.core import TracingCore, setup_telemetry @@ -46,8 +45,7 @@ def reset_trace_globals(): class HasAttributesViaProperty(Protocol): @property - def attributes(self) -> Attributes: - ... + def attributes(self) -> Attributes: ... class HasAttributesViaAttr(Protocol): @@ -92,8 +90,7 @@ def __init__(self): # Patch the setup_telemetry function to return our test providers self.setup_telemetry_patcher = mock.patch( - 'agentops.sdk.core.setup_telemetry', - return_value=(self.tracer_provider, self.mock_meter_provider) + "agentops.sdk.core.setup_telemetry", return_value=(self.tracer_provider, self.mock_meter_provider) ) self.mock_setup_telemetry = self.setup_telemetry_patcher.start() diff --git a/tests/unit/sdk/test_decorators.py b/tests/unit/sdk/test_decorators.py index fa389e59e..bd444386a 100644 --- a/tests/unit/sdk/test_decorators.py +++ b/tests/unit/sdk/test_decorators.py @@ -61,10 +61,15 @@ def test_session(): assert len(spans) == 4 # Verify span kinds - session_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION] - agent_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT] - operation_spans = [s for s in spans if s.attributes and s.attributes.get( - SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK] + session_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION + ] + agent_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT + ] + operation_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK + ] assert len(session_spans) == 1 assert len(agent_spans) == 1 @@ -73,31 +78,31 @@ def test_session(): # Find the main_operation and nested_operation spans main_operation = None nested_operation = None - + for span in operation_spans: - if span.attributes and span.attributes.get('agentops.operation.name') == 'main_operation': + if span.attributes and span.attributes.get("agentops.operation.name") == "main_operation": main_operation = span - elif span.attributes and span.attributes.get('agentops.operation.name') == 'nested_operation': + elif span.attributes and span.attributes.get("agentops.operation.name") == "nested_operation": nested_operation = span - + assert main_operation is not None, "main_operation span not found" assert nested_operation is not None, "nested_operation span not found" - + # Verify the session span is the root session_span = session_spans[0] assert session_span.parent is None - + # Verify the agent span is a child of the session span agent_span = agent_spans[0] assert agent_span.parent is not None assert session_span.context is not None assert agent_span.parent.span_id == session_span.context.span_id - + # Verify main_operation is a child of the agent span assert main_operation.parent is not None assert agent_span.context is not None assert main_operation.parent.span_id == agent_span.context.span_id - + # Verify nested_operation is a child of main_operation assert nested_operation.parent is not None assert main_operation.context is not None @@ -150,10 +155,15 @@ async def test_async_session(): assert len(spans) == 4 # Verify span kinds - session_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION] - agent_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT] - operation_spans = [s for s in spans if s.attributes and s.attributes.get( - SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK] + session_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION + ] + agent_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT + ] + operation_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK + ] assert len(session_spans) == 1 assert len(agent_spans) == 1 @@ -162,31 +172,31 @@ async def test_async_session(): # Find the main_operation and nested_operation spans main_operation = None nested_operation = None - + for span in operation_spans: - if span.attributes and span.attributes.get('agentops.operation.name') == 'main_async_operation': + if span.attributes and span.attributes.get("agentops.operation.name") == "main_async_operation": main_operation = span - elif span.attributes and span.attributes.get('agentops.operation.name') == 'nested_async_operation': + elif span.attributes and span.attributes.get("agentops.operation.name") == "nested_async_operation": nested_operation = span - + assert main_operation is not None, "main_async_operation span not found" assert nested_operation is not None, "nested_async_operation span not found" - + # Verify the session span is the root session_span = session_spans[0] assert session_span.parent is None - + # Verify the agent span is a child of the session span agent_span = agent_spans[0] assert agent_span.parent is not None assert session_span.context is not None assert agent_span.parent.span_id == session_span.context.span_id - + # Verify main_operation is a child of the agent span assert main_operation.parent is not None assert agent_span.context is not None assert main_operation.parent.span_id == agent_span.context.span_id - + # Verify nested_operation is a child of main_operation assert nested_operation.parent is not None assert main_operation.context is not None @@ -241,10 +251,15 @@ def test_generator_session(): assert len(spans) == 4 # Verify span kinds - session_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION] - agent_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT] - operation_spans = [s for s in spans if s.attributes and s.attributes.get( - SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK] + session_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION + ] + agent_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT + ] + operation_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK + ] assert len(session_spans) == 1 assert len(agent_spans) == 1 @@ -253,31 +268,31 @@ def test_generator_session(): # Find the main_operation and nested_operation spans main_operation = None nested_operation = None - + for span in operation_spans: - if span.attributes and span.attributes.get('agentops.operation.name') == 'main_generator_operation': + if span.attributes and span.attributes.get("agentops.operation.name") == "main_generator_operation": main_operation = span - elif span.attributes and span.attributes.get('agentops.operation.name') == 'nested_generator': + elif span.attributes and span.attributes.get("agentops.operation.name") == "nested_generator": nested_operation = span - + assert main_operation is not None, "main_generator_operation span not found" assert nested_operation is not None, "nested_generator span not found" - + # Verify the session span is the root session_span = session_spans[0] assert session_span.parent is None - + # Verify the agent span is a child of the session span agent_span = agent_spans[0] assert agent_span.parent is not None assert session_span.context is not None assert agent_span.parent.span_id == session_span.context.span_id - + # Verify main_operation is a child of the agent span assert main_operation.parent is not None assert agent_span.context is not None assert main_operation.parent.span_id == agent_span.context.span_id - + # Verify nested_operation is a child of main_operation assert nested_operation.parent is not None assert main_operation.context is not None @@ -333,10 +348,15 @@ async def test_async_generator_session(): assert len(spans) == 4 # Verify span kinds - session_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION] - agent_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT] - operation_spans = [s for s in spans if s.attributes and s.attributes.get( - SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK] + session_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION + ] + agent_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT + ] + operation_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK + ] assert len(session_spans) == 1 assert len(agent_spans) == 1 @@ -345,31 +365,31 @@ async def test_async_generator_session(): # Find the main_operation and nested_operation spans main_operation = None nested_operation = None - + for span in operation_spans: - if span.attributes and span.attributes.get('agentops.operation.name') == 'main_async_generator_operation': + if span.attributes and span.attributes.get("agentops.operation.name") == "main_async_generator_operation": main_operation = span - elif span.attributes and span.attributes.get('agentops.operation.name') == 'nested_async_generator': + elif span.attributes and span.attributes.get("agentops.operation.name") == "nested_async_generator": nested_operation = span - + assert main_operation is not None, "main_async_generator_operation span not found" assert nested_operation is not None, "nested_async_generator span not found" - + # Verify the session span is the root session_span = session_spans[0] assert session_span.parent is None - + # Verify the agent span is a child of the session span agent_span = agent_spans[0] assert agent_span.parent is not None assert session_span.context is not None assert agent_span.parent.span_id == session_span.context.span_id - + # Verify main_operation is a child of the agent span assert main_operation.parent is not None assert agent_span.context is not None assert main_operation.parent.span_id == agent_span.context.span_id - + # Verify nested_operation is a child of main_operation assert nested_operation.parent is not None assert main_operation.context is not None @@ -427,10 +447,15 @@ def test_complex_session(): assert len(spans) == 5 # Verify span kinds - session_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION] - agent_spans = [s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT] - operation_spans = [s for s in spans if s.attributes and s.attributes.get( - SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK] + session_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.SESSION + ] + agent_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.AGENT + ] + operation_spans = [ + s for s in spans if s.attributes and s.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TASK + ] assert len(session_spans) == 1 assert len(agent_spans) == 1 @@ -440,42 +465,40 @@ def test_complex_session(): level1_operation = None level2_operation = None level3_operation = None - + for span in operation_spans: - if span.attributes and span.attributes.get('agentops.operation.name') == 'level1_operation': + if span.attributes and span.attributes.get("agentops.operation.name") == "level1_operation": level1_operation = span - elif span.attributes and span.attributes.get('agentops.operation.name') == 'level2_operation': + elif span.attributes and span.attributes.get("agentops.operation.name") == "level2_operation": level2_operation = span - elif span.attributes and span.attributes.get('agentops.operation.name') == 'level3_operation': + elif span.attributes and span.attributes.get("agentops.operation.name") == "level3_operation": level3_operation = span - + assert level1_operation is not None, "level1_operation span not found" assert level2_operation is not None, "level2_operation span not found" assert level3_operation is not None, "level3_operation span not found" - + # Verify the session span is the root session_span = session_spans[0] assert session_span.parent is None - + # Verify the agent span is a child of the session span agent_span = agent_spans[0] assert agent_span.parent is not None assert session_span.context is not None assert agent_span.parent.span_id == session_span.context.span_id - + # Verify level1_operation is a child of the agent span assert level1_operation.parent is not None assert agent_span.context is not None assert level1_operation.parent.span_id == agent_span.context.span_id - + # Verify level2_operation is a child of level1_operation assert level2_operation.parent is not None assert level1_operation.context is not None assert level2_operation.parent.span_id == level1_operation.context.span_id - + # Verify level3_operation is a child of level2_operation assert level3_operation.parent is not None assert level2_operation.context is not None assert level3_operation.parent.span_id == level2_operation.context.span_id - - diff --git a/tests/unit/test_session_legacy.py b/tests/unit/test_session_legacy.py index e63557787..455874c2e 100644 --- a/tests/unit/test_session_legacy.py +++ b/tests/unit/test_session_legacy.py @@ -1,12 +1,10 @@ - - def test_session_auto_start(instrumentation): import agentops from agentops.legacy import Session # Pass a dummy API key for the test session = agentops.init(api_key="test-api-key", auto_start_session=True) - + assert isinstance(session, Session) @@ -42,12 +40,12 @@ def __init__(self): self.role = "Test Agent" self.goal = "Testing" self.id = "test-agent-id" - + agent = MockAgent() agentops.track_agent(agent) except Exception as e: assert False, f"track_agent raised an exception: {e}" - + # Test track_tool function exists and doesn't raise errors try: # Mock a tool object similar to what CrewAI would provide @@ -55,7 +53,7 @@ class MockTool: def __init__(self): self.name = "Test Tool" self.description = "A test tool" - + tool = MockTool() agentops.track_tool(tool, "Test Agent") except Exception as e: @@ -64,97 +62,85 @@ def __init__(self): # Test events that CrewAI might use tool_event = agentops.ToolEvent(name="test_tool") action_event = agentops.ActionEvent(action_type="test_action") - + # Verify that record function works with these events agentops.record(tool_event) agentops.record(action_event) - - + + def test_crewai_kwargs_pattern(instrumentation): """ Test the CrewAI < 0.105.0 pattern where end_session is called with only kwargs. - + In versions < 0.105.0, CrewAI directly calls: agentops.end_session( end_state="Success", - end_state_reason="Finished Execution", + end_state_reason="Finished Execution", is_auto_end=True ) """ import agentops from agentops.legacy import Session - + # Initialize with test API key agentops.init(api_key="test-api-key") - + # Create a session session = agentops.start_session(tags=["test", "crewai-kwargs"]) assert isinstance(session, Session) - + # Test the CrewAI < 0.105.0 pattern - calling end_session with only kwargs - agentops.end_session( - end_state="Success", - end_state_reason="Finished Execution", - is_auto_end=True - ) - + agentops.end_session(end_state="Success", end_state_reason="Finished Execution", is_auto_end=True) + # After calling end_session, creating a new session should work correctly # (this implicitly tests that the internal state is reset properly) new_session = agentops.start_session(tags=["test", "post-end"]) assert isinstance(new_session, Session) - - + + def test_crewai_kwargs_pattern_no_session(instrumentation): """ Test the CrewAI < 0.105.0 pattern where end_session is called with only kwargs, but no session has been created. - + This should log a warning but not fail. """ import agentops - + # Initialize with test API key agentops.init(api_key="test-api-key") - + # We don't need to explicitly clear the session state # Just make sure we start with a clean state by calling init - + # Test the CrewAI < 0.105.0 pattern - calling end_session with only kwargs # when no session exists. This should not raise an error. - agentops.end_session( - end_state="Success", - end_state_reason="Finished Execution", - is_auto_end=True - ) + agentops.end_session(end_state="Success", end_state_reason="Finished Execution", is_auto_end=True) def test_crewai_kwargs_force_flush(): """ Test that when using the CrewAI < 0.105.0 pattern (end_session with kwargs), the spans are properly exported to the backend with force_flush. - + This is a more comprehensive test that ensures spans are actually sent to the backend when using the CrewAI integration pattern. """ import agentops from agentops.sdk.core import TracingCore import time - + # Initialize AgentOps with API key agentops.init(api_key="test-api-key") - + # Create a session session = agentops.start_session(tags=["test", "crewai-integration"]) - + # Simulate some work time.sleep(0.1) - + # End session with kwargs (CrewAI < 0.105.0 pattern) - agentops.end_session( - end_state="Success", - end_state_reason="Test Finished", - is_auto_end=True - ) - + agentops.end_session(end_state="Success", end_state_reason="Test Finished", is_auto_end=True) + # Explicitly ensure the core isn't already shut down for the test - assert TracingCore.get_instance()._initialized, "TracingCore should still be initialized" \ No newline at end of file + assert TracingCore.get_instance()._initialized, "TracingCore should still be initialized"