# Global registry for the active session. It is assigned by Client.init() and
# by agentops.legacy.start_session(), and reset by agentops.legacy.end_session().
_active_session = None

# True once the single atexit handler has been registered.
_atexit_registered = False


def _end_active_session():
    """
    End the globally registered session, if any, during interpreter shutdown.

    The session is ended through ``agentops.legacy.end_session``. If that
    fails, the session's span is ended directly as a last resort. The handler
    never raises, so it is safe to register with ``atexit``.

    The registry is snapshotted into a local and cleared *before* any work is
    done. ``end_session`` may itself reset the registry before it fails, so the
    fallback must not re-read the global; clearing first also makes a second
    invocation a no-op.
    """
    global _active_session

    session = _active_session
    _active_session = None
    if session is None:
        return

    logger.debug("Auto-ending active session during shutdown")
    try:
        # Imported lazily: agentops.legacy imports this module.
        from agentops.legacy import end_session

        end_session(session)
    except Exception as e:
        logger.warning(f"Error ending active session during shutdown: {e}")

        # Final fallback: try to end the span directly.
        end_span = getattr(getattr(session, "span", None), "end", None)
        if callable(end_span):
            try:
                end_span()
            except Exception as fallback_error:
                # Best effort only; nothing more can be done at shutdown.
                logger.debug(f"Fallback span end failed during shutdown: {fallback_error}")
@@ -132,15 +132,21 @@ def start_session( # Pass auto_start_session=False to prevent circular dependency Client().init(auto_start_session=False) - span, context, token = _create_session_span(tags) + span, ctx, token = _create_session_span(tags) session = Session(span, token) # Set the global session reference _current_session = session + # Also register with the client's session registry for consistent behavior + try: + import agentops.client.client + agentops.client.client._active_session = session + except Exception: + pass + return session - def _set_span_attributes(span: Any, attributes: Dict[str, Any]) -> None: """ Helper to set attributes on a span. @@ -202,6 +208,18 @@ def end_session(session_or_status: Any = None, **kwargs) -> None: logger.debug("Ignoring end_session call - TracingCore not initialized") return + # Clear client active session reference + try: + import agentops.client.client + if session_or_status is None and kwargs: + if _current_session is agentops.client.client._active_session: + agentops.client.client._active_session = None + elif hasattr(session_or_status, 'span'): + if session_or_status is agentops.client.client._active_session: + agentops.client.client._active_session = None + except Exception: + pass + # In some old implementations, and in crew < 0.10.5 `end_session` will be # called with a single string as a positional argument like: "Success" @@ -216,24 +234,45 @@ def end_session(session_or_status: Any = None, **kwargs) -> None: # ) if session_or_status is None and kwargs: if _current_session is not None: - _set_span_attributes(_current_session.span, kwargs) - _finalize_span(_current_session.span, _current_session.token) - _flush_span_processors() - _current_session = None + try: + _set_span_attributes(_current_session.span, kwargs) + _finalize_span(_current_session.span, _current_session.token) + _flush_span_processors() + _current_session = None + except Exception as e: + logger.warning(f"Error ending current session: {e}") + # 
Fallback: try direct span ending + try: + if hasattr(_current_session.span, "end"): + _current_session.span.end() + _current_session = None + except: + pass return # Handle the standard pattern and CrewAI >= 0.105.0 pattern where a Session object is passed. # In both cases, we call _finalize_span with the span and token from the Session. # This is the most direct and precise way to end a specific session. if hasattr(session_or_status, 'span') and hasattr(session_or_status, 'token'): - # Set attributes and finalize the span - _set_span_attributes(session_or_status.span, kwargs) - _finalize_span(session_or_status.span, session_or_status.token) - _flush_span_processors() - - # Clear the global session reference if this is the current session - if _current_session is session_or_status: - _current_session = None + try: + # Set attributes and finalize the span + _set_span_attributes(session_or_status.span, kwargs) + _finalize_span(session_or_status.span, session_or_status.token) + _flush_span_processors() + + # Clear the global session reference if this is the current session + if _current_session is session_or_status: + _current_session = None + except Exception as e: + logger.warning(f"Error ending session object: {e}") + # Fallback: try direct span ending + try: + if hasattr(session_or_status.span, "end"): + session_or_status.span.end() + if _current_session is session_or_status: + _current_session = None + except: + pass def end_all_sessions(): diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 540089e8c..08086d0ef 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -14,6 +14,7 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import SpanProcessor, TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry import context as context_api from agentops.exceptions import AgentOpsClientNotInitializedException from agentops.logging import logger @@ -90,6 +91,9 @@ def 
setup_telemetry( meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) metrics.set_meter_provider(meter_provider) + # Initialize root context + context_api.get_current() + logger.debug("Telemetry system initialized") return provider, meter_provider diff --git a/agentops/sdk/decorators/utility.py b/agentops/sdk/decorators/utility.py index 0e3953dfd..c0a335dd0 100644 --- a/agentops/sdk/decorators/utility.py +++ b/agentops/sdk/decorators/utility.py @@ -164,10 +164,6 @@ def _make_span( - context is the span context - token is the context token needed for detaching """ - # Log before we do anything - before_span = _get_current_span_info() - logger.debug(f"[DEBUG] BEFORE _make_span {operation_name}.{span_kind} - Current context: {before_span}") - # Create span with proper naming convention span_name = f"{operation_name}.{span_kind}" @@ -186,20 +182,19 @@ def _make_span( if version is not None: attributes[SpanAttributes.OPERATION_VERSION] = version - # Get current context explicitly current_context = context_api.get_current() - # Create the span with explicit context - span = tracer.start_span(span_name, context=current_context, attributes=attributes) - - # Set as current context and get token for later detachment + # Create the span with proper context management + if span_kind == SpanKind.SESSION: + # For session spans, create as a root span + span = tracer.start_span(span_name, attributes=attributes) + else: + # For other spans, use the current context + span = tracer.start_span(span_name, context=current_context, attributes=attributes) + + # Set as current context and get token for detachment ctx = trace.set_span_in_context(span) token = context_api.attach(ctx) - - # Log after span creation - if hasattr(span, "get_span_context"): - span_ctx = span.get_span_context() - logger.debug(f"[DEBUG] CREATED _make_span {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}") return span, ctx, token @@ -232,19 
def _finalize_span(span: trace.Span, token: Any) -> None:
    """
    Finalize a span and clean up its context.

    Three best-effort steps are performed. Each is isolated so that a failure
    in one does not prevent the others from running, and the function itself
    never raises:

    1. End the span so it is marked complete.
    2. Detach the context token that was obtained when the span was attached.
    3. Ask the global tracer provider to flush, so the span is exported
       immediately rather than waiting for batch processing.

    Args:
        span: The span to finalize. ``None`` skips step 1.
        token: The context token to detach. ``None`` skips step 2.
    """
    # Compare against None explicitly: a valid span/token object must not be
    # skipped just because it happens to evaluate as falsy.
    if span is not None:
        try:
            span.end()
        except Exception as e:
            logger.warning(f"Error ending span: {e}")

    if token is not None:
        try:
            context_api.detach(token)
        except Exception as e:
            logger.debug(f"Error detaching context token: {e}")

    # Not every tracer provider exposes force_flush(), and during application
    # shutdown the provider may already be partially torn down, so look the
    # method up defensively and tolerate any failure.
    try:
        force_flush = getattr(trace.get_tracer_provider(), "force_flush", None)
        if callable(force_flush):
            force_flush()
    except Exception as e:
        logger.debug(f"Error flushing tracer provider: {e}")