diff --git a/agentops/__init__.py b/agentops/__init__.py index 3b252759a..52f56e0bf 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -208,20 +208,20 @@ def start_trace( return tracing_core.start_trace(trace_name=trace_name, tags=tags) -def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: +def end_trace(tracer: Optional[TraceContext] = None, end_state: str = "Success") -> None: """ Ends a trace (its root span) and finalizes it. - If no trace_context is provided, ends all active session spans. + If no tracer is provided, ends all active session spans. Args: - trace_context: The TraceContext object returned by start_trace. If None, ends all active traces. + tracer: The TraceContext object returned by start_trace. If None, ends all active traces. end_state: The final state of the trace (e.g., "Success", "Failure", "Error"). """ tracing_core = TracingCore.get_instance() if not tracing_core.initialized: logger.warning("AgentOps SDK not initialized. Cannot end trace.") return - tracing_core.end_trace(trace_context=trace_context, end_state=end_state) + tracing_core.end_trace(tracer=tracer, end_state=end_state) __all__ = [ diff --git a/agentops/client/client.py b/agentops/client/client.py index 2ceacd90e..9666e7a2c 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -11,7 +11,7 @@ from agentops.legacy import Session # Global variables to hold the client's auto-started trace and its legacy session wrapper -_client_init_trace_context: Optional[TraceContext] = None +_client_init_tracer: Optional[TraceContext] = None _client_legacy_session_for_init_trace: Optional[Session] = None # Single atexit handler registered flag @@ -20,18 +20,18 @@ def _end_init_trace_atexit(): """Global atexit handler to end the client's auto-initialized trace during shutdown.""" - global _client_init_trace_context, _client_legacy_session_for_init_trace - if _client_init_trace_context is not None: + global _client_init_tracer, _client_legacy_session_for_init_trace + if _client_init_tracer is not None: logger.debug("Auto-ending client's init trace during shutdown.") try: # Use TracingCore to end the trace directly tracing_core = TracingCore.get_instance() - if tracing_core.initialized and _client_init_trace_context.span.is_recording(): - tracing_core.end_trace(_client_init_trace_context, end_state="Shutdown") + if tracing_core.initialized and _client_init_tracer.span.is_recording(): + tracing_core.end_trace(_client_init_tracer, end_state="Shutdown") except Exception as e: logger.warning(f"Error ending client's init trace during shutdown: {e}") finally: - _client_init_trace_context = None + _client_init_tracer = None _client_legacy_session_for_init_trace = None # Clear its legacy wrapper too @@ -40,7 +40,7 @@ class Client: config: Config _initialized: bool - _init_trace_context: Optional[TraceContext] = None # Stores the context of the auto-started trace + _init_tracer: Optional[TraceContext] = None # Stores the context of the auto-started trace _legacy_session_for_init_trace: Optional[ Session ] = None # Stores the legacy Session wrapper for the auto-started trace @@ -53,7 +53,7 @@ def __new__(cls, *args: Any, **kwargs: Any) -> "Client": if cls.__instance is None: cls.__instance = super(Client, cls).__new__(cls) # Initialize instance variables that should only be set once per instance - cls.__instance._init_trace_context = None + cls.__instance._init_tracer = None cls.__instance._legacy_session_for_init_trace = None return cls.__instance @@ -66,7 +66,7 @@ def __init__(self): ): # Ensure init logic runs only once per actual initialization intent self.config = Config() # Initialize config here for the instance self._initialized = False - # self._init_trace_context = None # Already done in __new__ + # self._init_tracer = None # Already done in __new__ # self._legacy_session_for_init_trace = None # Already done in __new__ def init(self, **kwargs: Any) -> None: # Return type updated to None @@ -81,10 +81,10 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None logger.warning("AgentOps Client being re-initialized with a different API key. This is unusual.") # Reset initialization status to allow re-init with new key/config self._initialized = False - if self._init_trace_context and self._init_trace_context.span.is_recording(): + if self._init_tracer and self._init_tracer.span.is_recording(): logger.warning("Ending previously auto-started trace due to re-initialization.") - TracingCore.get_instance().end_trace(self._init_trace_context, "Reinitialized") - self._init_trace_context = None + TracingCore.get_instance().end_trace(self._init_tracer, "Reinitialized") + self._init_tracer = None self._legacy_session_for_init_trace = None if self.initialized: @@ -133,31 +133,31 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None # Auto-start trace if configured if self.config.auto_start_session: - if self._init_trace_context is None or not self._init_trace_context.span.is_recording(): + if self._init_tracer is None or not self._init_tracer.span.is_recording(): logger.debug("Auto-starting init trace.") trace_name = self.config.trace_name or "default" - self._init_trace_context = tracing_core.start_trace( + self._init_tracer = tracing_core.start_trace( trace_name=trace_name, tags=list(self.config.default_tags) if self.config.default_tags else None, is_init_trace=True, ) - if self._init_trace_context: - self._legacy_session_for_init_trace = Session(self._init_trace_context) + if self._init_tracer: + self._legacy_session_for_init_trace = Session(self._init_tracer) # For backward compatibility, also update the global references in legacy and client modules # These globals are what old code might have been using via agentops.legacy.get_session() or similar indirect access. - global _client_init_trace_context, _client_legacy_session_for_init_trace - _client_init_trace_context = self._init_trace_context + global _client_init_tracer, _client_legacy_session_for_init_trace + _client_init_tracer = self._init_tracer _client_legacy_session_for_init_trace = self._legacy_session_for_init_trace - # Update legacy module's _current_session and _current_trace_context + # Update legacy module's _current_session and _current_tracer # This is tricky; direct access to another module's globals is not ideal. # Prefer explicit calls if possible, but for maximum BC: try: import agentops.legacy agentops.legacy._current_session = self._legacy_session_for_init_trace - agentops.legacy._current_trace_context = self._init_trace_context + agentops.legacy._current_tracer = self._init_tracer except ImportError: pass # Should not happen @@ -196,7 +196,7 @@ def initialized(self, value: bool) -> None: # Remove the old __instance = None at the end of the class definition if it's a repeat # __instance = None # This was a class variable, should be defined once - # Make _init_trace_context and _legacy_session_for_init_trace accessible + # Make _init_tracer and _legacy_session_for_init_trace accessible # to the atexit handler if it becomes a static/class method or needs access # For now, the atexit handler is global and uses global vars copied from these. @@ -210,4 +210,4 @@ def initialized(self, value: bool) -> None: # For now, _client_legacy_session_for_init_trace is the primary global for the auto-init trace's legacy Session. # Remove the old global _active_session defined at the top of this file if it's no longer the primary mechanism. -# The new globals _client_init_trace_context and _client_legacy_session_for_init_trace handle the auto-init trace. +# The new globals _client_init_tracer and _client_legacy_session_for_init_trace handle the auto-init trace. diff --git a/agentops/legacy/__init__.py b/agentops/legacy/__init__.py index ff32beb8b..213644d98 100644 --- a/agentops/legacy/__init__.py +++ b/agentops/legacy/__init__.py @@ -15,7 +15,7 @@ from agentops.sdk.core import TracingCore, TraceContext _current_session: Optional["Session"] = None -_current_trace_context: Optional[TraceContext] = None +_current_tracer: Optional[TraceContext] = None class Session: @@ -28,22 +28,22 @@ class Session: - end_session(): Called when a CrewAI run completes """ - def __init__(self, trace_context: Optional[TraceContext]): - self.trace_context = trace_context + def __init__(self, tracer: Optional[TraceContext]): + self.tracer = tracer @property def span(self) -> Optional[Any]: - return self.trace_context.span if self.trace_context else None + return self.tracer.span if self.tracer else None @property def token(self) -> Optional[Any]: - return self.trace_context.token if self.trace_context else None + return self.tracer.token if self.tracer else None def __del__(self): - if self.trace_context and self.trace_context.span and self.trace_context.span.is_recording(): - if not self.trace_context.is_init_trace: + if self.tracer and self.tracer.span and self.tracer.span.is_recording(): + if not self.tracer.is_init_trace: logger.warning( - f"Legacy Session (trace ID: {self.trace_context.span.get_span_context().span_id}) \ + f"Legacy Session (trace ID: {self.tracer.span.get_span_context().span_id}) \ was garbage collected but its trace might still be recording. Ensure legacy sessions are ended with end_session()." ) @@ -67,7 +67,7 @@ def start_session( @deprecated Use agentops.start_trace() instead. Starts a legacy AgentOps session. Calls TracingCore.start_trace internally. """ - global _current_session, _current_trace_context + global _current_session, _current_tracer tracing_core = TracingCore.get_instance() if not tracing_core.initialized: @@ -79,33 +79,33 @@ def start_session( logger.warning("AgentOps client init failed during legacy start_session. Creating dummy session.") dummy_session = Session(None) _current_session = dummy_session - _current_trace_context = None + _current_tracer = None return dummy_session except Exception as e: logger.warning(f"AgentOps client init failed: {str(e)}. Creating dummy session.") dummy_session = Session(None) _current_session = dummy_session - _current_trace_context = None + _current_tracer = None return dummy_session - trace_context = tracing_core.start_trace(trace_name="session", tags=tags) - if trace_context is None: + tracer = tracing_core.start_trace(trace_name="session", tags=tags) + if tracer is None: logger.error("Failed to start trace via TracingCore. Returning dummy session.") dummy_session = Session(None) _current_session = dummy_session - _current_trace_context = None + _current_tracer = None return dummy_session - session_obj = Session(trace_context) + session_obj = Session(tracer) _current_session = session_obj - _current_trace_context = trace_context + _current_tracer = tracer try: import agentops.client.client agentops.client.client._active_session = session_obj # type: ignore - if hasattr(agentops.client.client, "_active_trace_context"): - agentops.client.client._active_trace_context = trace_context # type: ignore + if hasattr(agentops.client.client, "_active_tracer"): + agentops.client.client._active_tracer = tracer # type: ignore except (ImportError, AttributeError): pass return session_obj @@ -128,61 +128,55 @@ def end_session(session_or_status: Any = None, **kwargs: Any) -> None: Ends a legacy AgentOps session. Calls TracingCore.end_trace internally. Supports multiple calling patterns for backward compatibility. """ - global _current_session, _current_trace_context + global _current_session, _current_tracer tracing_core = TracingCore.get_instance() if not tracing_core.initialized: logger.debug("Ignoring end_session: TracingCore not initialized.") return - target_trace_context: Optional[TraceContext] = None + target_tracer: Optional[TraceContext] = None end_state_from_args = "Success" extra_attributes = kwargs.copy() if isinstance(session_or_status, Session): - target_trace_context = session_or_status.trace_context + target_tracer = session_or_status.tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) elif isinstance(session_or_status, str): end_state_from_args = session_or_status - target_trace_context = _current_trace_context + target_tracer = _current_tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) elif session_or_status is None and kwargs: - target_trace_context = _current_trace_context + target_tracer = _current_tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) else: - target_trace_context = _current_trace_context + target_tracer = _current_tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) - if not target_trace_context: + if not target_tracer: logger.warning("end_session called but no active trace context found.") return - if target_trace_context.span and extra_attributes: - _set_span_attributes(target_trace_context.span, extra_attributes) + if target_tracer.span and extra_attributes: + _set_span_attributes(target_tracer.span, extra_attributes) - tracing_core.end_trace(target_trace_context, end_state=end_state_from_args) + tracing_core.end_trace(target_tracer, end_state=end_state_from_args) - if target_trace_context is _current_trace_context: + if target_tracer is _current_tracer: _current_session = None - _current_trace_context = None + _current_tracer = None try: import agentops.client.client - if ( - hasattr(agentops.client.client, "_active_trace_context") - and agentops.client.client._active_trace_context is target_trace_context - ): # type: ignore - agentops.client.client._active_trace_context = None # type: ignore + if hasattr(agentops.client.client, "_active_tracer") and agentops.client.client._active_tracer is target_tracer: # type: ignore + agentops.client.client._active_tracer = None # type: ignore agentops.client.client._active_session = None # type: ignore - elif ( - hasattr(agentops.client.client, "_init_trace_context") - and agentops.client.client._init_trace_context is target_trace_context - ): # type: ignore + elif hasattr(agentops.client.client, "_init_tracer") and agentops.client.client._init_tracer is target_tracer: # type: ignore logger.debug("Legacy end_session called on client's auto-init trace. This is unusual.") except (ImportError, AttributeError): pass @@ -198,12 +192,12 @@ def end_all_sessions() -> None: return # Use the new end_trace functionality to end all active traces - tracing_core.end_trace(trace_context=None, end_state="Success") + tracing_core.end_trace(tracer=None, end_state="Success") # Clear legacy global state - global _current_session, _current_trace_context + global _current_session, _current_tracer _current_session = None - _current_trace_context = None + _current_tracer = None def ToolEvent(*args: Any, **kwargs: Any) -> None: diff --git a/agentops/sdk/converters.py b/agentops/sdk/converters.py index fee21e257..8febe470a 100644 --- a/agentops/sdk/converters.py +++ b/agentops/sdk/converters.py @@ -124,3 +124,20 @@ def camel_to_snake(text: str) -> str: text = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text) return re.sub("([a-z0-9])([A-Z])", r"\1_\2", text).lower() + + +def format_trace_id(trace_id: int) -> str: + """ + Format trace ID consistently as hex string with error handling. + + Args: + trace_id: The trace ID integer to format + + Returns: + Formatted trace ID as hex string + """ + try: + return f"{trace_id:x}" + except (TypeError, ValueError): + # Handle case where trace_id is not a valid integer + return str(trace_id) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index d36c55228..a01142cc5 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -24,6 +24,7 @@ from agentops.sdk.types import TracingConfig from agentops.semconv import ResourceAttributes, SpanKind, SpanAttributes, CoreAttributes from agentops.helpers.dashboard import log_trace_url +from agentops.sdk.converters import format_trace_id # No need to create shortcuts since we're using our own ResourceAttributes class now @@ -330,16 +331,26 @@ def shutdown(self) -> None: logger.debug("Tracing core shut down") def _flush_span_processors(self) -> None: - """Helper to force flush all span processors.""" - if not self._provider or not hasattr(self._provider, "force_flush"): - logger.debug("No provider or provider cannot force_flush.") + """Helper to force flush all span processors with comprehensive error handling.""" + if not self._provider: + logger.debug("No provider available for force_flush.") + return + + if not hasattr(self._provider, "force_flush"): + logger.debug("Provider does not support force_flush.") return try: + logger.debug("Attempting to force flush span processors...") self._provider.force_flush() # type: ignore - logger.debug("Provider force_flush completed.") + logger.debug("Provider force_flush completed successfully.") + except AttributeError as e: + logger.warning(f"Provider force_flush method not available: {e}") + except RuntimeError as e: + logger.warning(f"Runtime error during force_flush (provider may be shutting down): {e}") except Exception as e: - logger.warning(f"Failed to force flush provider's span processors: {e}", exc_info=True) + logger.error(f"Unexpected error during force_flush: {e}", exc_info=True) + # Continue execution - don't let flush failures break the application def get_tracer(self, name: str = "agentops") -> trace.Tracer: """ @@ -440,35 +451,31 @@ def start_trace( except Exception as e: logger.warning(f"Failed to log trace URL for '{trace_name}': {e}") - trace_context = TraceContext(span, token=context_token, is_init_trace=is_init_trace) + tracer = TraceContext(span, token=context_token, is_init_trace=is_init_trace) # Track the active trace with self._traces_lock: - try: - trace_id = f"{span.get_span_context().trace_id:x}" - except (TypeError, ValueError): - # Handle case where span is mocked or trace_id is not a valid integer - trace_id = str(span.get_span_context().trace_id) - self._active_traces[trace_id] = trace_context + trace_id = format_trace_id(span.get_span_context().trace_id) + self._active_traces[trace_id] = tracer logger.debug(f"Added trace {trace_id} to active traces. Total active: {len(self._active_traces)}") - return trace_context + return tracer - def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: + def end_trace(self, tracer: Optional[TraceContext] = None, end_state: str = "Success") -> None: """ Ends a trace (its root span) and finalizes it. - If no trace_context is provided, ends all active session spans. + If no tracer is provided, ends all active session spans. Args: - trace_context: The TraceContext object returned by start_trace. If None, ends all active traces. + tracer: The TraceContext object returned by start_trace. If None, ends all active traces. end_state: The final state of the trace (e.g., "Success", "Failure", "Error"). """ if not self.initialized: logger.warning("TracingCore not initialized. Cannot end trace.") return - # If no specific trace_context provided, end all active traces - if trace_context is None: + # If no specific tracer provided, end all active traces + if tracer is None: with self._traces_lock: active_traces = list(self._active_traces.values()) logger.debug(f"Ending all {len(active_traces)} active traces with state: {end_state}") @@ -478,29 +485,25 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str return # End specific trace - self._end_single_trace(trace_context, end_state) + self._end_single_trace(tracer, end_state) - def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None: + def _end_single_trace(self, tracer: TraceContext, end_state: str) -> None: """ Internal method to end a single trace. Args: - trace_context: The TraceContext object to end. + tracer: The TraceContext object to end. end_state: The final state of the trace. """ from agentops.sdk.decorators.utility import _finalize_span # Local import - if not trace_context or not trace_context.span: + if not tracer or not tracer.span: logger.warning("Invalid TraceContext or span provided to end trace.") return - span = trace_context.span - token = trace_context.token - try: - trace_id = f"{span.get_span_context().trace_id:x}" - except (TypeError, ValueError): - # Handle case where span is mocked or trace_id is not a valid integer - trace_id = str(span.get_span_context().trace_id) + span = tracer.span + token = tracer.token + trace_id = format_trace_id(span.get_span_context().trace_id) logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {end_state}") diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index 043679fa8..3296231b7 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -16,16 +16,15 @@ operation_decorator = create_entity_decorator(SpanKind.OPERATION) workflow = create_entity_decorator(SpanKind.WORKFLOW) trace = create_entity_decorator(SpanKind.SESSION) -session = create_entity_decorator(SpanKind.SESSION) tool = create_entity_decorator(SpanKind.TOOL) -operation = task -# For backward compatibility: @session decorator calls @trace decorator +# For backward compatibility: @session decorator calls @trace decorator with deprecation warning @functools.wraps(trace) def session(*args, **kwargs): # noqa: F811 """@deprecated Use @agentops.trace instead. Wraps the @trace decorator for backward compatibility.""" - logger.info(colored("@agentops.session decorator is deprecated. Please use @agentops.trace instead.", "yellow")) + logger.warning(colored("@agentops.session decorator is deprecated. Please use @agentops.trace instead.", "yellow")) + # If called as @session or @session(...) if not args or not callable(args[0]): # called with kwargs like @session(name=...) return trace(*args, **kwargs) diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py index cbf0e7026..aab690e79 100644 --- a/agentops/sdk/decorators/factory.py +++ b/agentops/sdk/decorators/factory.py @@ -21,6 +21,114 @@ ) +def _handle_session_trace_sync( + operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any] +) -> Any: + """Helper function to handle SESSION trace lifecycle for sync functions with proper cleanup""" + tracer: Optional[TraceContext] = None + trace_ended = False + + try: + # Start trace + tracer = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) + if not tracer: + logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.") + return wrapped_func(*args, **kwargs) + + # Record input + try: + _record_entity_input(tracer.span, args, kwargs) + except Exception as e: + logger.warning(f"Input recording failed for @trace '{operation_name}': {e}") + + # Execute function + result = wrapped_func(*args, **kwargs) + + # Record output + try: + _record_entity_output(tracer.span, result) + except Exception as e: + logger.warning(f"Output recording failed for @trace '{operation_name}': {e}") + + # End trace successfully + TracingCore.get_instance().end_trace(tracer, "Success") + trace_ended = True + return result + + except Exception: + # End trace with failure if not already ended + if tracer and not trace_ended: + try: + TracingCore.get_instance().end_trace(tracer, "Failure") + trace_ended = True + except Exception as cleanup_error: + logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}") + raise + + finally: + # Safety net - only end if not already ended and still recording + if tracer and not trace_ended and tracer.span.is_recording(): + try: + TracingCore.get_instance().end_trace(tracer, "Unknown") + logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.") + except Exception as cleanup_error: + logger.error(f"Failed to end trace in finally block: {cleanup_error}") + + +async def _handle_session_trace_async( + operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any] +) -> Any: + """Helper function to handle SESSION trace lifecycle for async functions with proper cleanup""" + tracer: Optional[TraceContext] = None + trace_ended = False + + try: + # Start trace + tracer = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) + if not tracer: + logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.") + return await wrapped_func(*args, **kwargs) + + # Record input + try: + _record_entity_input(tracer.span, args, kwargs) + except Exception as e: + logger.warning(f"Input recording failed for @trace '{operation_name}': {e}") + + # Execute function + result = await wrapped_func(*args, **kwargs) + + # Record output + try: + _record_entity_output(tracer.span, result) + except Exception as e: + logger.warning(f"Output recording failed for @trace '{operation_name}': {e}") + + # End trace successfully + TracingCore.get_instance().end_trace(tracer, "Success") + trace_ended = True + return result + + except Exception: + # End trace with failure if not already ended + if tracer and not trace_ended: + try: + TracingCore.get_instance().end_trace(tracer, "Failure") + trace_ended = True + except Exception as cleanup_error: + logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}") + raise + + finally: + # Safety net - only end if not already ended and still recording + if tracer and not trace_ended and tracer.span.is_recording(): + try: + TracingCore.get_instance().end_trace(tracer, "Unknown") + logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.") + except Exception as cleanup_error: + logger.error(f"Failed to end trace in finally block: {cleanup_error}") + + def create_entity_decorator(entity_kind: str) -> Callable[..., Any]: """ Factory that creates decorators for instrumenting functions and classes. @@ -96,69 +204,9 @@ def wrapper( ) # Fallthrough to existing generator logic which creates a single span. elif is_async: - - async def _wrapped_session_async() -> Any: - trace_context: Optional[TraceContext] = None - try: - trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) - if not trace_context: - logger.error( - f"Failed to start trace for @trace '{operation_name}'. Executing without trace." - ) - return await wrapped_func(*args, **kwargs) - try: - _record_entity_input(trace_context.span, args, kwargs) - except Exception as e: - logger.warning(f"Input recording failed for @trace '{operation_name}': {e}") - result = await wrapped_func(*args, **kwargs) - try: - _record_entity_output(trace_context.span, result) - except Exception as e: - logger.warning(f"Output recording failed for @trace '{operation_name}': {e}") - TracingCore.get_instance().end_trace(trace_context, "Success") - return result - except Exception: - if trace_context: - TracingCore.get_instance().end_trace(trace_context, "Failure") - raise - finally: - if trace_context and trace_context.span.is_recording(): - logger.warning( - f"Trace for @trace '{operation_name}' not explicitly ended. Ending as 'Unknown'." - ) - TracingCore.get_instance().end_trace(trace_context, "Unknown") - - return _wrapped_session_async() + return _handle_session_trace_async(operation_name, tags, wrapped_func, args, kwargs) else: # Sync function for SpanKind.SESSION - trace_context: Optional[TraceContext] = None - try: - trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) - if not trace_context: - logger.error( - f"Failed to start trace for @trace '{operation_name}'. Executing without trace." - ) - return wrapped_func(*args, **kwargs) - try: - _record_entity_input(trace_context.span, args, kwargs) - except Exception as e: - logger.warning(f"Input recording failed for @trace '{operation_name}': {e}") - result = wrapped_func(*args, **kwargs) - try: - _record_entity_output(trace_context.span, result) - except Exception as e: - logger.warning(f"Output recording failed for @trace '{operation_name}': {e}") - TracingCore.get_instance().end_trace(trace_context, "Success") - return result - except Exception: - if trace_context: - TracingCore.get_instance().end_trace(trace_context, "Failure") - raise - finally: - if trace_context and trace_context.span.is_recording(): - logger.warning( - f"Trace for @trace '{operation_name}' not explicitly ended. Ending as 'Unknown'." - ) - TracingCore.get_instance().end_trace(trace_context, "Unknown") + return _handle_session_trace_sync(operation_name, tags, wrapped_func, args, kwargs) # Logic for non-SESSION kinds or generators under @trace (as per fallthrough) elif is_generator: diff --git a/agentops/sdk/processors.py b/agentops/sdk/processors.py index 2f65a1637..87a0d2d93 100644 --- a/agentops/sdk/processors.py +++ b/agentops/sdk/processors.py @@ -4,78 +4,15 @@ This module contains processors for OpenTelemetry spans. """ -import time -from threading import Event, Lock, Thread -from typing import Dict, Optional +from typing import Optional from opentelemetry.context import Context from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor -from opentelemetry.sdk.trace.export import SpanExporter from agentops.logging import logger -from agentops.semconv.core import CoreAttributes from agentops.logging import upload_logfile -class LiveSpanProcessor(SpanProcessor): - def __init__(self, span_exporter: SpanExporter, **kwargs): - self.span_exporter = span_exporter - self._in_flight: Dict[int, Span] = {} - self._lock = Lock() - self._stop_event = Event() - self._export_thread = Thread(target=self._export_periodically, daemon=True) - self._export_thread.start() - - def _export_periodically(self) -> None: - while not self._stop_event.is_set(): - time.sleep(1) - with self._lock: - to_export = [self._readable_span(span) for span in self._in_flight.values()] - if to_export: - self.span_exporter.export(to_export) - - def _readable_span(self, span: Span) -> ReadableSpan: - readable = span._readable_span() - readable._end_time = time.time_ns() - readable._attributes = { - **(readable._attributes or {}), - CoreAttributes.IN_FLIGHT: True, - } - return readable - - def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: - if not span.context or not span.context.trace_flags.sampled: - return - with self._lock: - self._in_flight[span.context.span_id] = span - - def on_end(self, span: ReadableSpan) -> None: - if not span.context or not span.context.trace_flags.sampled: - return - with self._lock: - del self._in_flight[span.context.span_id] - self.span_exporter.export((span,)) - - def shutdown(self) -> None: - self._stop_event.set() - self._export_thread.join() - self.span_exporter.shutdown() - - def force_flush(self, timeout_millis: int = 30000) -> bool: - return True - - def export_in_flight_spans(self) -> None: - """Export all in-flight spans without ending them. - - This method is primarily used for testing to ensure all spans - are exported before assertions are made. - """ - with self._lock: - to_export = [self._readable_span(span) for span in self._in_flight.values()] - if to_export: - self.span_exporter.export(to_export) - - class InternalSpanProcessor(SpanProcessor): """ A span processor that prints information about spans. diff --git a/agentops/sdk/types.py b/agentops/sdk/types.py index 0d4e37bcb..a792339f8 100644 --- a/agentops/sdk/types.py +++ b/agentops/sdk/types.py @@ -1,5 +1,10 @@ from typing import Annotated, Optional, TypedDict +try: + from typing import Required +except ImportError: + from typing_extensions import Required + from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace.export import SpanExporter @@ -16,6 +21,6 @@ class TracingConfig(TypedDict, total=False): metrics_endpoint: Optional[str] api_key: Optional[str] # API key for authentication with AgentOps services project_id: Optional[str] # Project ID to include in resource attributes - max_queue_size: int # Required with a default value - max_wait_time: int # Required with a default value - export_flush_interval: int # Time interval between automatic exports + max_queue_size: Required[int] # Required with a default value + max_wait_time: Required[int] # Required with a default value + export_flush_interval: Required[int] # Time interval between automatic exports diff --git a/pyproject.toml b/pyproject.toml index b0833c9b3..192a05ef1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "opentelemetry-instrumentation>=0.50b0; python_version>='3.10'", "opentelemetry-semantic-conventions==0.50b0; python_version<'3.10'", "opentelemetry-semantic-conventions>=0.50b0; python_version>='3.10'", + "typing_extensions>=4.0.0; python_version<'3.11'", # Required for Required and NotRequired in Python <3.11 ] [dependency-groups] diff --git a/tests/unit/sdk/test_core_error_handling.py b/tests/unit/sdk/test_core_error_handling.py new file mode 100644 index 000000000..9488c5a69 --- /dev/null +++ b/tests/unit/sdk/test_core_error_handling.py @@ -0,0 +1,353 @@ +""" +Tests for TracingCore error handling improvements. + +This module tests the enhanced error handling in TracingCore._flush_span_processors() +that was added to provide comprehensive exception handling for different failure scenarios. +""" + +from unittest.mock import MagicMock, patch + +from agentops.sdk.core import TracingCore + + +class TestTracingCoreFlushErrorHandling: + """Tests for the enhanced _flush_span_processors error handling.""" + + def setup_method(self): + """Set up test fixtures.""" + self.tracing_core = TracingCore.get_instance() + # Reset state for each test + self.tracing_core._initialized = False + self.tracing_core._provider = None + + @patch("agentops.sdk.core.logger") + def test_flush_no_provider(self, mock_logger): + """Test _flush_span_processors when no provider is available.""" + # Ensure no provider is set + self.tracing_core._provider = None + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify debug message was logged + mock_logger.debug.assert_called_once_with("No provider available for force_flush.") + + @patch("agentops.sdk.core.logger") + def test_flush_provider_without_force_flush_method(self, mock_logger): + """Test _flush_span_processors when provider doesn't support force_flush.""" + # Create a mock provider without force_flush method + mock_provider = MagicMock() + del mock_provider.force_flush # Remove the method + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify debug message was logged + mock_logger.debug.assert_called_once_with("Provider does not support force_flush.") + + @patch("agentops.sdk.core.logger") + def test_flush_successful(self, mock_logger): + """Test _flush_span_processors with successful flush.""" + # Create a mock provider with force_flush method + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify force_flush was called + mock_provider.force_flush.assert_called_once() + + # Verify success messages were logged + expected_calls = [ + ("debug", "Attempting to force flush span processors..."), + ("debug", "Provider force_flush completed successfully."), + ] + + actual_calls = [(call[0], call[1][0]) for call in mock_logger.method_calls] + for level, message in expected_calls: + assert (level, message) in actual_calls + + @patch("agentops.sdk.core.logger") + def test_flush_attribute_error(self, mock_logger): + """Test _flush_span_processors when AttributeError is raised.""" + # Create a mock provider where force_flush raises AttributeError + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = AttributeError("force_flush method not available") + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted + mock_provider.force_flush.assert_called_once() + + # Verify warning was logged + mock_logger.warning.assert_called_once_with( + "Provider force_flush method not available: force_flush method not available" + ) + + @patch("agentops.sdk.core.logger") + def test_flush_runtime_error(self, mock_logger): + """Test _flush_span_processors when RuntimeError is raised.""" + # Create a mock provider where force_flush raises RuntimeError + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = RuntimeError("Provider is shutting down") + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted + mock_provider.force_flush.assert_called_once() + + # Verify warning was logged + mock_logger.warning.assert_called_once_with( + "Runtime error during force_flush (provider may be shutting down): Provider is shutting down" + ) + + @patch("agentops.sdk.core.logger") + def test_flush_unexpected_exception(self, mock_logger): + """Test _flush_span_processors when unexpected exception is raised.""" + # Create a mock provider where force_flush raises unexpected exception + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = ValueError("Unexpected error") + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted + mock_provider.force_flush.assert_called_once() + + # Verify error was logged with exc_info + mock_logger.error.assert_called_once_with( + "Unexpected error during force_flush: Unexpected error", exc_info=True + ) + + @patch("agentops.sdk.core.logger") + def test_flush_multiple_exception_types(self, mock_logger): + """Test that different exception types are handled with appropriate log levels.""" + test_cases = [ + (AttributeError("attr error"), "warning", "Provider force_flush method not available: attr error"), + ( + RuntimeError("runtime error"), + "warning", + "Runtime error during force_flush (provider may be shutting down): runtime error", + ), + (ValueError("value error"), "error", "Unexpected error during force_flush: value error"), + (TypeError("type error"), "error", "Unexpected error during force_flush: type error"), + (Exception("generic error"), "error", "Unexpected error during force_flush: generic error"), + ] + + for exception, expected_level, expected_message in test_cases: + # Reset mock + mock_logger.reset_mock() + + # Create a mock provider that raises the specific exception + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = exception + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify the appropriate log level was used + if expected_level == "warning": + mock_logger.warning.assert_called_once_with(expected_message) + mock_logger.error.assert_not_called() + elif expected_level == "error": + mock_logger.error.assert_called_once_with(expected_message, exc_info=True) + mock_logger.warning.assert_not_called() + + @patch("agentops.sdk.core.logger") + def test_flush_graceful_degradation(self, mock_logger): + """Test that flush failures don't break the application.""" + # Create a mock provider that always fails + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = Exception("Always fails") + self.tracing_core._provider = mock_provider + + # Call flush multiple times - should never raise exception + for _ in range(3): + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted each time + assert mock_provider.force_flush.call_count == 3 + + # Verify errors were logged each time + assert mock_logger.error.call_count == 3 + + @patch("agentops.sdk.core.logger") + def test_flush_hasattr_check_behavior(self, mock_logger): + """Test the hasattr check behavior for force_flush method.""" + # Test case 1: Provider without force_flush attribute + mock_provider = MagicMock() + del mock_provider.force_flush # Remove the attribute completely + self.tracing_core._provider = mock_provider + + # This should trigger the hasattr check and log debug message + self.tracing_core._flush_span_processors() + + # When provider doesn't have force_flush attribute, it should only log the "does not support" message + mock_logger.debug.assert_called_once_with("Provider does not support force_flush.") + + # Reset for next test + mock_logger.reset_mock() + + # Test case 2: Provider has callable force_flush + mock_provider = MagicMock() + mock_provider.force_flush = MagicMock(return_value=True) + self.tracing_core._provider = mock_provider + + self.tracing_core._flush_span_processors() + + # Should attempt to call force_flush + mock_provider.force_flush.assert_called_once() + + @patch("agentops.sdk.core.logger") + def test_flush_logging_sequence(self, mock_logger): + """Test the complete logging sequence during successful flush.""" + # Create a mock provider with force_flush method + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify the complete logging sequence + expected_debug_calls = [ + "Attempting to force flush span processors...", + "Provider force_flush completed successfully.", + ] + + debug_calls = [call[0][0] for call in mock_logger.debug.call_args_list] + assert debug_calls == expected_debug_calls + + @patch("agentops.sdk.core.logger") + def test_flush_provider_none_vs_no_method(self, mock_logger): + """Test distinction between no provider and provider without method.""" + # Test 1: No provider + self.tracing_core._provider = None + self.tracing_core._flush_span_processors() + + mock_logger.debug.assert_called_with("No provider available for force_flush.") + mock_logger.reset_mock() + + # Test 2: Provider without force_flush method + mock_provider = MagicMock() + del mock_provider.force_flush + self.tracing_core._provider = mock_provider + + self.tracing_core._flush_span_processors() + + mock_logger.debug.assert_called_with("Provider does not support force_flush.") + + +class TestTracingCoreFlushIntegration: + """Integration tests for TracingCore flush functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + self.tracing_core = TracingCore.get_instance() + # Reset state for each test + self.tracing_core._initialized = False + self.tracing_core._provider = None + + @patch("agentops.sdk.core.logger") + def test_flush_called_during_shutdown(self, mock_logger): + """Test that _flush_span_processors is called during shutdown.""" + # Create a mock provider + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + self.tracing_core._initialized = True + + # Mock the shutdown method to track flush calls + with patch.object(self.tracing_core, "_flush_span_processors") as mock_flush: + # Call shutdown + self.tracing_core.shutdown() + + # Verify flush was called + mock_flush.assert_called_once() + + @patch("agentops.sdk.core.logger") + def test_flush_resilience_during_shutdown(self, mock_logger): + """Test that shutdown continues even if flush fails.""" + # Create a mock provider that fails on flush + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = Exception("Flush failed") + mock_provider.shutdown.return_value = None + self.tracing_core._provider = mock_provider + self.tracing_core._initialized = True + + # Shutdown should complete successfully despite flush failure + self.tracing_core.shutdown() + + # Verify provider shutdown was still called + mock_provider.shutdown.assert_called_once() + + # Verify error was logged + mock_logger.error.assert_called_once_with("Unexpected error during force_flush: Flush failed", exc_info=True) + + @patch("agentops.sdk.core.logger") + def test_flush_with_real_provider_interface(self, mock_logger): + """Test flush with a more realistic provider mock.""" + # Create a mock provider that mimics real TracerProvider interface + mock_provider = MagicMock() + mock_provider.force_flush = MagicMock(return_value=True) + + # Add other typical provider methods + mock_provider.get_tracer = MagicMock() + mock_provider.shutdown = MagicMock() + + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify only force_flush was called, not other methods + mock_provider.force_flush.assert_called_once() + mock_provider.get_tracer.assert_not_called() + mock_provider.shutdown.assert_not_called() + + +class TestTracingCoreFlushBackwardCompatibility: + """Tests to ensure flush error handling doesn't break existing functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + self.tracing_core = TracingCore.get_instance() + # Reset state for each test + self.tracing_core._initialized = False + self.tracing_core._provider = None + + def test_flush_preserves_existing_behavior(self): + """Test that flush behavior is preserved for existing code.""" + # Create a mock provider + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + + # Call flush - should not raise any exceptions + self.tracing_core._flush_span_processors() + + # Verify force_flush was called + mock_provider.force_flush.assert_called_once() + + def test_flush_method_signature_unchanged(self): + """Test that _flush_span_processors method signature is unchanged.""" + # This test ensures the method can still be called without parameters + import inspect + + signature = inspect.signature(self.tracing_core._flush_span_processors) + + # Should have no required parameters + assert len(signature.parameters) == 0 + + # Should be callable without arguments + self.tracing_core._provider = None + self.tracing_core._flush_span_processors() # Should not raise diff --git a/tests/unit/sdk/test_internal_span_processor.py b/tests/unit/sdk/test_internal_span_processor.py index dc397d3ad..d163f84eb 100644 --- a/tests/unit/sdk/test_internal_span_processor.py +++ b/tests/unit/sdk/test_internal_span_processor.py @@ -32,12 +32,12 @@ def test_start_trace_logs_url(self, mock_make_span, mock_log_trace_url): mock_make_span.return_value = (mock_span, mock_context, mock_token) # Call start_trace - trace_context = self.tracing_core.start_trace(trace_name="test_trace") + tracer = self.tracing_core.start_trace(trace_name="test_trace") # Assert that log_trace_url was called with the span and title mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") - self.assertIsInstance(trace_context, TraceContext) - self.assertEqual(trace_context.span, mock_span) + self.assertIsInstance(tracer, TraceContext) + self.assertEqual(tracer.span, mock_span) @patch("agentops.sdk.core.log_trace_url") @patch("agentops.sdk.decorators.utility._finalize_span") @@ -48,10 +48,10 @@ def test_end_trace_logs_url(self, mock_finalize_span, mock_log_trace_url): mock_span.name = "test_trace" mock_span.get_span_context.return_value.span_id = 12345 mock_token = MagicMock() - trace_context = TraceContext(mock_span, mock_token) + tracer = TraceContext(mock_span, mock_token) # Call end_trace - self.tracing_core.end_trace(trace_context, "Success") + self.tracing_core.end_trace(tracer, "Success") # Assert that log_trace_url was called with the span and title mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @@ -71,11 +71,11 @@ def test_start_trace_url_logging_failure_does_not_break_trace(self, mock_make_sp mock_log_trace_url.side_effect = Exception("URL logging failed") # Call start_trace - should not raise exception - trace_context = self.tracing_core.start_trace(trace_name="test_trace") + tracer = self.tracing_core.start_trace(trace_name="test_trace") # Assert that trace was still created successfully - self.assertIsInstance(trace_context, TraceContext) - self.assertEqual(trace_context.span, mock_span) + self.assertIsInstance(tracer, TraceContext) + self.assertEqual(tracer.span, mock_span) mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @patch("agentops.sdk.core.log_trace_url") @@ -87,13 +87,13 @@ def test_end_trace_url_logging_failure_does_not_break_trace(self, mock_finalize_ mock_span.name = "test_trace" mock_span.get_span_context.return_value.span_id = 12345 mock_token = MagicMock() - trace_context = TraceContext(mock_span, mock_token) + tracer = TraceContext(mock_span, mock_token) # Make log_trace_url raise an exception mock_log_trace_url.side_effect = Exception("URL logging failed") # Call end_trace - should not raise exception - self.tracing_core.end_trace(trace_context, "Success") + self.tracing_core.end_trace(tracer, "Success") # Assert that finalize_span was still called mock_finalize_span.assert_called_once() @@ -111,11 +111,11 @@ def test_start_trace_with_tags_logs_url(self, mock_make_span, mock_log_trace_url mock_make_span.return_value = (mock_span, mock_context, mock_token) # Call start_trace with tags - trace_context = self.tracing_core.start_trace(trace_name="tagged_trace", tags=["test", "integration"]) + tracer = self.tracing_core.start_trace(trace_name="tagged_trace", tags=["test", "integration"]) # Assert that log_trace_url was called with the span and title mock_log_trace_url.assert_called_once_with(mock_span, title="tagged_trace") - self.assertIsInstance(trace_context, TraceContext) + self.assertIsInstance(tracer, TraceContext) class TestSessionDecoratorURLLogging(unittest.TestCase): diff --git a/tests/unit/test_init_api_consistency.py b/tests/unit/test_init_api_consistency.py new file mode 100644 index 000000000..52122f0f0 --- /dev/null +++ b/tests/unit/test_init_api_consistency.py @@ -0,0 +1,388 @@ +""" +Tests for API consistency changes in agentops.init(). + +This module tests the changes to the init() function return value behavior +that were reverted in commit bf850af to maintain backward compatibility. +""" + +import pytest +from unittest.mock import MagicMock, patch + +import agentops +from agentops.legacy import Session + + +class TestInitReturnValueConsistency: + """Tests for agentops.init() return value consistency.""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_init_returns_client_init_result(self, mock_client_init): + """Test that init() returns the result of _client.init().""" + # Setup mock to return a specific value + expected_result = MagicMock() + mock_client_init.return_value = expected_result + + # Call init + result = agentops.init(api_key="test-key") + + # Verify the result is what client.init() returned + assert result == expected_result + + # Verify client.init was called with correct parameters + mock_client_init.assert_called_once() + + @patch("agentops.client.Client.init") + def test_init_auto_start_session_true_returns_session(self, mock_client_init): + """Test that init() with auto_start_session=True returns a Session object.""" + # Setup mock to return a Session (typical behavior when auto_start_session=True) + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + # Call init with auto_start_session=True + result = agentops.init(api_key="test-key", auto_start_session=True) + + # Verify the result is the Session object + assert result == mock_session + + # Verify client.init was called with auto_start_session=True + mock_client_init.assert_called_once() + call_kwargs = mock_client_init.call_args[1] + assert call_kwargs["auto_start_session"] is True + + @patch("agentops.client.Client.init") + def test_init_auto_start_session_false_returns_none(self, mock_client_init): + """Test that init() with auto_start_session=False returns None.""" + # Setup mock to return None (typical behavior when auto_start_session=False) + mock_client_init.return_value = None + + # Call init with auto_start_session=False + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Verify the result is None + assert result is None + + # Verify client.init was called with auto_start_session=False + mock_client_init.assert_called_once() + call_kwargs = mock_client_init.call_args[1] + assert call_kwargs["auto_start_session"] is False + + @patch("agentops.client.Client.init") + def test_init_default_auto_start_session_behavior(self, mock_client_init): + """Test that init() with default auto_start_session returns appropriate value.""" + # Setup mock to return a Session (default behavior typically starts a session) + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + # Call init without specifying auto_start_session + result = agentops.init(api_key="test-key") + + # Verify the result is the Session object + assert result == mock_session + + # Verify client.init was called + mock_client_init.assert_called_once() + + @patch("agentops.client.Client.init") + def test_init_passes_all_parameters(self, mock_client_init): + """Test that init() passes all parameters to client.init().""" + # Setup mock + mock_client_init.return_value = None + + # Call init with various parameters + result = agentops.init( # noqa: F841 + api_key="test-key", + endpoint="https://test.endpoint.com", + app_url="https://test.app.com", + max_wait_time=5000, + max_queue_size=512, + default_tags=["test", "integration"], + instrument_llm_calls=True, + auto_start_session=False, + skip_auto_end_session=True, + env_data_opt_out=True, + custom_param="custom_value", + ) + + # Verify client.init was called with all parameters + mock_client_init.assert_called_once() + call_args, call_kwargs = mock_client_init.call_args + + # Check that all expected parameters were passed + expected_params = { + "api_key": "test-key", + "endpoint": "https://test.endpoint.com", + "app_url": "https://test.app.com", + "max_wait_time": 5000, + "max_queue_size": 512, + "default_tags": ["test", "integration"], + "instrument_llm_calls": True, + "auto_start_session": False, + "skip_auto_end_session": True, + "env_data_opt_out": True, + "custom_param": "custom_value", + } + + for param, value in expected_params.items(): + assert call_kwargs[param] == value + + @patch("agentops.client.Client.init") + def test_init_exception_propagation(self, mock_client_init): + """Test that exceptions from client.init() are properly propagated.""" + # Setup mock to raise an exception + test_exception = ValueError("Test initialization error") + mock_client_init.side_effect = test_exception + + # Call init and expect the exception to be raised + with pytest.raises(ValueError, match="Test initialization error"): + agentops.init(api_key="test-key") + + # Verify client.init was called + mock_client_init.assert_called_once() + + +class TestInitBackwardCompatibility: + """Tests for backward compatibility of agentops.init().""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_init_return_value_used_in_conditional(self, mock_client_init): + """Test that init() return value can be used in conditional statements.""" + # Test case 1: init returns a Session (truthy) + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + result = agentops.init(api_key="test-key", auto_start_session=True) + + # Should be able to use in conditional + if result: + session_available = True + else: + session_available = False + + assert session_available is True + assert result == mock_session + + # Test case 2: init returns None (falsy) + mock_client_init.return_value = None + + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Should be able to use in conditional + if result: + session_available = True + else: + session_available = False + + assert session_available is False + assert result is None + + @patch("agentops.client.Client.init") + def test_init_return_value_assignment_patterns(self, mock_client_init): + """Test common assignment patterns with init() return value.""" + # Pattern 1: Direct assignment + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + session = agentops.init(api_key="test-key") + assert session == mock_session + + # Pattern 2: Assignment with default + mock_client_init.return_value = None + + session = agentops.init(api_key="test-key") or "no_session" + assert session == "no_session" + + # Pattern 3: Conditional assignment + mock_client_init.return_value = mock_session + + result = agentops.init(api_key="test-key") + session = result if result else None + assert session == mock_session + + @patch("agentops.client.Client.init") + def test_init_chaining_with_session_methods(self, mock_client_init): + """Test that init() return value can be used for method chaining when appropriate.""" + # Create a mock session with methods + mock_session = MagicMock(spec=Session) + # Add the method to the mock before using it + mock_session.some_method = MagicMock(return_value="method_result") + mock_client_init.return_value = mock_session + + # Should be able to chain methods when session is returned + result = agentops.init(api_key="test-key") + if result: + method_result = result.some_method() + assert method_result == "method_result" + + @patch("agentops.client.Client.init") + def test_init_multiple_calls_behavior(self, mock_client_init): + """Test behavior of multiple init() calls.""" + # First call returns a session + mock_session1 = MagicMock(spec=Session) + mock_client_init.return_value = mock_session1 + + result1 = agentops.init(api_key="test-key") + assert result1 == mock_session1 + + # Second call returns None (already initialized) + mock_client_init.return_value = None + + result2 = agentops.init(api_key="test-key") + assert result2 is None + + # Verify both calls were made + assert mock_client_init.call_count == 2 + + +class TestInitAPIConsistencyRegression: + """Regression tests for the API consistency changes that were reverted.""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_init_does_not_always_return_client(self, mock_client_init): + """Test that init() does NOT always return the client instance (reverted behavior).""" + # This test ensures the reverted change is working correctly + # The original change made init() always return the client, but this was reverted + + # Setup mock to return None + mock_client_init.return_value = None + + # Call init + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Should return None, not the client instance + assert result is None + assert result != agentops._client + + @patch("agentops.client.Client.init") + def test_init_return_value_matches_client_init(self, mock_client_init): + """Test that init() return value exactly matches client.init() return value.""" + # Test with various return values + test_values = [None, MagicMock(spec=Session), "string_value", 42, {"dict": "value"}, ["list", "value"]] + + for test_value in test_values: + # Reset mock + mock_client_init.reset_mock() + mock_client_init.return_value = test_value + + # Call init + result = agentops.init(api_key="test-key") + + # Should return exactly what client.init() returned + assert result == test_value + assert result is test_value # Same object reference + + def test_init_function_signature_unchanged(self): + """Test that init() function signature is unchanged.""" + import inspect + + # Get the signature of the init function + signature = inspect.signature(agentops.init) + + # Verify key parameters exist + expected_params = [ + "api_key", + "endpoint", + "app_url", + "max_wait_time", + "max_queue_size", + "default_tags", + "instrument_llm_calls", + "auto_start_session", + "skip_auto_end_session", + "env_data_opt_out", + ] + + for param in expected_params: + assert param in signature.parameters, f"Parameter {param} missing from init() signature" + + @patch("agentops.client.Client.init") + def test_init_preserves_client_state(self, mock_client_init): + """Test that init() preserves client state regardless of return value.""" + # Setup mock + mock_client_init.return_value = None + + # Get initial client reference + initial_client = agentops._client + + # Call init + result = agentops.init(api_key="test-key") + + # Client reference should be unchanged + assert agentops._client is initial_client + + # Return value should not affect client state + assert result != agentops._client + + +class TestInitDocumentationExamples: + """Tests based on common documentation examples to ensure compatibility.""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_common_usage_pattern_1(self, mock_client_init): + """Test: session = agentops.init(api_key="...", auto_start_session=True)""" + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + # Common pattern from documentation + session = agentops.init(api_key="test-key", auto_start_session=True) + + # Should work as expected + assert session == mock_session + + @patch("agentops.client.Client.init") + def test_common_usage_pattern_2(self, mock_client_init): + """Test: agentops.init(api_key="...", auto_start_session=False)""" + mock_client_init.return_value = None + + # Common pattern from documentation + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Should return None + assert result is None + + @patch("agentops.client.Client.init") + def test_common_usage_pattern_3(self, mock_client_init): + """Test: if agentops.init(...): # do something""" + # Test truthy case + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + if agentops.init(api_key="test-key"): + truthy_result = True + else: + truthy_result = False + + assert truthy_result is True + + # Test falsy case + mock_client_init.return_value = None + + if agentops.init(api_key="test-key"): + falsy_result = True + else: + falsy_result = False + + assert falsy_result is False diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index bf20aa764..803548ce9 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -31,16 +31,16 @@ def mock_api_client(): @pytest.fixture(scope="function") -def mock_trace_context(): +def mock_tracer(): """Mock the TraceContext creation""" mock_span = MagicMock() mock_token = MagicMock() - mock_trace_context_instance = MagicMock() - mock_trace_context_instance.span = mock_span - mock_trace_context_instance.token = mock_token - mock_trace_context_instance.is_init_trace = False + mock_tracer_instance = MagicMock() + mock_tracer_instance.span = mock_span + mock_tracer_instance.token = mock_token + mock_tracer_instance.is_init_trace = False - return mock_trace_context_instance + return mock_tracer_instance @pytest.fixture(scope="function") @@ -52,22 +52,22 @@ def reset_client(): agentops._client = agentops.Client() # Reset all client state agentops._client._initialized = False - agentops._client._init_trace_context = None + agentops._client._init_tracer = None agentops._client._legacy_session_for_init_trace = None yield # Clean up after test try: if hasattr(agentops._client, "_initialized"): agentops._client._initialized = False - if hasattr(agentops._client, "_init_trace_context"): - agentops._client._init_trace_context = None + if hasattr(agentops._client, "_init_tracer"): + agentops._client._init_tracer = None if hasattr(agentops._client, "_legacy_session_for_init_trace"): agentops._client._legacy_session_for_init_trace = None except: pass -def test_explicit_init_then_explicit_start_trace(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_explicit_init_then_explicit_start_trace(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test explicitly initializing followed by explicitly starting a trace""" import agentops @@ -78,23 +78,23 @@ def test_explicit_init_then_explicit_start_trace(mock_tracing_core, mock_api_cli mock_tracing_core.start_trace.assert_not_called() # Mock the start_trace method to return our mock trace context - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Explicitly start a trace - trace_context = agentops.start_trace(trace_name="test_trace", tags=["test"]) + tracer = agentops.start_trace(trace_name="test_trace", tags=["test"]) # Verify the trace was created mock_tracing_core.start_trace.assert_called_once_with(trace_name="test_trace", tags=["test"]) - assert trace_context == mock_trace_context + assert tracer == mock_tracer -def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test initializing with auto_start_session=True""" import agentops from agentops.legacy import Session # Mock the start_trace method to return our mock trace context - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Initialize with auto_start_session=True result = agentops.init(api_key="test-api-key", auto_start_session=True) @@ -103,16 +103,16 @@ def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_trace_ mock_tracing_core.start_trace.assert_called_once() # init() should return a Session object when auto-starting a session assert isinstance(result, Session) - assert result.trace_context == mock_trace_context + assert result.tracer == mock_tracer -def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test initializing with default auto_start_session behavior""" import agentops from agentops.legacy import Session # Mock the start_trace method to return our mock trace context - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Initialize without explicitly setting auto_start_session (defaults to True) result = agentops.init(api_key="test-api-key") @@ -121,7 +121,7 @@ def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_tra assert agentops._client.initialized # Since auto_start_session defaults to True, init() should return a Session object assert isinstance(result, Session) - assert result.trace_context == mock_trace_context + assert result.tracer == mock_tracer def test_start_trace_without_init(): @@ -156,18 +156,18 @@ def side_effect(): assert result is None -def test_end_trace(mock_tracing_core, mock_trace_context): +def test_end_trace(mock_tracing_core, mock_tracer): """Test ending a trace""" import agentops # End the trace - agentops.end_trace(mock_trace_context, end_state="Success") + agentops.end_trace(mock_tracer, end_state="Success") # Verify end_trace was called on TracingCore - mock_tracing_core.end_trace.assert_called_once_with(trace_context=mock_trace_context, end_state="Success") + mock_tracing_core.end_trace.assert_called_once_with(tracer=mock_tracer, end_state="Success") -def test_session_decorator_creates_trace(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_session_decorator_creates_trace(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that the @session decorator creates a trace-level span""" import agentops from agentops.sdk.decorators import session @@ -176,7 +176,7 @@ def test_session_decorator_creates_trace(mock_tracing_core, mock_api_client, moc agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace and end_trace methods - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer @session(name="test_session", tags=["decorator_test"]) def test_function(): @@ -194,7 +194,7 @@ def test_function(): assert mock_tracing_core.end_trace.call_count >= 1 -def test_session_decorator_with_exception(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_session_decorator_with_exception(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that the @session decorator handles exceptions properly""" import agentops from agentops.sdk.decorators import session @@ -203,7 +203,7 @@ def test_session_decorator_with_exception(mock_tracing_core, mock_api_client, mo agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer @session(name="failing_session") def failing_function(): @@ -219,7 +219,7 @@ def failing_function(): assert mock_tracing_core.end_trace.call_count >= 1 -def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that legacy start_session still works and calls TracingCore.start_trace""" import agentops from agentops.legacy import Session @@ -228,21 +228,21 @@ def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Start a legacy session session = agentops.start_session(tags=["legacy_test"]) # Verify the session was created assert isinstance(session, Session) - assert session.trace_context == mock_trace_context + assert session.tracer == mock_tracer # Verify that TracingCore.start_trace was called # Note: May be called multiple times due to initialization assert mock_tracing_core.start_trace.call_count >= 1 -def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that legacy end_session still works and calls TracingCore.end_trace""" import agentops from agentops.legacy import Session @@ -251,13 +251,13 @@ def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mo agentops.init(api_key="test-api-key", auto_start_session=False) # Create a legacy session object - session = Session(mock_trace_context) + session = Session(mock_tracer) # End the session agentops.end_session(session) # Verify that TracingCore.end_trace was called - mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, end_state="Success") + mock_tracing_core.end_trace.assert_called_once_with(mock_tracer, end_state="Success") def test_no_double_init(mock_tracing_core, mock_api_client, reset_client): @@ -277,12 +277,12 @@ def test_no_double_init(mock_tracing_core, mock_api_client, reset_client): assert mock_api_client.call_count == call_count -def test_client_initialization_behavior(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_client_initialization_behavior(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test basic client initialization behavior""" import agentops # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Test that initialization works agentops.init(api_key="test-api-key", auto_start_session=False) @@ -308,13 +308,13 @@ def test_multiple_concurrent_traces(mock_tracing_core, mock_api_client, reset_cl agentops.init(api_key="test-api-key", auto_start_session=False) # Create mock trace contexts for different traces - mock_trace_context_1 = MagicMock() - mock_trace_context_2 = MagicMock() + mock_tracer_1 = MagicMock() + mock_tracer_2 = MagicMock() # Mock start_trace to return different contexts mock_tracing_core.start_trace.side_effect = [ - mock_trace_context_1, - mock_trace_context_2, + mock_tracer_1, + mock_tracer_2, ] # Start multiple traces @@ -322,27 +322,27 @@ def test_multiple_concurrent_traces(mock_tracing_core, mock_api_client, reset_cl trace2 = agentops.start_trace(trace_name="trace2", tags=["test2"]) # Verify both traces were created - assert trace1 == mock_trace_context_1 - assert trace2 == mock_trace_context_2 + assert trace1 == mock_tracer_1 + assert trace2 == mock_tracer_2 # Verify start_trace was called twice assert mock_tracing_core.start_trace.call_count == 2 -def test_trace_context_properties(mock_trace_context): +def test_tracer_properties(mock_tracer): """Test that TraceContext properties work correctly""" from agentops.legacy import Session # Create a legacy session with the mock trace context - session = Session(mock_trace_context) + session = Session(mock_tracer) # Test that properties are accessible - assert session.span == mock_trace_context.span - assert session.token == mock_trace_context.token - assert session.trace_context == mock_trace_context + assert session.span == mock_tracer.span + assert session.token == mock_tracer.token + assert session.tracer == mock_tracer -def test_session_decorator_async_function(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_session_decorator_async_function(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that the @session decorator works with async functions""" import agentops import asyncio @@ -352,7 +352,7 @@ def test_session_decorator_async_function(mock_tracing_core, mock_api_client, mo agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer @session(name="async_test_session") async def async_test_function(): @@ -370,7 +370,7 @@ async def async_test_function(): assert mock_tracing_core.end_trace.call_count >= 1 -def test_trace_context_creation(): +def test_tracer_creation(): """Test that TraceContext can be created with proper attributes""" from agentops.sdk.core import TraceContext @@ -378,11 +378,11 @@ def test_trace_context_creation(): mock_token = MagicMock() # Test creating a TraceContext - trace_context = TraceContext(span=mock_span, token=mock_token, is_init_trace=False) + tracer = TraceContext(span=mock_span, token=mock_token, is_init_trace=False) - assert trace_context.span == mock_span - assert trace_context.token == mock_token - assert trace_context.is_init_trace == False + assert tracer.span == mock_span + assert tracer.token == mock_token + assert tracer.is_init_trace == False def test_session_management_integration(): @@ -408,19 +408,19 @@ def test_session_management_integration(): agentops.init(api_key="test-api-key", auto_start_session=False) # Create mock trace context - mock_trace_context = MagicMock() - mock_instance.start_trace.return_value = mock_trace_context + mock_tracer = MagicMock() + mock_instance.start_trace.return_value = mock_tracer # Test new API - trace_context = agentops.start_trace(trace_name="new_api_trace") - assert trace_context == mock_trace_context + tracer = agentops.start_trace(trace_name="new_api_trace") + assert tracer == mock_tracer # Test legacy API session = agentops.start_session(tags=["legacy"]) - assert session.trace_context == mock_trace_context + assert session.tracer == mock_tracer # Test ending both - agentops.end_trace(trace_context) + agentops.end_trace(tracer) agentops.end_session(session) # Verify calls were made