Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6fa9dd5
Introduced `start_trace` and `end_trace` functions for user-managed t…
Dwij1704 May 22, 2025
efacc7c
Enhance tracing functionality by introducing new decorators and impro…
Dwij1704 May 22, 2025
a369761
cleanup
Dwij1704 May 22, 2025
206dd6f
Enhance `init` function in AgentOps SDK by adding new parameters: `ta…
Dwij1704 May 22, 2025
79c7637
Merge branch 'main' into better-root-span-management
dot-agi May 25, 2025
6f284c7
Refactor legacy session handling by replacing `LegacySession` with `S…
Dwij1704 May 27, 2025
16893b6
Enhance unit tests for URL logging in TracingCore and InternalSpanPro…
Dwij1704 May 27, 2025
e8de762
Refactor CrewAI workflow instrumentation to enhance span management a…
Dwij1704 May 27, 2025
cd17b13
Refactor force_flush method in TracingCore to remove timeout paramete…
Dwij1704 May 27, 2025
1faf0e3
Refactor Client initialization logic to clarify re-initialization con…
Dwij1704 May 27, 2025
a701e4b
Refactor Client initialization to support backward compatibility with…
Dwij1704 May 27, 2025
67b2f80
Improve authentication error handling in Client and V3Client. Added e…
Dwij1704 May 27, 2025
fa82e41
Refactor integration tests for session concurrency to improve isolati…
Dwij1704 May 27, 2025
8c8e974
Enhance trace management by updating end_trace function to allow endi…
Dwij1704 May 27, 2025
c49191c
Merge branch 'main' into better-root-span-management
Dwij1704 May 27, 2025
fa6eca3
Enhance trace ID handling in TracingCore by adding exception handling…
Dwij1704 May 27, 2025
310aeed
Merge branch 'better-root-span-management' of github.com:AgentOps-AI/…
Dwij1704 May 27, 2025
27f1701
revert crewai
Dwij1704 May 27, 2025
abd53c3
Merge branch 'main' into better-root-span-management
Dwij1704 May 27, 2025
ce4ab1a
Merge branch 'main' into better-root-span-management
Dwij1704 May 27, 2025
51bbf7f
Enhance tracing functionality by adding `trace_name` parameter to con…
Dwij1704 May 27, 2025
89767bb
Merge branch 'better-root-span-management' of github.com:AgentOps-AI/…
Dwij1704 May 27, 2025
4fe79d9
Remove unused span variables in entity decorator function to streamli…
Dwij1704 May 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions agentops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
track_agent,
track_tool,
end_all_sessions,
Session,
Session as LegacySession,
ToolEvent,
ErrorEvent,
ActionEvent,
LLMEvent,
) # type: ignore

from typing import List, Optional, Union
from typing import List, Optional, Union, Dict, Any
from agentops.client import Client
from agentops.sdk.core import TracingCore, TraceContext
from agentops.sdk.decorators import trace, session, agent, task, workflow, operation

from agentops.logging.config import logger

# Client global instance; one per process runtime
_client = Client()
Expand Down Expand Up @@ -165,26 +168,79 @@ def configure(**kwargs):
# Check for invalid parameters
invalid_params = set(kwargs.keys()) - valid_params
if invalid_params:
from .logging.config import logger

logger.warning(f"Invalid configuration parameters: {invalid_params}")

_client.configure(**kwargs)


def start_trace(
trace_name: str = "session", tags: Optional[Union[Dict[str, Any], List[str]]] = None
) -> Optional[TraceContext]:
"""
Starts a new trace (root span) and returns its context.
This allows for multiple concurrent, user-managed traces.

Args:
trace_name: Name for the trace (e.g., "session", "my_custom_task").
tags: Optional tags to attach to the trace span (list of strings or dict).

Returns:
A TraceContext object containing the span and context token, or None if SDK not initialized.
"""
tracing_core = TracingCore.get_instance()
if not tracing_core.initialized:
# Optionally, attempt to initialize the client if not already, or log a more severe warning.
# For now, align with legacy start_session that would try to init.
# However, explicit init is preferred before starting traces.
logger.warning("AgentOps SDK not initialized. Attempting to initialize with defaults before starting trace.")
try:
init() # Attempt to initialize with environment variables / defaults
if not tracing_core.initialized:
logger.error("SDK initialization failed. Cannot start trace.")
return None
except Exception as e:
logger.error(f"SDK auto-initialization failed during start_trace: {e}. Cannot start trace.")
return None

