Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions agentops/client/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List, Optional, Union
import atexit

from agentops.client.api import ApiClient
from agentops.config import Config
Expand All @@ -8,6 +9,28 @@
from agentops.logging.config import configure_logging, intercept_opentelemetry_logging
from agentops.sdk.core import TracingCore

# Module-level reference to the session that should be ended at interpreter
# shutdown. It is assigned by Client.init() after an auto-started session is
# created and by agentops.legacy when a session is started; end_session()
# resets it to None when that same session is ended. None means "no session
# to clean up".
_active_session = None

# Guards the atexit registration in Client.init() so that
# _end_active_session is registered only once, even if init() runs again.
_atexit_registered = False

def _end_active_session():
    """End the globally registered session, if any, during shutdown.

    Registered with ``atexit`` by ``Client.init``. The handler is strictly
    best-effort: it never raises, because an exception escaping an exit
    handler would only add noise to interpreter shutdown.

    Strategy:
        1. Call ``agentops.legacy.end_session`` with the registered session.
        2. If that fails for any reason, log a warning and fall back to
           calling ``span.end()`` directly on the session's span.
    """
    # Snapshot the module-level reference once so both the primary path and
    # the fallback operate on the same object even if end_session() clears
    # the global while it runs.
    session = _active_session
    if session is None:
        return

    logger.debug("Auto-ending active session during shutdown")
    try:
        # Imported lazily rather than at module top; presumably this avoids a
        # circular import between agentops.client and agentops.legacy.
        from agentops.legacy import end_session

        end_session(session)
    except Exception as e:
        logger.warning(f"Error ending active session during shutdown: {e}")

        # Final fallback: try to end the span directly.
        try:
            span = getattr(session, "span", None)
            if span is not None and hasattr(span, "end"):
                span.end()
        except Exception as fallback_error:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed, and leave a
            # trace of the failure instead of discarding it silently.
            logger.debug(f"Fallback span.end() failed during shutdown: {fallback_error}")

Check warning on line 33 in agentops/client/client.py

View check run for this annotation

Codecov / codecov/patch

agentops/client/client.py#L29-L33

Added lines #L29 - L33 were not covered by tests

class Client:
"""Singleton client for AgentOps service"""
Expand Down Expand Up @@ -61,6 +84,12 @@

self.initialized = True

# Register a single global atexit handler for session management
global _atexit_registered
if not _atexit_registered:
atexit.register(_end_active_session)
_atexit_registered = True

# Start a session if auto_start_session is True
session = None
if self.config.auto_start_session:
Expand All @@ -71,6 +100,10 @@
session = start_session(tags=list(self.config.default_tags))
else:
session = start_session()

# Register this session globally
global _active_session
_active_session = session

return session

Expand Down
67 changes: 53 additions & 14 deletions agentops/legacy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,21 @@
# Pass auto_start_session=False to prevent circular dependency
Client().init(auto_start_session=False)

span, context, token = _create_session_span(tags)
span, ctx, token = _create_session_span(tags)
session = Session(span, token)

# Set the global session reference
_current_session = session

# Also register with the client's session registry for consistent behavior
try:
import agentops.client.client
agentops.client.client._active_session = session
except Exception:
pass

Check warning on line 146 in agentops/legacy/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/legacy/__init__.py#L145-L146

Added lines #L145 - L146 were not covered by tests

return session