return tracing_core.start_trace(trace_name=trace_name, tags=tags)


def end_trace(trace_context: TraceContext, end_state: str = "Success") -> None:
"""
Ends a trace (its root span) and finalizes it.

Args:
trace_context: The TraceContext object returned by start_trace.
end_state: The final state of the trace (e.g., "Success", "Failure", "Error").
"""
tracing_core = TracingCore.get_instance()
if not tracing_core.initialized:
logger.warning("AgentOps SDK not initialized. Cannot end trace.")
return
tracing_core.end_trace(trace_context=trace_context, end_state=end_state)


__all__ = [
"init",
"configure",
"get_client",
"record",
"start_trace",
"end_trace",
"start_session",
"end_session",
"track_agent",
"track_tool",
"end_all_sessions",
"Session",
"ToolEvent",
"ErrorEvent",
"ActionEvent",
"LLMEvent",
"LegacySession",
"trace",
"session",
"agent",
"task",
"workflow",
"operation",
]
186 changes: 131 additions & 55 deletions agentops/client/client.py
Original file line number Diff line number Diff line change
@@ -1,117 +1,174 @@
import atexit
from typing import Optional, Any

from agentops.client.api import ApiClient
from agentops.config import Config
from agentops.exceptions import NoApiKeyException
from agentops.instrumentation import instrument_all
from agentops.logging import logger
from agentops.logging.config import configure_logging, intercept_opentelemetry_logging
from agentops.sdk.core import TracingCore
from agentops.sdk.core import TracingCore, TraceContext
from agentops.legacy import Session as LegacySession

# Global registry for active session
_active_session = None
# Global variables to hold the client's auto-started trace and its legacy session wrapper
_client_init_trace_context: Optional[TraceContext] = None
_client_legacy_session_for_init_trace: Optional[LegacySession] = None

# Single atexit handler registered flag
_atexit_registered = False


def _end_active_session():
"""Global handler to end the active session during shutdown"""
global _active_session
if _active_session is not None:
logger.debug("Auto-ending active session during shutdown")
def _end_init_trace_atexit():
"""Global atexit handler to end the client's auto-initialized trace during shutdown."""
global _client_init_trace_context, _client_legacy_session_for_init_trace
if _client_init_trace_context is not None:
logger.debug("Auto-ending client's init trace during shutdown.")
try:
from agentops.legacy import end_session

end_session(_active_session)
# Use TracingCore to end the trace directly
tracing_core = TracingCore.get_instance()
if tracing_core.initialized and _client_init_trace_context.span.is_recording():
tracing_core.end_trace(_client_init_trace_context, end_state="Shutdown")
except Exception as e:
logger.warning(f"Error ending active session during shutdown: {e}")
# Final fallback: try to end the span directly
try:
if hasattr(_active_session, "span") and hasattr(_active_session.span, "end"):
_active_session.span.end()
except:
pass
logger.warning(f"Error ending client's init trace during shutdown: {e}")
finally:
_client_init_trace_context = None
_client_legacy_session_for_init_trace = None # Clear its legacy wrapper too


class Client:
"""Singleton client for AgentOps service"""

config: Config
_initialized: bool
_init_trace_context: Optional[TraceContext] = None # Stores the context of the auto-started trace
_legacy_session_for_init_trace: Optional[
LegacySession
] = None # Stores the legacy Session wrapper for the auto-started trace

__instance = None # Class variable for singleton pattern

api: ApiClient

def __new__(cls, *args, **kwargs):
def __new__(cls, *args: Any, **kwargs: Any) -> "Client":
if cls.__instance is None:
cls.__instance = super(Client, cls).__new__(cls)
# Initialize instance variables that should only be set once per instance
cls.__instance._init_trace_context = None
cls.__instance._legacy_session_for_init_trace = None
return cls.__instance

def __init__(self):
# Only initialize once
self._initialized = False
self.config = Config()

def init(self, **kwargs):
# Initialization of attributes like config, _initialized should happen here if they are instance-specific
# and not shared via __new__ for a true singleton that can be re-configured.
# However, the current pattern re-initializes config in init().
if (
not hasattr(self, "_initialized") or not self._initialized
): # Ensure init logic runs only once per actual initialization intent
self.config = Config() # Initialize config here for the instance
self._initialized = False
# self._init_trace_context = None # Already done in __new__
# self._legacy_session_for_init_trace = None # Already done in __new__

def init(self, **kwargs: Any) -> None: # Return type updated to None
# Recreate the Config object to parse environment variables at the time of initialization
# This allows re-init with new env vars if needed, though true singletons usually init once.
self.config = Config()
self.configure(**kwargs)

if self.initialized and kwargs.get("api_key") != self.config.api_key:
logger.warning("AgentOps Client being re-initialized with a different API key. This is unusual.")
# Reset initialization status to allow re-init with new key/config
self._initialized = False
if self._init_trace_context and self._init_trace_context.span.is_recording():
logger.warning("Ending previously auto-started trace due to re-initialization.")
TracingCore.get_instance().end_trace(self._init_trace_context, "Reinitialized")
self._init_trace_context = None
self._legacy_session_for_init_trace = None

if self.initialized:
logger.debug("AgentOps Client already initialized.")
# If auto_start_session was true, return the existing legacy session wrapper
if self.config.auto_start_session:
return self._legacy_session_for_init_trace
return None # If not auto-starting, and already initialized, return None

if not self.config.api_key:
raise NoApiKeyException

# TODO we may need to initialize logging before importing OTEL to capture all
configure_logging(self.config)
intercept_opentelemetry_logging()

self.api = ApiClient(self.config.endpoint)

# Prefetch JWT token if enabled
# TODO: Move this validation somewhere else (and integrate with self.config.prefetch_jwt_token once we have a solution to that)
response = self.api.v3.fetch_auth_token(self.config.api_key)
if response is None:
return
# If auth fails, we cannot proceed with TracingCore initialization that depends on project_id
logger.error("Failed to fetch auth token. AgentOps SDK will not be initialized.")
return None # Explicitly return None if auth fails

# Save the bearer for use with the v4 API
self.api.v4.set_auth_token(response["token"])

# Initialize TracingCore with the current configuration and project_id
tracing_config = self.config.dict()
tracing_config["project_id"] = response["project_id"]

TracingCore.initialize_from_config(tracing_config, jwt=response["token"])
tracing_core = TracingCore.get_instance()
tracing_core.initialize_from_config(tracing_config, jwt=response["token"])

# Instrument LLM calls if enabled
if self.config.instrument_llm_calls:
instrument_all()

self.initialized = True
# self._initialized = True # Set initialized to True here - MOVED to after trace start attempt

# Register a single global atexit handler for session management
global _atexit_registered
if not _atexit_registered:
atexit.register(_end_active_session)
atexit.register(_end_init_trace_atexit) # Register new atexit handler
_atexit_registered = True

# Start a session if auto_start_session is True
session = None
# Auto-start trace if configured
if self.config.auto_start_session:
from agentops.legacy import start_session

# Pass default_tags if they exist
if self.config.default_tags:
session = start_session(tags=list(self.config.default_tags))
else:
session = start_session()

# Register this session globally
global _active_session
_active_session = session

return session

def configure(self, **kwargs):
if self._init_trace_context is None or not self._init_trace_context.span.is_recording():
logger.debug("Auto-starting init trace.")
self._init_trace_context = tracing_core.start_trace(
trace_name="default",
tags=list(self.config.default_tags) if self.config.default_tags else None,
is_init_trace=True,
)
if self._init_trace_context:
self._legacy_session_for_init_trace = LegacySession(self._init_trace_context)