def _set_span_attributes(span: Any, attributes: Dict[str, Any]) -> None:
"""
Helper to set attributes on a span.
Expand Down Expand Up @@ -202,6 +208,18 @@
logger.debug("Ignoring end_session call - TracingCore not initialized")
return

# Clear client active session reference
try:
import agentops.client.client
if session_or_status is None and kwargs:
if _current_session is agentops.client.client._active_session:
agentops.client.client._active_session = None
elif hasattr(session_or_status, 'span'):
if session_or_status is agentops.client.client._active_session:
agentops.client.client._active_session = None
except Exception:
pass

Check warning on line 221 in agentops/legacy/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/legacy/__init__.py#L220-L221

Added lines #L220 - L221 were not covered by tests

# In some old implementations, and in crew < 0.10.5 `end_session` will be
# called with a single string as a positional argument like: "Success"

Expand All @@ -216,24 +234,45 @@
# )
if session_or_status is None and kwargs:
if _current_session is not None:
_set_span_attributes(_current_session.span, kwargs)
_finalize_span(_current_session.span, _current_session.token)
_flush_span_processors()
_current_session = None
try:
_set_span_attributes(_current_session.span, kwargs)
_finalize_span(_current_session.span, _current_session.token)
_flush_span_processors()
_current_session = None
except Exception as e:
logger.warning(f"Error ending current session: {e}")

Check warning on line 243 in agentops/legacy/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/legacy/__init__.py#L242-L243

Added lines #L242 - L243 were not covered by tests
# Fallback: try direct span ending
try:
if hasattr(_current_session.span, "end"):
_current_session.span.end()
_current_session = None
except:
pass

Check warning on line 250 in agentops/legacy/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/legacy/__init__.py#L245-L250

Added lines #L245 - L250 were not covered by tests
return

# Handle the standard pattern and CrewAI >= 0.105.0 pattern where a Session object is passed.
# In both cases, we call _finalize_span with the span and token from the Session.
# This is the most direct and precise way to end a specific session.
if hasattr(session_or_status, 'span') and hasattr(session_or_status, 'token'):
# Set attributes and finalize the span
_set_span_attributes(session_or_status.span, kwargs)
_finalize_span(session_or_status.span, session_or_status.token)
_flush_span_processors()

# Clear the global session reference if this is the current session
if _current_session is session_or_status:
_current_session = None
try:
# Set attributes and finalize the span
_set_span_attributes(session_or_status.span, kwargs)
_finalize_span(session_or_status.span, session_or_status.token)
_flush_span_processors()

# Clear the global session reference if this is the current session
if _current_session is session_or_status:
_current_session = None
except Exception as e:
logger.warning(f"Error ending session object: {e}")

Check warning on line 267 in agentops/legacy/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/legacy/__init__.py#L266-L267

Added lines #L266 - L267 were not covered by tests
# Fallback: try direct span ending
try:
if hasattr(session_or_status.span, "end"):
session_or_status.span.end()
if _current_session is session_or_status:
_current_session = None
except:
pass

Check warning on line 275 in agentops/legacy/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/legacy/__init__.py#L269-L275

Added lines #L269 - L275 were not covered by tests


def end_all_sessions():
Expand Down
4 changes: 4 additions & 0 deletions agentops/sdk/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry import context as context_api

from agentops.exceptions import AgentOpsClientNotInitializedException
from agentops.logging import logger
Expand Down Expand Up @@ -90,6 +91,9 @@
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

# Initialize root context
context_api.get_current()

Check warning on line 95 in agentops/sdk/core.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/core.py#L95

Added line #L95 was not covered by tests

logger.debug("Telemetry system initialized")

return provider, meter_provider
Expand Down
77 changes: 51 additions & 26 deletions agentops/sdk/decorators/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@
- context is the span context
- token is the context token needed for detaching
"""
# Log before we do anything
before_span = _get_current_span_info()
logger.debug(f"[DEBUG] BEFORE _make_span {operation_name}.{span_kind} - Current context: {before_span}")

# Create span with proper naming convention
span_name = f"{operation_name}.{span_kind}"

Expand All @@ -186,20 +182,19 @@
if version is not None:
attributes[SpanAttributes.OPERATION_VERSION] = version

# Get current context explicitly
current_context = context_api.get_current()

# Create the span with explicit context
span = tracer.start_span(span_name, context=current_context, attributes=attributes)

# Set as current context and get token for later detachment
# Create the span with proper context management
if span_kind == SpanKind.SESSION:
# For session spans, create as a root span
span = tracer.start_span(span_name, attributes=attributes)
else:
# For other spans, use the current context
span = tracer.start_span(span_name, context=current_context, attributes=attributes)

# Set as current context and get token for detachment
ctx = trace.set_span_in_context(span)
token = context_api.attach(ctx)

# Log after span creation
if hasattr(span, "get_span_context"):
span_ctx = span.get_span_context()
logger.debug(f"[DEBUG] CREATED _make_span {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}")

return span, ctx, token

Expand Down Expand Up @@ -232,19 +227,49 @@


def _finalize_span(span: trace.Span, token: Any) -> None:
    """
    End a span, detach its context token, and flush the tracer provider.

    The three steps are attempted independently and are best-effort: a
    failure in one is logged and does not prevent the others from running.

    1. ``span.end()`` marks the span complete.
    2. ``context_api.detach(token)`` undoes the ``context_api.attach`` that
       made the span current when it was created.
    3. ``force_flush()`` on the global tracer provider asks it to export
       pending spans now rather than waiting for batch processing.

    Args:
        span: The span to finalize. ``None`` is skipped.
        token: The context token to detach. ``None`` is skipped.
    """
    # End the span
    if span is not None:
        try:
            span.end()
        except Exception as e:
            logger.warning(f"Error ending span: {e}")

    # Detach the context token if one was provided
    if token is not None:
        try:
            context_api.detach(token)
        except Exception as e:
            # Detach can fail when the token was created in a different
            # context (e.g. another thread/task); not fatal for finalization.
            logger.debug(f"Error detaching context token: {e}")

    # Try to flush span processors.
    # Note: force_flush() might not be available in certain scenarios, e.g.
    # during application shutdown when the provider may be partially
    # destroyed, or when the global provider does not implement it. Those
    # cases surface as exceptions (AttributeError included) and are tolerated.
    try:
        from opentelemetry.trace import get_tracer_provider

        get_tracer_provider().force_flush()
    except Exception as e:
        logger.debug(f"Could not force-flush tracer provider: {e}")

Check warning on line 275 in agentops/sdk/decorators/utility.py

View check run for this annotation

Codecov / codecov/patch

agentops/sdk/decorators/utility.py#L275

Added line #L275 was not covered by tests
Loading