# For backward compatibility, also update the global references in legacy and client modules
# These globals are what old code might have been using via agentops.legacy.get_session() or similar indirect access.
global _client_init_trace_context, _client_legacy_session_for_init_trace
_client_init_trace_context = self._init_trace_context
_client_legacy_session_for_init_trace = self._legacy_session_for_init_trace

# Update legacy module's _current_session and _current_trace_context
# This is tricky; direct access to another module's globals is not ideal.
# Prefer explicit calls if possible, but for maximum BC:
try:
import agentops.legacy

agentops.legacy._current_session = self._legacy_session_for_init_trace
agentops.legacy._current_trace_context = self._init_trace_context
except ImportError:
pass # Should not happen

else:
logger.error("Failed to start the auto-init trace.")
# Even if auto-start fails, core services up to TracingCore might be initialized.
# Set self.initialized to True if TracingCore is up, but return None.
self._initialized = tracing_core.initialized
return None # Failed to start trace

self._initialized = True # Successfully initialized and auto-trace started (if configured)
# Do not return the init_trace_context or its session wrapper to the user from init()
return None # As per requirements, init() doesn't return the auto-started trace object
else:
logger.debug("Auto-start session is disabled. No init trace started by client.")
self._initialized = True # Successfully initialized, just no auto-trace
return None # No auto-session, so return None

def configure(self, **kwargs: Any) -> None:
"""Update client configuration"""
self.config.configure(**kwargs)

Expand All @@ -120,10 +177,29 @@ def initialized(self) -> bool:
return self._initialized

@initialized.setter
def initialized(self, value: bool):
def initialized(self, value: bool) -> None:
if self._initialized and self._initialized != value:
raise ValueError("Client already initialized")
# Allow re-setting to False if we are intentionally re-initializing
# This logic is now partly in init() to handle re-init cases
pass
self._initialized = value

# ------------------------------------------------------------
__instance = None
# Remove the old __instance = None at the end of the class definition if it's a repeat
# __instance = None # This was a class variable, should be defined once

# Make _init_trace_context and _legacy_session_for_init_trace accessible
# to the atexit handler if it becomes a static/class method or needs access
# For now, the atexit handler is global and uses global vars copied from these.

# Deprecate and remove the old global _active_session from this module.
# Consumers should use agentops.start_trace() or rely on the auto-init trace.
# For a transition, the auto-init trace's legacy wrapper is set to legacy module's globals.


# Ensure the global _active_session (if needed for some very old compatibility) points to the client's legacy session for init trace.
# This specific global _active_session in client.py is problematic and should be phased out.
# For now, _client_legacy_session_for_init_trace is the primary global for the auto-init trace's legacy Session.

# Remove the old global _active_session defined at the top of this file if it's no longer the primary mechanism.
# The new globals _client_init_trace_context and _client_legacy_session_for_init_trace handle the auto-init trace.
6 changes: 3 additions & 3 deletions agentops/helpers/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Helpers for interacting with the AgentOps dashboard.
"""

from typing import Union
from typing import Union, Optional
from termcolor import colored
from opentelemetry.sdk.trace import Span, ReadableSpan
from agentops.logging import logger
Expand Down Expand Up @@ -33,12 +33,12 @@ def get_trace_url(span: Union[Span, ReadableSpan]) -> str:
return f"{app_url}/sessions?trace_id={trace_id}"


def log_trace_url(span: Union[Span, ReadableSpan]) -> None:
def log_trace_url(span: Union[Span, ReadableSpan], title: Optional[str] = None) -> None:
"""
Log the trace URL for the AgentOps dashboard.

Args:
span: The span to log the URL for.
"""
session_url = get_trace_url(span)
logger.info(colored(f"\x1b[34mSession Replay: {session_url}\x1b[0m", "blue"))
logger.info(colored(f"\x1b[34mSession Replay for {title} trace: {session_url}\x1b[0m", "blue"))
Loading
Loading