diff --git a/agentops/CURRENT_FLOW.md b/agentops/CURRENT_FLOW.md new file mode 100644 index 000000000..335d7a2a5 --- /dev/null +++ b/agentops/CURRENT_FLOW.md @@ -0,0 +1,67 @@ +## 1. Current Architecture Flow + +```mermaid +flowchart TB + subgraph Client["Client Singleton"] + Config["Configuration"] + direction TB + Client_API["Client API Layer"] + LLMTracker["LLM Tracker"] + end + + subgraph Sessions["Session Management"] + Session["Session Class"] + SessionManager["SessionManager"] + LogCapture["LogCapture"] + SessionAPI["SessionApiClient"] + end + + subgraph Events["Event System"] + Event["Base Event"] + LLMEvent["LLMEvent"] + ActionEvent["ActionEvent"] + ToolEvent["ToolEvent"] + ErrorEvent["ErrorEvent"] + end + + subgraph Telemetry["Current Telemetry"] + SessionTelemetry["SessionTelemetry"] + OTELTracer["OTEL Tracer"] + SessionExporter["SessionExporter"] + BatchProcessor["BatchSpanProcessor"] + end + + subgraph Providers["LLM Providers"] + InstrumentedProvider["InstrumentedProvider"] + OpenAIProvider["OpenAIProvider"] + AnthropicProvider["AnthropicProvider"] + end + + %% Client Relationships + Client_API -->|initializes| Session + Client_API -->|configures| LLMTracker + LLMTracker -->|instruments| Providers + + %% Session Direct Dependencies + Session -->|creates| SessionManager + Session -->|creates| SessionTelemetry + Session -->|creates| LogCapture + Session -->|owns| SessionAPI + + %% Event Flow + InstrumentedProvider -->|creates| LLMEvent + InstrumentedProvider -->|requires| Session + Session -->|records| Event + SessionManager -->|processes| Event + SessionTelemetry -->|converts to spans| Event + + %% Telemetry Flow + SessionTelemetry -->|uses| OTELTracer + OTELTracer -->|sends to| BatchProcessor + BatchProcessor -->|exports via| SessionExporter + SessionExporter -->|uses| SessionAPI + + %% Problem Areas + style Session fill:#f77,stroke:#333 + style InstrumentedProvider fill:#f77,stroke:#333 + style SessionTelemetry fill:#f77,stroke:#333 diff --git 
a/agentops/PROPOSED_FLOW.md b/agentops/PROPOSED_FLOW.md new file mode 100644 index 000000000..2a1aab1bb --- /dev/null +++ b/agentops/PROPOSED_FLOW.md @@ -0,0 +1,92 @@ +```mermaid +flowchart TB + subgraph Client["Client Singleton"] + Config["Configuration"] + direction TB + Client_API["Client API Layer"] + InstrumentationManager["Instrumentation Manager"] + end + + subgraph Sessions["Session Management"] + Session["Session Class"] + SessionManager["SessionManager"] + LogCapture["LogCapture"] + SessionAPI["SessionApiClient"] + end + + subgraph Events["Event System"] + Event["Base Event"] + LLMEvent["LLMEvent"] + ActionEvent["ActionEvent"] + ToolEvent["ToolEvent"] + ErrorEvent["ErrorEvent"] + end + + subgraph Telemetry["Enhanced Telemetry"] + TelemetryManager["TelemetryManager"] + OTELTracer["OTEL Tracer"] + + subgraph Exporters["Exporters"] + SessionExporter["SessionExporter"] + OTLPExporter["OTLP Exporter"] + end + + subgraph Processors["Processors"] + BatchProcessor["BatchProcessor"] + SamplingProcessor["SamplingProcessor"] + end + end + + subgraph Providers["LLM Providers"] + BaseInstrumentation["BaseInstrumentation"] + InstrumentedProvider["InstrumentedProvider"] + OpenAIProvider["OpenAIProvider"] + AnthropicProvider["AnthropicProvider"] + end + + subgraph Context["Context Management"] + TraceContext["TraceContext"] + ContextPropagation["ContextPropagation"] + end + + %% Client Relationships + Client_API -->|initializes| Session + Client_API -->|configures| InstrumentationManager + InstrumentationManager -->|manages| BaseInstrumentation + + %% Session Dependencies + Session -->|creates| SessionManager + Session -->|uses| TelemetryManager + Session -->|creates| LogCapture + Session -->|owns| SessionAPI + + %% Event Flow + InstrumentedProvider -->|creates| LLMEvent + InstrumentedProvider -->|requires| Session + Session -->|records| Event + SessionManager -->|processes| Event + + %% Telemetry Flow + TelemetryManager -->|manages| OTELTracer + TelemetryManager 
-->|uses| TraceContext + OTELTracer -->|uses| Processors + Processors -->|send to| Exporters + + %% Provider Structure + BaseInstrumentation -->|extends| InstrumentedProvider + InstrumentedProvider -->|implements| OpenAIProvider + InstrumentedProvider -->|implements| AnthropicProvider + + %% Context Flow + ContextPropagation -->|enriches| Event + TraceContext -->|propagates to| SessionAPI + + %% Highlight New/Changed Components + style InstrumentationManager fill:#90EE90,stroke:#333 + style TelemetryManager fill:#90EE90,stroke:#333 + style BaseInstrumentation fill:#90EE90,stroke:#333 + style TraceContext fill:#90EE90,stroke:#333 + style ContextPropagation fill:#90EE90,stroke:#333 + style OTLPExporter fill:#90EE90,stroke:#333 + style SamplingProcessor fill:#90EE90,stroke:#333 +``` diff --git a/agentops/api/base.py b/agentops/api/base.py new file mode 100644 index 000000000..297b6c15f --- /dev/null +++ b/agentops/api/base.py @@ -0,0 +1,77 @@ +from typing import Optional, Dict, Any +import requests +from requests.adapters import HTTPAdapter +from urllib3.util import Retry + +from ..exceptions import ApiServerException + + +class ApiClient: + """Base class for API communication with connection pooling""" + + _session: Optional[requests.Session] = None + + @classmethod + def get_session(cls) -> requests.Session: + """Get or create the global session with optimized connection pooling""" + if cls._session is None: + cls._session = requests.Session() + + # Configure connection pooling + adapter = HTTPAdapter( + pool_connections=15, + pool_maxsize=256, + max_retries=Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]), + ) + + # Mount adapter for both HTTP and HTTPS + cls._session.mount("http://", adapter) + cls._session.mount("https://", adapter) + + # Set default headers + cls._session.headers.update( + { + "Connection": "keep-alive", + "Keep-Alive": "timeout=10, max=1000", + "Content-Type": "application/json", + } + ) + + return cls._session + + def 
__init__(self, endpoint: str): + self.endpoint = endpoint + + def _prepare_headers( + self, + api_key: Optional[str] = None, + parent_key: Optional[str] = None, + jwt: Optional[str] = None, + custom_headers: Optional[Dict[str, str]] = None, + ) -> Dict[str, str]: + """Prepare headers for the request""" + headers = {"Content-Type": "application/json; charset=UTF-8", "Accept": "*/*"} + + if api_key: + headers["X-Agentops-Api-Key"] = api_key + + if parent_key: + headers["X-Agentops-Parent-Key"] = parent_key + + if jwt: + headers["Authorization"] = f"Bearer {jwt}" + + if custom_headers: + # Don't let custom headers override critical headers + safe_headers = custom_headers.copy() + for protected in ["Authorization", "X-Agentops-Api-Key", "X-Agentops-Parent-Key"]: + safe_headers.pop(protected, None) + headers.update(safe_headers) + + return headers + + def post(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> requests.Response: + """Make POST request""" + url = f"{self.endpoint}{path}" + session = self.get_session() + return session.post(url, json=data, headers=headers) diff --git a/agentops/api/session.py b/agentops/api/session.py new file mode 100644 index 000000000..4875178a6 --- /dev/null +++ b/agentops/api/session.py @@ -0,0 +1,84 @@ +from typing import Dict, List, Optional, Tuple, Union, Any +from uuid import UUID +import requests + +from .base import ApiClient +from ..exceptions import ApiServerException +from ..helpers import safe_serialize +from ..log_config import logger +from ..event import Event + + +class SessionApiClient(ApiClient): + """Handles API communication for sessions""" + + def __init__(self, endpoint: str, session_id: UUID, api_key: str, jwt: Optional[str] = None): + super().__init__(endpoint) + self.session_id = session_id + self.api_key = api_key + self.jwt = jwt + + def create_session( + self, session_data: Dict[str, Any], parent_key: Optional[str] = None + ) -> Tuple[bool, Optional[str]]: + """Create a new session""" + try: + 
headers = self._prepare_headers( + api_key=self.api_key, parent_key=parent_key, custom_headers={"X-Session-ID": str(self.session_id)} + ) + + res = self.post("/v2/create_session", {"session": session_data}, headers) + jwt = res.json().get("jwt") + return bool(jwt), jwt + + except ApiServerException as e: + logger.error(f"Could not create session - {e}") + return False, None + + def update_session(self, session_data: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: + """Update session state""" + try: + headers = self._prepare_headers( + api_key=self.api_key, jwt=self.jwt, custom_headers={"X-Session-ID": str(self.session_id)} + ) + + res = self.post("/v2/update_session", {"session": session_data or {}}, headers) + return res.json() + + except ApiServerException as e: + logger.error(f"Could not update session - {e}") + return None + + def create_events(self, events: List[Dict[str, Any]]) -> bool: + """Send events to API""" + try: + headers = self._prepare_headers( + api_key=self.api_key, jwt=self.jwt, custom_headers={"X-Session-ID": str(self.session_id)} + ) + + res = self.post("/v2/create_events", {"events": events}, headers) + return res.status_code == 200 + + except ApiServerException as e: + logger.error(f"Could not create events - {e}") + return False + + def create_agent(self, name: str, agent_id: str) -> bool: + """Create a new agent""" + try: + headers = self._prepare_headers( + api_key=self.api_key, jwt=self.jwt, custom_headers={"X-Session-ID": str(self.session_id)} + ) + + res = self.post("/v2/create_agent", {"id": agent_id, "name": name}, headers) + return res.status_code == 200 + + except ApiServerException as e: + logger.error(f"Could not create agent - {e}") + return False + + def _post(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> requests.Response: + """Make POST request""" + url = f"{self.endpoint}{path}" + session = self.get_session() + return session.post(url, json=data, headers=headers) diff --git 
a/agentops/client.py b/agentops/client.py index fb3e17937..eed85f454 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -179,7 +179,7 @@ def get_default_tags(self) -> List[str]: """ return list(self._config.default_tags) - def record(self, event: Union[Event, ErrorEvent]) -> None: + def record(self, event: Event) -> None: """ Record an event with the AgentOps service. @@ -198,47 +198,31 @@ def start_session( self, tags: Optional[List[str]] = None, inherited_session_id: Optional[str] = None, - ) -> Union[Session, None]: - """ - Start a new session for recording events. - - Args: - tags (List[str], optional): Tags that can be used for grouping or sorting later. - e.g. ["test_run"]. - config: (Configuration, optional): Client configuration object - inherited_session_id (optional, str): assign session id to match existing Session - """ + ) -> Optional[Session]: + """Start a new session""" if not self.is_initialized: - return - - if inherited_session_id is not None: - try: - session_id = UUID(inherited_session_id) - except ValueError: - return logger.warning(f"Invalid session id: {inherited_session_id}") - else: - session_id = uuid4() + return None - session_tags = self._config.default_tags.copy() - if tags is not None: - session_tags.update(tags) + try: + session_id = UUID(inherited_session_id) if inherited_session_id else uuid4() + except ValueError: + return logger.warning(f"Invalid session id: {inherited_session_id}") + default_tags = list(self._config.default_tags) if self._config.default_tags else [] session = Session( session_id=session_id, - tags=list(session_tags), - host_env=self.host_env, config=self._config, + tags=tags or default_tags, + host_env=self.host_env, ) - if not session.is_running: - return logger.error("Failed to start session") - - if self._pre_init_queue["agents"] and len(self._pre_init_queue["agents"]) > 0: - for agent_args in self._pre_init_queue["agents"]: - session.create_agent(name=agent_args["name"], 
agent_id=agent_args["agent_id"]) - self._pre_init_queue["agents"] = [] + if session.is_running: + # Process any queued agents + if self._pre_init_queue["agents"]: + for agent_args in self._pre_init_queue["agents"]: + session.create_agent(name=agent_args["name"], agent_id=agent_args["agent_id"]) + self._pre_init_queue["agents"].clear() - self._sessions.append(session) return session def end_session( diff --git a/agentops/event.py b/agentops/event.py index c6200aca1..50e669078 100644 --- a/agentops/event.py +++ b/agentops/event.py @@ -25,6 +25,7 @@ class Event: end_timestamp(str): A timestamp indicating when the event ended. Defaults to the time when this Event was instantiated. agent_id(UUID, optional): The unique identifier of the agent that triggered the event. id(UUID): A unique identifier for the event. Defaults to a new UUID. + session_id(UUID, optional): The unique identifier of the session that the event belongs to. foo(x=1) { ... @@ -43,6 +44,7 @@ class Event: end_timestamp: Optional[str] = None agent_id: Optional[UUID] = field(default_factory=check_call_stack_for_agent_id) id: UUID = field(default_factory=uuid4) + session_id: Optional[UUID] = None @dataclass @@ -105,7 +107,7 @@ class ToolEvent(Event): @dataclass -class ErrorEvent: +class ErrorEvent(Event): """ For recording any errors e.g. ones related to agent execution @@ -115,21 +117,31 @@ class ErrorEvent: code(str, optional): A code that can be used to identify the error e.g. 501. details(str, optional): Detailed information about the error. logs(str, optional): For detailed information/logging related to the error. - timestamp(str): A timestamp indicating when the error occurred. Defaults to the time when this ErrorEvent was instantiated. 
- """ + # Inherit common Event fields + event_type: str = field(default=EventType.ERROR.value) + + # Error-specific fields trigger_event: Optional[Event] = None exception: Optional[BaseException] = None error_type: Optional[str] = None code: Optional[str] = None details: Optional[Union[str, Dict[str, str]]] = None logs: Optional[str] = field(default_factory=traceback.format_exc) - timestamp: str = field(default_factory=get_ISO_time) def __post_init__(self): - self.event_type = EventType.ERROR.value + """Process exception if provided""" if self.exception: self.error_type = self.error_type or type(self.exception).__name__ self.details = self.details or str(self.exception) self.exception = None # removes exception from serialization + + # Ensure end timestamp is set + if not self.end_timestamp: + self.end_timestamp = get_ISO_time() + + @property + def timestamp(self) -> str: + """Maintain backward compatibility with old code expecting timestamp""" + return self.init_timestamp diff --git a/agentops/host_env.py b/agentops/host_env.py index 5307dec4a..d3f798b72 100644 --- a/agentops/host_env.py +++ b/agentops/host_env.py @@ -100,9 +100,9 @@ def get_ram_details(): try: ram_info = psutil.virtual_memory() return { - "Total": f"{ram_info.total / (1024 ** 3):.2f} GB", - "Available": f"{ram_info.available / (1024 ** 3):.2f} GB", - "Used": f"{ram_info.used / (1024 ** 3):.2f} GB", + "Total": f"{ram_info.total / (1024**3):.2f} GB", + "Available": f"{ram_info.available / (1024**3):.2f} GB", + "Used": f"{ram_info.used / (1024**3):.2f} GB", "Percentage": f"{ram_info.percent}%", } except: diff --git a/agentops/http_client.py b/agentops/http_client.py deleted file mode 100644 index 11c0bf49f..000000000 --- a/agentops/http_client.py +++ /dev/null @@ -1,208 +0,0 @@ -from enum import Enum -from typing import Optional, Dict, Any - -import requests -from requests.adapters import HTTPAdapter, Retry -import json - -from .exceptions import ApiServerException - -JSON_HEADER = {"Content-Type": 
"application/json; charset=UTF-8", "Accept": "*/*"} - -retry_config = Retry(total=5, backoff_factor=0.1) - - -class HttpStatus(Enum): - SUCCESS = 200 - INVALID_REQUEST = 400 - INVALID_API_KEY = 401 - TIMEOUT = 408 - PAYLOAD_TOO_LARGE = 413 - TOO_MANY_REQUESTS = 429 - FAILED = 500 - UNKNOWN = -1 - - -class Response: - def __init__(self, status: HttpStatus = HttpStatus.UNKNOWN, body: Optional[dict] = None): - self.status: HttpStatus = status - self.code: int = status.value - self.body = body if body else {} - - def parse(self, res: requests.models.Response): - res_body = res.json() - self.code = res.status_code - self.status = self.get_status(self.code) - self.body = res_body - return self - - @staticmethod - def get_status(code: int) -> HttpStatus: - if 200 <= code < 300: - return HttpStatus.SUCCESS - elif code == 429: - return HttpStatus.TOO_MANY_REQUESTS - elif code == 413: - return HttpStatus.PAYLOAD_TOO_LARGE - elif code == 408: - return HttpStatus.TIMEOUT - elif code == 401: - return HttpStatus.INVALID_API_KEY - elif 400 <= code < 500: - return HttpStatus.INVALID_REQUEST - elif code >= 500: - return HttpStatus.FAILED - return HttpStatus.UNKNOWN - - -class HttpClient: - _session: Optional[requests.Session] = None - - @classmethod - def get_session(cls) -> requests.Session: - """Get or create the global session with optimized connection pooling""" - if cls._session is None: - cls._session = requests.Session() - - # Configure connection pooling - adapter = requests.adapters.HTTPAdapter( - pool_connections=15, # Number of connection pools - pool_maxsize=256, # Connections per pool - max_retries=Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]), - ) - - # Mount adapter for both HTTP and HTTPS - cls._session.mount("http://", adapter) - cls._session.mount("https://", adapter) - - # Set default headers - cls._session.headers.update( - { - "Connection": "keep-alive", - "Keep-Alive": "timeout=10, max=1000", - "Content-Type": "application/json", - 
} - ) - - return cls._session - - @classmethod - def _prepare_headers( - cls, - api_key: Optional[str] = None, - parent_key: Optional[str] = None, - jwt: Optional[str] = None, - custom_headers: Optional[dict] = None, - ) -> dict: - """Prepare headers for the request""" - headers = JSON_HEADER.copy() - - if api_key is not None: - headers["X-Agentops-Api-Key"] = api_key - - if parent_key is not None: - headers["X-Agentops-Parent-Key"] = parent_key - - if jwt is not None: - headers["Authorization"] = f"Bearer {jwt}" - - if custom_headers is not None: - headers.update(custom_headers) - - return headers - - @classmethod - def post( - cls, - url: str, - payload: bytes, - api_key: Optional[str] = None, - parent_key: Optional[str] = None, - jwt: Optional[str] = None, - header: Optional[Dict[str, str]] = None, - ) -> Response: - """Make HTTP POST request using connection pooling""" - result = Response() - try: - headers = cls._prepare_headers(api_key, parent_key, jwt, header) - session = cls.get_session() - res = session.post(url, data=payload, headers=headers, timeout=20) - result.parse(res) - - except requests.exceptions.Timeout: - result.code = 408 - result.status = HttpStatus.TIMEOUT - raise ApiServerException("Could not reach API server - connection timed out") - except requests.exceptions.HTTPError as e: - try: - result.parse(e.response) - except Exception: - result = Response() - result.code = e.response.status_code - result.status = Response.get_status(e.response.status_code) - result.body = {"error": str(e)} - raise ApiServerException(f"HTTPError: {e}") - except requests.exceptions.RequestException as e: - result.body = {"error": str(e)} - raise ApiServerException(f"RequestException: {e}") - - if result.code == 401: - raise ApiServerException( - f"API server: invalid API key: {api_key}. 
Find your API key at https://app.agentops.ai/settings/projects" - ) - if result.code == 400: - if "message" in result.body: - raise ApiServerException(f"API server: {result.body['message']}") - else: - raise ApiServerException(f"API server: {result.body}") - if result.code == 500: - raise ApiServerException("API server: - internal server error") - - return result - - @classmethod - def get( - cls, - url: str, - api_key: Optional[str] = None, - jwt: Optional[str] = None, - header: Optional[Dict[str, str]] = None, - ) -> Response: - """Make HTTP GET request using connection pooling""" - result = Response() - try: - headers = cls._prepare_headers(api_key, None, jwt, header) - session = cls.get_session() - res = session.get(url, headers=headers, timeout=20) - result.parse(res) - - except requests.exceptions.Timeout: - result.code = 408 - result.status = HttpStatus.TIMEOUT - raise ApiServerException("Could not reach API server - connection timed out") - except requests.exceptions.HTTPError as e: - try: - result.parse(e.response) - except Exception: - result = Response() - result.code = e.response.status_code - result.status = Response.get_status(e.response.status_code) - result.body = {"error": str(e)} - raise ApiServerException(f"HTTPError: {e}") - except requests.exceptions.RequestException as e: - result.body = {"error": str(e)} - raise ApiServerException(f"RequestException: {e}") - - if result.code == 401: - raise ApiServerException( - f"API server: invalid API key: {api_key}. 
Find your API key at https://app.agentops.ai/settings/projects" - ) - if result.code == 400: - if "message" in result.body: - raise ApiServerException(f"API server: {result.body['message']}") - else: - raise ApiServerException(f"API server: {result.body}") - if result.code == 500: - raise ApiServerException("API server: - internal server error") - - return result diff --git a/agentops/llms/README.md b/agentops/llms/README.md new file mode 100644 index 000000000..fc8b03cc3 --- /dev/null +++ b/agentops/llms/README.md @@ -0,0 +1,98 @@ +# AgentOps LLM Tracking System + +This module provides instrumentation for various LLM providers to track and analyze their usage. It supports multiple providers including OpenAI, Anthropic, Cohere, Groq, Mistral, and others. + +## Architecture + +```mermaid +graph TD + A[LlmTracker] -->|creates & manages| B[OpenAiProvider] + A -->|creates & manages| C[AnthropicProvider] + A -->|creates & manages| D[CohereProvider] + A -->|creates & manages| E[Other Providers...] + + B -->|inherits from| F[InstrumentedProvider] + C -->|inherits from| F + D -->|inherits from| F + E -->|inherits from| F + + F -->|records to| G[AgentOps Client/Session] +``` + +## Key Components + +### LlmTracker (tracker.py) +The orchestrator that manages instrumentation across different LLM providers: +- Detects installed LLM packages +- Verifies version compatibility +- Initializes provider-specific instrumentation +- Provides methods to start/stop tracking + +```python +from agentops import AgentOps +from agentops.llms.tracker import LlmTracker + +client = AgentOps(api_key="your-key") +tracker = LlmTracker(client) + +# Start tracking +tracker.override_api() + +# Your LLM calls will now be tracked... 
+ +# Stop tracking +tracker.stop_instrumenting() +``` + +### InstrumentedProvider (instrumented_provider.py) +Abstract base class that defines the interface for provider-specific implementations: +- Provides base implementation for tracking +- Defines required methods for all providers +- Handles event recording + +## Supported Providers + +| Provider | Minimum Version | Tracked Methods | +|----------|----------------|-----------------| +| OpenAI | 1.0.0 | chat.completions.create, assistants API | +| Anthropic | 0.32.0 | completions.create | +| Cohere | 5.4.0 | chat, chat_stream | +| Groq | 0.9.0 | Client.chat, AsyncClient.chat | +| Mistral | 1.0.1 | chat.complete, chat.stream | +| LiteLLM | 1.3.1 | openai_chat_completions.completion | +| ... | ... | ... | + +## Adding New Providers + +To add support for a new LLM provider: + +1. Create a new provider class in `providers/`: +```python +from .instrumented_provider import InstrumentedProvider + +class NewProvider(InstrumentedProvider): + _provider_name = "NewProvider" + + def override(self): + # Implementation + pass + + def undo_override(self): + # Implementation + pass + + def handle_response(self, response, kwargs, init_timestamp, session=None): + # Implementation + pass +``` + +2. Add provider configuration to `SUPPORTED_APIS` in `tracker.py` +3. Add provider to `stop_instrumenting()` method + +## Event Recording + +Events can be recorded to either: +- An active session (if provided) +- Directly to the AgentOps client + +The `_safe_record()` method handles this routing automatically. 
diff --git a/agentops/llms/providers/instrumented_provider.py b/agentops/llms/providers/instrumented_provider.py index 9b069c535..48b5ac4f6 100644 --- a/agentops/llms/providers/instrumented_provider.py +++ b/agentops/llms/providers/instrumented_provider.py @@ -1,14 +1,14 @@ from abc import ABC, abstractmethod from typing import Optional -from agentops.session import Session +from agentops import Client, Session from agentops.event import LLMEvent class InstrumentedProvider(ABC): _provider_name: str = "InstrumentedModel" llm_event: Optional[LLMEvent] = None - client = None + client: Client def __init__(self, client): self.client = client @@ -29,8 +29,15 @@ def undo_override(self): def provider_name(self): return self._provider_name - def _safe_record(self, session, event): - if session is not None: + def _safe_record(self, session: Session, event: LLMEvent) -> None: + """ + Safely record an event either to a session or directly to the client. + + Args: + session: Session object to record the event to + event: The LLMEvent to record, since we're inside the llms/ domain + """ + if isinstance(sessino, Session): session.record(event) else: self.client.record(event) diff --git a/agentops/llms/providers/ollama.py b/agentops/llms/providers/ollama.py index e944469c9..42c682865 100644 --- a/agentops/llms/providers/ollama.py +++ b/agentops/llms/providers/ollama.py @@ -26,7 +26,7 @@ def handle_stream_chunk(chunk: dict): if chunk.get("done"): llm_event.end_timestamp = get_ISO_time() - llm_event.model = f'ollama/{chunk.get("model")}' + llm_event.model = f"ollama/{chunk.get('model')}" llm_event.returns = chunk llm_event.returns["message"] = llm_event.completion llm_event.prompt = kwargs["messages"] @@ -53,7 +53,7 @@ def generator(): return generator() llm_event.end_timestamp = get_ISO_time() - llm_event.model = f'ollama/{response["model"]}' + llm_event.model = f"ollama/{response['model']}" llm_event.returns = response llm_event.agent_id = check_call_stack_for_agent_id() 
llm_event.prompt = kwargs["messages"] diff --git a/agentops/llms/tracker.py b/agentops/llms/tracker.py index 3609354f5..93dee0e6a 100644 --- a/agentops/llms/tracker.py +++ b/agentops/llms/tracker.py @@ -23,6 +23,18 @@ class LlmTracker: + """ + A class that handles the instrumentation of various LLM provider APIs to track and record their usage. + + This tracker supports multiple LLM providers including OpenAI, Anthropic, Cohere, Groq, Mistral, + and others. It patches their API methods to collect telemetry data while maintaining the original + functionality. + + Attributes: + SUPPORTED_APIS (dict): A mapping of supported API providers to their versions and tracked methods. + client: The AgentOps client instance used for telemetry collection. + """ + SUPPORTED_APIS = { "litellm": {"1.3.1": ("openai_chat_completions.completion",)}, "openai": { @@ -94,11 +106,29 @@ class LlmTracker: } def __init__(self, client): + """ + Initialize the LlmTracker with an AgentOps client. + + Args: + client: The AgentOps client instance used for collecting and sending telemetry data. + """ self.client = client def override_api(self): """ - Overrides key methods of the specified API to record events. + Overrides key methods of the supported LLM APIs to record events. + + This method checks for installed LLM packages and their versions, then applies + the appropriate instrumentation based on the provider and version. It patches + API methods to collect telemetry while maintaining their original functionality. + + For each supported provider: + 1. Checks if the package is installed + 2. Verifies version compatibility + 3. Initializes the appropriate provider class + 4. Applies the instrumentation + + If using LiteLLM, only patches LiteLLM methods and skips patching underlying providers. """ for api in self.SUPPORTED_APIS: @@ -211,6 +241,13 @@ def override_api(self): logger.warning(f"Only TaskWeaver>=0.0.1 supported. 
v{module_version} found.") def stop_instrumenting(self): + """ + Removes all API instrumentation and restores original functionality. + + This method undoes all the patches applied by override_api(), returning + each provider's methods to their original state. This is useful for cleanup + or when you need to temporarily disable tracking. + """ OpenAiProvider(self.client).undo_override() GroqProvider(self.client).undo_override() CohereProvider(self.client).undo_override() diff --git a/agentops/meta_client.py b/agentops/meta_client.py index 6cc7ed2ef..d41a8657b 100644 --- a/agentops/meta_client.py +++ b/agentops/meta_client.py @@ -2,7 +2,7 @@ import traceback from .host_env import get_host_env -from .http_client import HttpClient +from .api.base import ApiClient from .helpers import safe_serialize, get_agentops_version from os import environ diff --git a/agentops/partners/autogen_logger.py b/agentops/partners/autogen_logger.py index e35b04c80..e37bff7d6 100644 --- a/agentops/partners/autogen_logger.py +++ b/agentops/partners/autogen_logger.py @@ -1,26 +1,24 @@ from __future__ import annotations -import logging import threading import uuid -from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Union, TypeVar, Callable +from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeVar, Callable, Optional import agentops from openai import AzureOpenAI, OpenAI from openai.types.chat import ChatCompletion - from autogen.logger.base_logger import BaseLogger, LLMConfig from agentops.enums import EndState from agentops.helpers import get_ISO_time - from agentops import LLMEvent, ToolEvent, ActionEvent +from agentops.log_config import logger from uuid import uuid4 if TYPE_CHECKING: from autogen import Agent, ConversableAgent, OpenAIWrapper + from agentops import Session -logger = logging.getLogger(__name__) lock = threading.Lock() __all__ = ("AutogenLogger",) @@ -41,6 +39,7 @@ def _get_agentops_id_from_agent(self, autogen_id: str) -> str: for agent in self.agent_store: if 
agent["autogen_id"] == autogen_id: return agent["agentops_id"] + return None def log_chat_completion( self, @@ -55,45 +54,73 @@ def log_chat_completion( start_time: str, ) -> None: """Records an LLMEvent to AgentOps session""" - - completion = response.choices[len(response.choices) - 1] - - # Note: Autogen tokens are not included in the request and function call tokens are not counted in the completion - llm_event = LLMEvent( - prompt=request["messages"], - completion=completion.message.to_dict(), - model=response.model, - cost=cost, - returns=completion.message.to_json(), - ) - llm_event.init_timestamp = start_time - llm_event.end_timestamp = get_ISO_time() - llm_event.agent_id = self._get_agentops_id_from_agent(str(id(agent))) - agentops.record(llm_event) + try: + completion = response.choices[len(response.choices) - 1] + # Note: Autogen tokens are not included in the request and function call tokens are not counted in the completion + llm_event = LLMEvent( + prompt=request["messages"], + completion=completion.message.to_dict(), + model=response.model, + cost=cost, + returns=completion.message.to_json(), + ) + llm_event.init_timestamp = start_time + llm_event.end_timestamp = get_ISO_time() + llm_event.agent_id = self._get_agentops_id_from_agent(str(id(agent))) + + agentops.record(llm_event) + except Exception as e: + logger.error(f"❌ Failed to record LLM event: {str(e)}") + raise def log_new_agent(self, agent: ConversableAgent, init_args: Dict[str, Any]) -> None: - """Calls agentops.create_agent""" - ao_agent_id = agentops.create_agent(agent.name, str(uuid4())) - self.agent_store.append({"agentops_id": ao_agent_id, "autogen_id": str(id(agent))}) + """Creates agent in current session""" + try: + ao_agent_id = agentops.create_agent(agent.name, str(uuid4())) + self.agent_store.append({"agentops_id": ao_agent_id, "autogen_id": str(id(agent))}) + except Exception as e: + logger.error(f"❌ Failed to create agent {agent.name}: {str(e)}") + raise def log_event(self, 
source: Union[str, Agent], name: str, **kwargs: Dict[str, Any]) -> None: """Records an ActionEvent to AgentOps session""" - event = ActionEvent(action_type=name) - agentops_id = self._get_agentops_id_from_agent(str(id(source))) - event.agent_id = agentops_id - event.params = kwargs - agentops.record(event) + try: + returns = None + if "reply" in kwargs: + returns = kwargs["reply"] + elif "message" in kwargs: + returns = kwargs["message"] + + event = ActionEvent( + agent_id=self._get_agentops_id_from_agent(str(id(source))), + action_type=name, + params=kwargs, + returns=returns, + end_timestamp=get_ISO_time(), + ) + + with lock: + agentops.record(event) + except Exception as e: + logger.error(f"❌ Failed to record action event: {str(e)}") + raise def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: any): """Records a ToolEvent to AgentOps session""" - event = ToolEvent() - agentops_id = self._get_agentops_id_from_agent(str(id(source))) - event.agent_id = agentops_id - event.function = function # TODO: this is not a parameter - event.params = args - event.returns = returns - event.name = getattr(function, "__name__") - agentops.record(event) + try: + function_name = getattr(function, "__name__", str(function)) + event = ToolEvent( + agent_id=self._get_agentops_id_from_agent(str(id(source))), + name=function_name, + params=args, + returns=returns, + end_timestamp=get_ISO_time(), + ) + with lock: + agentops.record(event) + except Exception as e: + logger.error(f"❌ Failed to record tool event: {str(e)}") + raise def log_new_wrapper( self, @@ -112,7 +139,12 @@ def log_new_client( def stop(self) -> None: """Ends AgentOps session""" - agentops.end_session(end_state=EndState.INDETERMINATE.value) + logger.info("🛑 Stopping AutogenLogger") + try: + agentops.end_session(end_state=EndState.INDETERMINATE.value) + except Exception as e: + logger.error(f"❌ Failed to end session: {str(e)}") + raise def get_connection(self) -> None: 
"""Method intentionally left blank""" diff --git a/agentops/session.py b/agentops/session.py deleted file mode 100644 index b9f07d20b..000000000 --- a/agentops/session.py +++ /dev/null @@ -1,660 +0,0 @@ -from __future__ import annotations - -import asyncio -import functools -import json -import threading -from datetime import datetime, timezone -from decimal import ROUND_HALF_UP, Decimal -from typing import Any, Dict, List, Optional, Sequence, Union -from uuid import UUID, uuid4 - -from opentelemetry import trace -from opentelemetry.context import attach, detach, set_value -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import ReadableSpan, TracerProvider -from opentelemetry.sdk.trace.export import ( - BatchSpanProcessor, - ConsoleSpanExporter, - SpanExporter, - SpanExportResult, -) -from termcolor import colored - -from .config import Configuration -from .enums import EndState -from .event import ErrorEvent, Event -from .exceptions import ApiServerException -from .helpers import filter_unjsonable, get_ISO_time, safe_serialize -from .http_client import HttpClient, Response -from .log_config import logger - -""" -OTEL Guidelines: - - - -- Maintain a single TracerProvider for the application runtime - - Have one global TracerProvider in the Client class - -- According to the OpenTelemetry Python documentation, Resource should be initialized once per application and shared across all telemetry (traces, metrics, logs). -- Each Session gets its own Tracer (with session-specific context) -- Allow multiple sessions to share the provider while maintaining their own context - - - -:: Resource - - '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' - Captures information about the entity producing telemetry as Attributes. - For example, a process producing telemetry that is running in a container - on Kubernetes has a process name, a pod name, a namespace, and possibly - a deployment name. 
All these attributes can be included in the Resource. - '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' - - The key insight from the documentation is: - - - Resource represents the entity producing telemetry - in our case, that's the AgentOps SDK application itself - - Session-specific information should be attributes on the spans themselves - - A Resource is meant to identify the service/process/application1 - - Sessions are units of work within that application - - The documentation example about "process name, pod name, namespace" refers to where the code is running, not the work it's doing - -""" - - -class SessionExporter(SpanExporter): - """ - Manages publishing events for Session - """ - - def __init__(self, session: Session, **kwargs): - self.session = session - self._shutdown = threading.Event() - self._export_lock = threading.Lock() - super().__init__(**kwargs) - - @property - def endpoint(self): - return f"{self.session.config.endpoint}/v2/create_events" - - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: - if self._shutdown.is_set(): - return SpanExportResult.SUCCESS - - with self._export_lock: - try: - # Skip if no spans to export - if not spans: - return SpanExportResult.SUCCESS - - events = [] - for span in spans: - event_data = json.loads(span.attributes.get("event.data", "{}")) - - # Format event data based on event type - if span.name == "actions": - formatted_data = { - "action_type": event_data.get("action_type", event_data.get("name", "unknown_action")), - "params": event_data.get("params", {}), - "returns": event_data.get("returns"), - } - elif span.name == "tools": - formatted_data = { - "name": event_data.get("name", event_data.get("tool_name", "unknown_tool")), - "params": event_data.get("params", {}), - "returns": event_data.get("returns"), - } - else: - formatted_data = event_data - - formatted_data = {**event_data, **formatted_data} - # Get timestamps, providing defaults if missing - 
current_time = datetime.now(timezone.utc).isoformat() - init_timestamp = span.attributes.get("event.timestamp") - end_timestamp = span.attributes.get("event.end_timestamp") - - # Handle missing timestamps - if init_timestamp is None: - init_timestamp = current_time - if end_timestamp is None: - end_timestamp = current_time - - # Get event ID, generate new one if missing - event_id = span.attributes.get("event.id") - if event_id is None: - event_id = str(uuid4()) - - events.append( - { - "id": event_id, - "event_type": span.name, - "init_timestamp": init_timestamp, - "end_timestamp": end_timestamp, - **formatted_data, - "session_id": str(self.session.session_id), - } - ) - - # Only make HTTP request if we have events and not shutdown - if events: - try: - res = HttpClient.post( - self.endpoint, - json.dumps({"events": events}).encode("utf-8"), - api_key=self.session.config.api_key, - jwt=self.session.jwt, - ) - return SpanExportResult.SUCCESS if res.code == 200 else SpanExportResult.FAILURE - except Exception as e: - logger.error(f"Failed to send events: {e}") - return SpanExportResult.FAILURE - - return SpanExportResult.SUCCESS - - except Exception as e: - logger.error(f"Failed to export spans: {e}") - return SpanExportResult.FAILURE - - def force_flush(self, timeout_millis: Optional[int] = None) -> bool: - return True - - def shutdown(self) -> None: - """Handle shutdown gracefully""" - self._shutdown.set() - # Don't call session.end_session() here to avoid circular dependencies - - -class Session: - """ - Represents a session of events, with a start and end state. - - Args: - session_id (UUID): The session id is used to record particular runs. - config (Configuration): The configuration object for the session. - tags (List[str], optional): Tags that can be used for grouping or sorting later. Examples could be ["GPT-4"]. - host_env (dict, optional): A dictionary containing host and environment data. 
- - Attributes: - init_timestamp (str): The ISO timestamp for when the session started. - end_timestamp (str, optional): The ISO timestamp for when the session ended. Only set after end_session is called. - end_state (str, optional): The final state of the session. Options: "Success", "Fail", "Indeterminate". Defaults to "Indeterminate". - end_state_reason (str, optional): The reason for ending the session. - session_id (UUID): Unique identifier for the session. - tags (List[str]): List of tags associated with the session for grouping and filtering. - video (str, optional): URL to a video recording of the session. - host_env (dict, optional): Dictionary containing host and environment data. - config (Configuration): Configuration object containing settings for the session. - jwt (str, optional): JSON Web Token for authentication with the AgentOps API. - token_cost (Decimal): Running total of token costs for the session. - event_counts (dict): Counter for different types of events: - - llms: Number of LLM calls - - tools: Number of tool calls - - actions: Number of actions - - errors: Number of errors - - apis: Number of API calls - session_url (str, optional): URL to view the session in the AgentOps dashboard. - is_running (bool): Flag indicating if the session is currently active. 
- """ - - def __init__( - self, - session_id: UUID, - config: Configuration, - tags: Optional[List[str]] = None, - host_env: Optional[dict] = None, - ): - self.end_timestamp = None - self.end_state: Optional[str] = "Indeterminate" - self.session_id = session_id - self.init_timestamp = get_ISO_time() - self.tags: List[str] = tags or [] - self.video: Optional[str] = None - self.end_state_reason: Optional[str] = None - self.host_env = host_env - self.config = config - self.jwt = None - self._lock = threading.Lock() - self._end_session_lock = threading.Lock() - self.token_cost: Decimal = Decimal(0) - self._session_url: str = "" - self.event_counts = { - "llms": 0, - "tools": 0, - "actions": 0, - "errors": 0, - "apis": 0, - } - # self.session_url: Optional[str] = None - - # Start session first to get JWT - self.is_running = self._start_session() - if not self.is_running: - return - - # Initialize OTEL components with a more controlled processor - self._tracer_provider = TracerProvider() - self._otel_tracer = self._tracer_provider.get_tracer( - f"agentops.session.{str(session_id)}", - ) - self._otel_exporter = SessionExporter(session=self) - - # Use smaller batch size and shorter delay to reduce buffering - self._span_processor = BatchSpanProcessor( - self._otel_exporter, - max_queue_size=self.config.max_queue_size, - schedule_delay_millis=self.config.max_wait_time, - max_export_batch_size=min( - max(self.config.max_queue_size // 20, 1), - min(self.config.max_queue_size, 32), - ), - export_timeout_millis=20000, - ) - - self._tracer_provider.add_span_processor(self._span_processor) - - def set_video(self, video: str) -> None: - """ - Sets a url to the video recording of the session. - - Args: - video (str): The url of the video recording - """ - self.video = video - - def _flush_spans(self) -> bool: - """ - Flush pending spans for this specific session with timeout. - Returns True if flush was successful, False otherwise. 
- """ - if not hasattr(self, "_span_processor"): - return True - - try: - success = self._span_processor.force_flush(timeout_millis=self.config.max_wait_time) - if not success: - logger.warning("Failed to flush all spans before session end") - return success - except Exception as e: - logger.warning(f"Error flushing spans: {e}") - return False - - def end_session( - self, - end_state: str = "Indeterminate", - end_state_reason: Optional[str] = None, - video: Optional[str] = None, - ) -> Union[Decimal, None]: - with self._end_session_lock: - if not self.is_running: - return None - - if not any(end_state == state.value for state in EndState): - logger.warning("Invalid end_state. Please use one of the EndState enums") - return None - - try: - # Force flush any pending spans before ending session - if hasattr(self, "_span_processor"): - self._span_processor.force_flush(timeout_millis=5000) - - # 1. Set shutdown flag on exporter first - if hasattr(self, "_otel_exporter"): - self._otel_exporter.shutdown() - - # 2. Set session end state - self.end_timestamp = get_ISO_time() - self.end_state = end_state - self.end_state_reason = end_state_reason - if video is not None: - self.video = video - - # 3. Mark session as not running before cleanup - self.is_running = False - - # 4. Clean up OTEL components - if hasattr(self, "_span_processor"): - try: - # Force flush any pending spans - self._span_processor.force_flush(timeout_millis=5000) - # Shutdown the processor - self._span_processor.shutdown() - except Exception as e: - logger.warning(f"Error during span processor cleanup: {e}") - finally: - del self._span_processor - - # 5. 
Final session update - if not (analytics_stats := self.get_analytics()): - return None - - analytics = ( - f"Session Stats - " - f"{colored('Duration:', attrs=['bold'])} {analytics_stats['Duration']} | " - f"{colored('Cost:', attrs=['bold'])} ${analytics_stats['Cost']} | " - f"{colored('LLMs:', attrs=['bold'])} {analytics_stats['LLM calls']} | " - f"{colored('Tools:', attrs=['bold'])} {analytics_stats['Tool calls']} | " - f"{colored('Actions:', attrs=['bold'])} {analytics_stats['Actions']} | " - f"{colored('Errors:', attrs=['bold'])} {analytics_stats['Errors']}" - ) - logger.info(analytics) - - except Exception as e: - logger.exception(f"Error during session end: {e}") - finally: - active_sessions.remove(self) # First thing, get rid of the session - - logger.info( - colored( - f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", - "blue", - ) - ) - return self.token_cost - - def add_tags(self, tags: List[str]) -> None: - """ - Append to session tags at runtime. - """ - if not self.is_running: - return - - if not (isinstance(tags, list) and all(isinstance(item, str) for item in tags)): - if isinstance(tags, str): - tags = [tags] - - # Initialize tags if None - if self.tags is None: - self.tags = [] - - # Add new tags that don't exist - for tag in tags: - if tag not in self.tags: - self.tags.append(tag) - - # Update session state immediately - self._update_session() - - def set_tags(self, tags): - """Set session tags, replacing any existing tags""" - if not self.is_running: - return - - if not (isinstance(tags, list) and all(isinstance(item, str) for item in tags)): - if isinstance(tags, str): - tags = [tags] - - # Set tags directly - self.tags = tags.copy() # Make a copy to avoid reference issues - - # Update session state immediately - self._update_session() - - def record(self, event: Union[Event, ErrorEvent], flush_now=False): - """Record an event using OpenTelemetry spans""" - if not self.is_running: - return - - # Ensure event has all required base attributes 
- if not hasattr(event, "id"): - event.id = uuid4() - if not hasattr(event, "init_timestamp"): - event.init_timestamp = get_ISO_time() - if not hasattr(event, "end_timestamp") or event.end_timestamp is None: - event.end_timestamp = get_ISO_time() - - # Create session context - token = set_value("session.id", str(self.session_id)) - - try: - token = attach(token) - - # Create a copy of event data to modify - event_data = dict(filter_unjsonable(event.__dict__)) - - # Add required fields based on event type - if isinstance(event, ErrorEvent): - event_data["error_type"] = getattr(event, "error_type", event.event_type) - elif event.event_type == "actions": - # Ensure action events have action_type - if "action_type" not in event_data: - event_data["action_type"] = event_data.get("name", "unknown_action") - if "name" not in event_data: - event_data["name"] = event_data.get("action_type", "unknown_action") - elif event.event_type == "tools": - # Ensure tool events have name - if "name" not in event_data: - event_data["name"] = event_data.get("tool_name", "unknown_tool") - if "tool_name" not in event_data: - event_data["tool_name"] = event_data.get("name", "unknown_tool") - - with self._otel_tracer.start_as_current_span( - name=event.event_type, - attributes={ - "event.id": str(event.id), - "event.type": event.event_type, - "event.timestamp": event.init_timestamp or get_ISO_time(), - "event.end_timestamp": event.end_timestamp or get_ISO_time(), - "session.id": str(self.session_id), - "session.tags": ",".join(self.tags) if self.tags else "", - "event.data": json.dumps(event_data), - }, - ) as span: - if event.event_type in self.event_counts: - self.event_counts[event.event_type] += 1 - - if isinstance(event, ErrorEvent): - span.set_attribute("error", True) - if hasattr(event, "trigger_event") and event.trigger_event: - span.set_attribute("trigger_event.id", str(event.trigger_event.id)) - span.set_attribute("trigger_event.type", event.trigger_event.event_type) - - if 
flush_now and hasattr(self, "_span_processor"): - self._span_processor.force_flush() - finally: - detach(token) - - def _send_event(self, event): - """Direct event sending for testing""" - try: - payload = { - "events": [ - { - "id": str(event.id), - "event_type": event.event_type, - "init_timestamp": event.init_timestamp, - "end_timestamp": event.end_timestamp, - "data": filter_unjsonable(event.__dict__), - } - ] - } - - HttpClient.post( - f"{self.config.endpoint}/v2/create_events", - json.dumps(payload).encode("utf-8"), - jwt=self.jwt, - ) - except Exception as e: - logger.error(f"Failed to send event: {e}") - - def _reauthorize_jwt(self) -> Union[str, None]: - with self._lock: - payload = {"session_id": self.session_id} - serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") - res = HttpClient.post( - f"{self.config.endpoint}/v2/reauthorize_jwt", - serialized_payload, - self.config.api_key, - ) - - logger.debug(res.body) - - if res.code != 200: - return None - - jwt = res.body.get("jwt", None) - self.jwt = jwt - return jwt - - def _start_session(self): - with self._lock: - payload = {"session": self.__dict__} - serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") - - try: - res = HttpClient.post( - f"{self.config.endpoint}/v2/create_session", - serialized_payload, - api_key=self.config.api_key, - parent_key=self.config.parent_key, - ) - except ApiServerException as e: - return logger.error(f"Could not start session - {e}") - - logger.debug(res.body) - - if res.code != 200: - return False - - jwt = res.body.get("jwt", None) - self.jwt = jwt - if jwt is None: - return False - - logger.info( - colored( - f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", - "blue", - ) - ) - - return True - - def _update_session(self) -> None: - """Update session state on the server""" - if not self.is_running: - return - - # TODO: Determine whether we really need to lock here: are incoming calls coming from other threads? 
- with self._lock: - payload = {"session": self.__dict__} - - try: - res = HttpClient.post( - f"{self.config.endpoint}/v2/update_session", - json.dumps(filter_unjsonable(payload)).encode("utf-8"), - # self.config.api_key, - jwt=self.jwt, - ) - except ApiServerException as e: - return logger.error(f"Could not update session - {e}") - - def create_agent(self, name, agent_id): - if not self.is_running: - return - if agent_id is None: - agent_id = str(uuid4()) - - payload = { - "id": agent_id, - "name": name, - } - - serialized_payload = safe_serialize(payload).encode("utf-8") - try: - HttpClient.post( - f"{self.config.endpoint}/v2/create_agent", - serialized_payload, - api_key=self.config.api_key, - jwt=self.jwt, - ) - except ApiServerException as e: - return logger.error(f"Could not create agent - {e}") - - return agent_id - - def patch(self, func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - kwargs["session"] = self - return func(*args, **kwargs) - - return wrapper - - def _get_response(self) -> Optional[Response]: - payload = {"session": self.__dict__} - try: - response = HttpClient.post( - f"{self.config.endpoint}/v2/update_session", - json.dumps(filter_unjsonable(payload)).encode("utf-8"), - api_key=self.config.api_key, - jwt=self.jwt, - ) - except ApiServerException as e: - return logger.error(f"Could not end session - {e}") - - logger.debug(response.body) - return response - - def _format_duration(self, start_time, end_time) -> str: - start = datetime.fromisoformat(start_time.replace("Z", "+00:00")) - end = datetime.fromisoformat(end_time.replace("Z", "+00:00")) - duration = end - start - - hours, remainder = divmod(duration.total_seconds(), 3600) - minutes, seconds = divmod(remainder, 60) - - parts = [] - if hours > 0: - parts.append(f"{int(hours)}h") - if minutes > 0: - parts.append(f"{int(minutes)}m") - parts.append(f"{seconds:.1f}s") - - return " ".join(parts) - - def _get_token_cost(self, response: Response) -> Decimal: - token_cost = 
response.body.get("token_cost", "unknown") - if token_cost == "unknown" or token_cost is None: - return Decimal(0) - return Decimal(token_cost) - - def _format_token_cost(self, token_cost: Decimal) -> str: - return ( - "{:.2f}".format(token_cost) - if token_cost == 0 - else "{:.6f}".format(token_cost.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)) - ) - - def get_analytics(self) -> Optional[Dict[str, Any]]: - if not self.end_timestamp: - self.end_timestamp = get_ISO_time() - - formatted_duration = self._format_duration(self.init_timestamp, self.end_timestamp) - - if (response := self._get_response()) is None: - return None - - self.token_cost = self._get_token_cost(response) - - return { - "LLM calls": self.event_counts["llms"], - "Tool calls": self.event_counts["tools"], - "Actions": self.event_counts["actions"], - "Errors": self.event_counts["errors"], - "Duration": formatted_duration, - "Cost": self._format_token_cost(self.token_cost), - } - - @property - def session_url(self) -> str: - """Returns the URL for this session in the AgentOps dashboard.""" - assert self.session_id, "Session ID is required to generate a session URL" - return f"https://app.agentops.ai/drilldown?session_id={self.session_id}" - - # @session_url.setter - # def session_url(self, url: str): - # pass - - -active_sessions: List[Session] = [] diff --git a/agentops/session/README.md b/agentops/session/README.md new file mode 100644 index 000000000..177f50f2c --- /dev/null +++ b/agentops/session/README.md @@ -0,0 +1,98 @@ +# Session Package + +This package contains the core session management functionality for AgentOps. 
+ +## Architecture + +```mermaid +graph TD + S[Session] --> |delegates to| M[SessionManager] + M --> |uses| A[SessionApiClient] + M --> |uses| T[SessionTelemetry] + T --> |uses| E[SessionExporter] + M --> |manages| R[Registry] + R --> |tracks| S +``` + +## Component Responsibilities + +### Session (`session.py`) +- Data container for session state +- Provides public API for session operations +- Delegates all operations to SessionManager + +### SessionManager (`manager.py`) +- Handles session lifecycle and state management +- Coordinates between API, telemetry, and registry +- Manages session analytics and event counts + +### SessionApiClient (`api.py`) +- Handles all HTTP communication with AgentOps API +- Manages authentication headers and JWT +- Serializes session state for API calls + +### SessionTelemetry (`telemetry.py`) +- Sets up OpenTelemetry infrastructure +- Records events with proper context +- Manages event batching and flushing + +### SessionExporter (`../telemetry/exporters/session.py`) +- Exports OpenTelemetry spans as AgentOps events +- Handles event formatting and delivery +- Manages export batching and retries + +### Registry (`registry.py`) +- Tracks active sessions +- Provides global session access +- Maintains backward compatibility with old code + +### LogCapture (`log_capture.py`) +- Captures stdout/stderr using OpenTelemetry logging +- Integrates with SessionTelemetry for consistent configuration +- Manages log buffering and export + +## Data Flow + +```mermaid +sequenceDiagram + participant C as Client + participant S as Session + participant M as SessionManager + participant A as SessionApiClient + participant T as SessionTelemetry + participant E as SessionExporter + + C->>S: start_session() + S->>M: create() + M->>A: create_session() + A-->>M: jwt + M->>T: setup() + T->>E: init() + + C->>S: record(event) + S->>M: record_event() + M->>T: record_event() + T->>E: export() + E->>A: create_events() +``` + +## Usage Example + +```python +from 
agentops import Client + +# Create client +client = Client(api_key="your-key") + +# Start session +session = client.start_session(tags=["test"]) + +# Record events +session.record(some_event) + +# Add tags +session.add_tags(["new_tag"]) + +# End session +session.end_session(end_state="Success") +``` diff --git a/agentops/session/__init__.py b/agentops/session/__init__.py new file mode 100644 index 000000000..14144bb4e --- /dev/null +++ b/agentops/session/__init__.py @@ -0,0 +1,9 @@ +"""Session management module""" + +from .session import Session +from .registry import get_active_sessions, add_session, remove_session + +# For backward compatibility +active_sessions = get_active_sessions() + +__all__ = ["Session", "active_sessions", "add_session", "remove_session"] diff --git a/agentops/session/api.py b/agentops/session/api.py new file mode 100644 index 000000000..02dc24487 --- /dev/null +++ b/agentops/session/api.py @@ -0,0 +1 @@ +from agentops.api.session import * diff --git a/agentops/session/log_capture.py b/agentops/session/log_capture.py new file mode 100644 index 000000000..70cc0e6d5 --- /dev/null +++ b/agentops/session/log_capture.py @@ -0,0 +1,205 @@ +import sys +import logging +from typing import Optional +from uuid import UUID +from opentelemetry.sdk._logs import LoggingHandler, LoggerProvider +from opentelemetry.sdk._logs.export import ConsoleLogExporter, BatchLogRecordProcessor +from opentelemetry.sdk.resources import Resource +from opentelemetry import trace + + +class LogCapture: + """Captures terminal output for a session using OpenTelemetry logging. + + Integrates with TelemetryManager to use consistent configuration and logging setup. + If no telemetry manager is available, creates a standalone logging setup. 
+ """ + + def __init__(self, session): + self.session = session + # Use unique logger names to avoid conflicts + self._stdout_logger = logging.getLogger(f"agentops.stdout.{id(self)}") + self._stderr_logger = logging.getLogger(f"agentops.stderr.{id(self)}") + self._stdout = None + self._stderr = None + self._handler = None + self._logger_provider = None + self._owns_handler = False # Track if we created our own handler + + # Configure loggers to not propagate to parent loggers + for logger in (self._stdout_logger, self._stderr_logger): + logger.setLevel(logging.INFO) + logger.propagate = False + logger.handlers.clear() + + def start(self): + """Start capturing output using OTEL logging handler""" + if self._stdout is not None: + return + + # Try to get handler from telemetry manager + if hasattr(self.session, "_telemetry") and self.session._telemetry: + self._handler = self.session._telemetry.get_log_handler() + + # Create our own handler if none exists + if not self._handler: + self._owns_handler = True + + # Use session's resource attributes if available + resource_attrs = {"service.name": "agentops", "session.id": str(getattr(self.session, "id", "unknown"))} + + if ( + hasattr(self.session, "_telemetry") + and self.session._telemetry + and self.session._telemetry.config + and self.session._telemetry.config.resource_attributes + ): + resource_attrs.update(self.session._telemetry.config.resource_attributes) + + # Setup logger provider with console exporter + resource = Resource.create(resource_attrs) + self._logger_provider = LoggerProvider(resource=resource) + exporter = ConsoleLogExporter() + self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) + + self._handler = LoggingHandler( + level=logging.INFO, + logger_provider=self._logger_provider, + ) + + # Register with telemetry manager if available + if hasattr(self.session, "_telemetry") and self.session._telemetry: + self.session._telemetry.set_log_handler(self._handler) + + # Add 
handler to both loggers + self._stdout_logger.addHandler(self._handler) + self._stderr_logger.addHandler(self._handler) + + # Save original stdout/stderr + self._stdout = sys.stdout + self._stderr = sys.stderr + + # Replace with logging proxies + sys.stdout = self._StdoutProxy(self._stdout_logger) + sys.stderr = self._StderrProxy(self._stderr_logger) + + def stop(self): + """Stop capturing output and restore stdout/stderr""" + if self._stdout is None: + return + + # Restore original stdout/stderr + sys.stdout = self._stdout + sys.stderr = self._stderr + self._stdout = None + self._stderr = None + + # Clean up handlers + if self._handler: + self._stdout_logger.removeHandler(self._handler) + self._stderr_logger.removeHandler(self._handler) + + # Only close/shutdown if we own the handler + if self._owns_handler: + self._handler.close() + if self._logger_provider: + self._logger_provider.shutdown() + + # Clear from telemetry manager if we created it + if hasattr(self.session, "_telemetry") and self.session._telemetry: + self.session._telemetry.set_log_handler(None) + + self._handler = None + self._logger_provider = None + + def flush(self): + """Flush any buffered logs""" + if self._handler: + self._handler.flush() + + class _StdoutProxy: + """Proxies stdout to logger""" + + def __init__(self, logger): + self._logger = logger + + def write(self, text): + if text.strip(): # Only log non-empty strings + self._logger.info(text.rstrip()) + + def flush(self): + pass + + class _StderrProxy: + """Proxies stderr to logger""" + + def __init__(self, logger): + self._logger = logger + + def write(self, text): + if text.strip(): # Only log non-empty strings + self._logger.error(text.rstrip()) + + def flush(self): + pass + + +if __name__ == "__main__": + import time + from dataclasses import dataclass + from uuid import uuid4 + from agentops.telemetry.config import OTELConfig + from agentops.telemetry.manager import TelemetryManager + + # Create a mock session with telemetry + 
@dataclass + class MockSession: + id: UUID + _telemetry: Optional[TelemetryManager] = None + + # Setup telemetry + telemetry = TelemetryManager() + config = OTELConfig(resource_attributes={"test.attribute": "demo"}, endpoint="http://localhost:4317") + telemetry.initialize(config) + + # Create session + session = MockSession(id=uuid4(), _telemetry=telemetry) + + # Create and start capture + capture = LogCapture(session) + capture.start() + + try: + print("Regular stdout message") + print("Multi-line stdout message\nwith a second line") + sys.stderr.write("Error message to stderr\n") + + # Show that empty lines are ignored + print("") + print("\n\n") + + # Demonstrate concurrent output + def background_prints(): + for i in range(3): + time.sleep(0.5) + print(f"Background message {i}") + sys.stderr.write(f"Background error {i}\n") + + import threading + + thread = threading.Thread(target=background_prints) + thread.start() + + # Main thread output + for i in range(3): + time.sleep(0.7) + print(f"Main thread message {i}") + + thread.join() + + finally: + # Stop capture and show normal output is restored + capture.stop() + telemetry.shutdown() + print("\nCapture stopped - this prints normally to stdout") + sys.stderr.write("This error goes normally to stderr\n") diff --git a/agentops/session/manager.py b/agentops/session/manager.py new file mode 100644 index 000000000..b3fe329f3 --- /dev/null +++ b/agentops/session/manager.py @@ -0,0 +1,212 @@ +from __future__ import annotations + +import threading +from datetime import datetime +from decimal import Decimal +from typing import TYPE_CHECKING, Optional, Union, Dict, List, Any + +from termcolor import colored +from agentops.enums import EndState +from agentops.helpers import get_ISO_time +from agentops.log_config import logger + +if TYPE_CHECKING: + from agentops.event import Event, ErrorEvent + from .session import Session + from .registry import add_session, remove_session + from .api import SessionApiClient + from 
class SessionManager:
    """Handles session lifecycle and state management.

    Owns the state mutations for a Session: creation/registration, tag
    updates, event counting, and teardown. All public methods serialize
    access through self._lock; end_session uses its own lock so that
    ending cannot deadlock against concurrent tag/event updates.
    """

    def __init__(self, session: "Session"):
        self._state = session
        self._lock = threading.Lock()
        self._end_session_lock = threading.Lock()

        # Import at runtime to avoid circular imports
        from .registry import add_session, remove_session

        self._add_session = add_session
        self._remove_session = remove_session

        # Initialize telemetry
        from .telemetry import SessionTelemetry

        self._telemetry = SessionTelemetry(self._state)

        # Store reference on session for backward compatibility
        self._state._telemetry = self._telemetry
        self._state._otel_exporter = self._telemetry._exporter

    def start_session(self) -> bool:
        """Start and initialize session.

        Returns True when the backend accepted the session; on success the
        returned JWT is stored on both the session and its API client and
        the session is registered in the active-session registry.
        """
        with self._lock:
            if not self._state._api:
                return False

            success, jwt = self._state._api.create_session(self._serialize_session(), self._state.config.parent_key)
            if success:
                self._state.jwt = jwt
                self._state._api.jwt = jwt  # Update JWT on API client
                self._add_session(self._state)
            return success

    def create_agent(self, name: str, agent_id: Optional[str] = None) -> Optional[str]:
        """Create a new agent; generates a UUID when agent_id is omitted.

        Returns the agent id on success, None when there is no API client
        or the backend call failed.
        """
        with self._lock:
            if agent_id is None:
                from uuid import uuid4

                agent_id = str(uuid4())

            if not self._state._api:
                return None

            success = self._state._api.create_agent(name=name, agent_id=agent_id)
            return agent_id if success else None

    def add_tags(self, tags: Union[str, List[str]]) -> None:
        """Add tags to session, de-duplicated, then push the full tag list
        to the backend.

        NOTE(review): the API update happens while self._lock is held —
        confirm the API client is safe to call under the lock.
        """
        with self._lock:
            if isinstance(tags, str):
                if tags not in self._state.tags:
                    self._state.tags.append(tags)
            elif isinstance(tags, list):
                self._state.tags.extend(t for t in tags if t not in self._state.tags)

            if self._state._api:
                self._state._api.update_session({"tags": self._state.tags})

    def set_tags(self, tags: Union[str, List[str]]) -> None:
        """Set session tags, replacing any existing ones, then sync to backend."""
        with self._lock:
            if isinstance(tags, str):
                self._state.tags = [tags]
            elif isinstance(tags, list):
                self._state.tags = list(tags)

            if self._state._api:
                self._state._api.update_session({"tags": self._state.tags})

    def record_event(self, event: Union["Event", "ErrorEvent"], flush_now: bool = False) -> None:
        """Update event counts and record event via telemetry."""
        with self._lock:
            # Update counts (only for known event types present in the dict)
            if event.event_type in self._state.event_counts:
                self._state.event_counts[event.event_type] += 1

            # Record via telemetry
            if self._telemetry:
                self._telemetry.record_event(event, flush_now)

    def end_session(
        self, end_state: str, end_state_reason: Optional[str], video: Optional[str]
    ) -> Union[Decimal, None]:
        """End session and cleanup.

        Flushes pending telemetry, stamps the end state/time, logs the
        analytics summary and removes the session from the registry.
        Returns the session token cost, or None if the session was not
        running or analytics could not be computed.
        """
        with self._end_session_lock:
            if not self._state.is_running:
                return None

            try:
                # Flush any pending telemetry
                if self._telemetry:
                    self._telemetry.flush(timeout_millis=5000)

                self._state.end_timestamp = get_ISO_time()
                self._state.end_state = end_state
                self._state.end_state_reason = end_state_reason
                self._state.video = video if video else self._state.video
                self._state.is_running = False

                if analytics := self._get_analytics():
                    self._log_analytics(analytics)
                    self._remove_session(self._state)
                    return self._state.token_cost
                return None
            except Exception as e:
                logger.exception(f"Error ending session: {e}")
                return None

    def _get_analytics(self) -> Optional[Dict[str, Union[int, str]]]:
        """Get session analytics.

        Side effects: fills in a missing end_timestamp and updates
        token_cost from the backend response before building the summary.
        """
        if not self._state.end_timestamp:
            self._state.end_timestamp = get_ISO_time()

        formatted_duration = self._format_duration(self._state.init_timestamp, self._state.end_timestamp)

        if not self._state._api:
            return None

        response = self._state._api.update_session(self._serialize_session())
        if not response:
            return None

        # Update token cost from API response
        if "token_cost" in response:
            self._state.token_cost = Decimal(str(response["token_cost"]))

        return {
            "LLM calls": self._state.event_counts["llms"],
            "Tool calls": self._state.event_counts["tools"],
            "Actions": self._state.event_counts["actions"],
            "Errors": self._state.event_counts["errors"],
            "Duration": formatted_duration,
            "Cost": self._format_token_cost(self._state.token_cost),
        }

    def _serialize_session(self) -> Dict[str, Any]:
        """Convert session to API-friendly dict format"""
        # Get only the public fields we want to send to API
        return {
            "session_id": str(self._state.session_id),
            "tags": self._state.tags,
            "host_env": self._state.host_env,
            "token_cost": float(self._state.token_cost),
            "end_state": self._state.end_state,
            "end_state_reason": self._state.end_state_reason,
            "end_timestamp": self._state.end_timestamp,
            "jwt": self._state.jwt,
            "video": self._state.video,
            "event_counts": self._state.event_counts,
            "init_timestamp": self._state.init_timestamp,
            "is_running": self._state.is_running,
        }

    def _format_duration(self, start_time: str, end_time: str) -> str:
        """Format duration between two ISO-8601 timestamps as e.g. '1h 2m 3.5s'."""
        # "Z" suffix is not accepted by fromisoformat on older Pythons;
        # normalize to an explicit UTC offset first.
        start = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
        end = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        duration = end - start

        hours, remainder = divmod(duration.total_seconds(), 3600)
        minutes, seconds = divmod(remainder, 60)

        parts = []
        if hours > 0:
            parts.append(f"{int(hours)}h")
        if minutes > 0:
            parts.append(f"{int(minutes)}m")
        parts.append(f"{seconds:.1f}s")

        return " ".join(parts)

    def _format_token_cost(self, token_cost: Decimal) -> str:
        """Format token cost for display"""
        # Always format with 6 decimal places for consistency with tests
        return "{:.6f}".format(token_cost)

    def _log_analytics(self, stats: Dict[str, Union[int, str]]) -> None:
        """Log analytics in a consistent format"""
        analytics = (
            f"Session Stats - "
            f"{colored('Duration:', attrs=['bold'])} {stats['Duration']} | "
            f"{colored('Cost:', attrs=['bold'])} ${stats['Cost']} | "
            f"{colored('LLMs:', attrs=['bold'])} {str(stats['LLM calls'])} | "
            f"{colored('Tools:', attrs=['bold'])} {str(stats['Tool calls'])} | "
            f"{colored('Actions:', attrs=['bold'])} {str(stats['Actions'])} | "
            f"{colored('Errors:', attrs=['bold'])} {str(stats['Errors'])}"
        )
        logger.info(analytics)
@dataclass
class Session:
    """Data container for session state with minimal public API.

    All behavior is delegated to a SessionManager created in
    __post_init__; the dataclass itself only carries state. Private
    (underscore) fields are populated at runtime and excluded from
    __init__/__repr__.
    """

    session_id: UUID
    config: Configuration
    tags: List[str] = field(default_factory=list)
    host_env: Optional[dict] = None
    token_cost: Decimal = field(default_factory=lambda: Decimal(0))
    end_state: str = field(default_factory=lambda: EndState.INDETERMINATE.value)
    end_state_reason: Optional[str] = None
    end_timestamp: Optional[str] = None
    jwt: Optional[str] = None
    video: Optional[str] = None
    event_counts: Dict[str, int] = field(
        default_factory=lambda: {"llms": 0, "tools": 0, "actions": 0, "errors": 0, "apis": 0}
    )
    init_timestamp: str = field(default_factory=get_ISO_time)
    is_running: bool = field(default=True)
    # Runtime wiring, set by SessionManager/__post_init__:
    _telemetry: Any = field(init=False, repr=False, default=None)   # SessionTelemetry
    _api: Any = field(init=False, repr=False, default=None)         # SessionApiClient
    _manager: Any = field(init=False, repr=False, default=None)     # SessionManager
    _otel_exporter: Any = field(init=False, repr=False, default=None)  # backward-compat alias
    _log_capture: LogCapture = field(init=False, repr=False, default=None)

    def __post_init__(self):
        """Initialize session manager"""
        # Convert tags to list first
        if isinstance(self.tags, (str, set)):
            self.tags = list(self.tags)
        elif self.tags is None:
            self.tags = []

        # Initialize API client
        from ..api.session import SessionApiClient

        if not self.config.api_key:
            raise ValueError("API key is required")

        self._api = SessionApiClient(
            endpoint=self.config.endpoint, session_id=self.session_id, api_key=self.config.api_key
        )

        # Then initialize manager (which also wires telemetry onto self)
        from .manager import SessionManager

        self._manager = SessionManager(self)
        self.is_running = self._manager.start_session()

        self._log_capture = LogCapture(self)

    # Public API - All delegate to manager
    def add_tags(self, tags: Union[str, List[str]]) -> None:
        """Add tags to session"""
        if self.is_running and self._manager:
            self._manager.add_tags(tags)

    def set_tags(self, tags: Union[str, List[str]]) -> None:
        """Set session tags"""
        if self.is_running and self._manager:
            self._manager.set_tags(tags)

    def record(self, event: Union["Event", "ErrorEvent"], flush_now: bool = False) -> None:
        """Record an event.

        NOTE(review): unlike add_tags/set_tags this does not check
        is_running — confirm events should still record after session end.
        """
        if self._manager:
            self._manager.record_event(event, flush_now)

    def end_session(
        self,
        end_state: str = EndState.INDETERMINATE.value,
        end_state_reason: Optional[str] = None,
        video: Optional[str] = None,
    ) -> Union[Decimal, None]:
        """End the session; returns the final token cost or None."""
        if self._manager:
            return self._manager.end_session(end_state, end_state_reason, video)
        return None

    def create_agent(self, name: str, agent_id: Optional[str] = None) -> Optional[str]:
        """Create a new agent for this session"""
        if self.is_running and self._manager:
            return self._manager.create_agent(name, agent_id)
        return None

    def get_analytics(self) -> Optional[Dict[str, Union[int, str]]]:
        """Get session analytics"""
        if self._manager:
            return self._manager._get_analytics()
        return None

    @property
    def session_url(self) -> str:
        """Dashboard drill-down URL for this session."""
        return f"https://app.agentops.ai/drilldown?session_id={self.session_id}"

    @property
    def _tracer_provider(self):
        """For testing compatibility"""
        return self._telemetry._tracer_provider if self._telemetry else None

    def start_log_capture(self):
        """Start capturing terminal output"""
        self._log_capture.start()

    def stop_log_capture(self):
        """Stop capturing terminal output"""
        self._log_capture.stop()
class SessionTelemetry:
    """Handles telemetry setup and event recording.

    Owns a per-session TracerProvider, a SessionExporter and a
    BatchSpanProcessor; converts AgentOps events into OTEL spans.
    """

    def __init__(self, session: "Session"):
        self.session = session
        self._setup_telemetry()

    def _setup_telemetry(self):
        """Initialize OpenTelemetry components"""
        self._tracer_provider = TracerProvider()
        # One tracer per session, named after the session id.
        self._otel_tracer = self._tracer_provider.get_tracer(
            f"agentops.session.{str(self.session.session_id)}",
        )

        from agentops.telemetry.exporters.session import SessionExporter

        self._exporter = SessionExporter(session=self.session)

        # Configure batch processor. Batch size is clamped to
        # [1, min(max_queue_size, 32)], targeting ~5% of the queue.
        self._span_processor = BatchSpanProcessor(
            self._exporter,
            max_queue_size=self.session.config.max_queue_size,
            schedule_delay_millis=self.session.config.max_wait_time,
            max_export_batch_size=min(
                max(self.session.config.max_queue_size // 20, 1),
                min(self.session.config.max_queue_size, 32),
            ),
            export_timeout_millis=20000,
        )

        self._tracer_provider.add_span_processor(self._span_processor)

    def record_event(self, event: Union[Event, ErrorEvent], flush_now: bool = False) -> None:
        """Record telemetry for an event.

        Creates one span per event carrying the serialized event payload
        plus session context; optionally force-flushes the processor.
        """
        if not hasattr(self, "_otel_tracer"):
            return

        # Create session context
        # NOTE(review): `token` first holds the Context from set_value and is
        # then rebound to the attach() token; if attach() raised, the finally
        # block would pass a Context to detach() — confirm/clean up.
        token = set_value("session.id", str(self.session.session_id))
        try:
            token = attach(token)

            # Filter out non-serializable data
            event_data = filter_unjsonable(event.__dict__)

            with self._otel_tracer.start_as_current_span(
                name=event.event_type,
                attributes={
                    "event.id": str(event.id),
                    "event.type": event.event_type,
                    "event.timestamp": event.init_timestamp or get_ISO_time(),
                    "event.end_timestamp": event.end_timestamp or get_ISO_time(),
                    "session.id": str(self.session.session_id),
                    "session.tags": ",".join(self.session.tags) if self.session.tags else "",
                    "event.data": json.dumps(event_data),
                },
            ) as span:
                # ErrorEvents are detected structurally via their error_type attr.
                if hasattr(event, "error_type"):
                    span.set_attribute("error", True)
                if hasattr(event, "trigger_event") and event.trigger_event:
                    span.set_attribute("trigger_event.id", str(event.trigger_event.id))
                    span.set_attribute("trigger_event.type", event.trigger_event.event_type)

            if flush_now and hasattr(self, "_span_processor"):
                self._span_processor.force_flush()
        finally:
            detach(token)

    def flush(self, timeout_millis: Optional[int] = None) -> None:
        """Force flush pending spans"""
        if hasattr(self, "_span_processor"):
            self._span_processor.force_flush(timeout_millis=timeout_millis)
EventExporter + + SessionExporter -->|Sends to| API + EventExporter -->|Sends to| API + API -->|Stores in| Storage +``` diff --git a/agentops/telemetry/PROPOSAL.md b/agentops/telemetry/PROPOSAL.md new file mode 100644 index 000000000..87992827a --- /dev/null +++ b/agentops/telemetry/PROPOSAL.md @@ -0,0 +1,176 @@ +# AgentOps OTEL Integration Redesign + +## Current Architecture Limitations +- Limited distributed tracing capabilities +- OTEL used primarily as transport +- Tight coupling between sessions and telemetry +- Provider instrumentation lacks observability context + +## Proposed Architecture Overview +````mermaid +graph TD + A[Agent Application] --> B[AgentOps Client] + B --> C[Session Manager] + C --> D[Instrumented Providers] + D --> E1[AgentOps Events] + D --> E2[OTEL Telemetry] + E1 --> F1[AgentOps Backend] + E2 --> F2[OTEL Collector] + F2 --> G[Observability Backend] +```` + +## Implementation Plan + +### 1. Core Instrumentation Layer +**Goal**: Create a foundation for dual-purpose instrumentation + +- [ ] **Create Base Instrumentation Interface** + ```python + class BaseInstrumentation: + def create_span(...) + def record_event(...) + def propagate_context(...) + ``` + +- [ ] **Implement Provider-Specific Instrumentation** + - OpenAI instrumentation + - Anthropic instrumentation + - Base class for custom providers + +### 2. Session Management Refactor +**Goal**: Decouple session management from telemetry + +- [ ] **Split Session Responsibilities** + - Create SessionManager class + - Move telemetry to dedicated TelemetryManager + - Implement context propagation + +- [ ] **Update Session Interface** + ```python + class Session: + def __init__(self, telemetry_manager: TelemetryManager) + def record(self, event: Event) + def propagate_context(self) + ``` + +### 3. 
Telemetry Pipeline +**Goal**: Support multiple telemetry backends + +- [ ] **Create Telemetry Manager** + - Implement span creation/management + - Handle event correlation + - Support multiple exporters + +- [ ] **Update Event Processing** + - Add trace context to events + - Implement sampling strategies + - Add batch processing support + +### 4. Provider Integration +**Goal**: Enhance provider instrumentation + +- [ ] **Update InstrumentedProvider Base Class** + ```python + class InstrumentedProvider: + def __init__(self, instrumentation: BaseInstrumentation) + def handle_response(self, response, context) + def create_span(self, operation) + ``` + +- [ ] **Implement Provider-Specific Features** + - Token counting with spans + - Latency tracking + - Error handling with trace context + +### 5. Context Propagation +**Goal**: Enable distributed tracing + +- [ ] **Implement Context Management** + - Create TraceContext class + - Add context injection/extraction + - Support async operations + +- [ ] **Add Cross-Service Tracing** + - HTTP header propagation + - gRPC metadata support + - Async context management + +### 6. Configuration Updates +**Goal**: Make instrumentation configurable + +- [ ] **Create Configuration Interface** + ```python + class TelemetryConfig: + sampling_rate: float + exporters: List[Exporter] + trace_context: ContextConfig + ``` + +- [ ] **Update Client Configuration** + - Add OTEL configuration + - Support multiple backends + - Configure sampling + +### 7. 
Migration Support +**Goal**: Ensure backward compatibility + +- [ ] **Create Migration Tools** + - Add compatibility layer + - Create migration guide + - Add version detection + +- [ ] **Update Documentation** + - Update README.md + - Add migration examples + - Document new features + +## File Structure Changes +``` +agentops/ +├── instrumentation/ +│ ├── base.py +│ ├── providers/ +│ └── context.py +├── telemetry/ +│ ├── manager.py +│ ├── exporters/ +│ └── sampling.py +├── session/ +│ ├── manager.py +│ └── context.py +└── providers/ + └── instrumented_provider.py +``` + +## Configuration Example +```yaml +telemetry: + sampling: + rate: 1.0 + rules: [] + exporters: + - type: agentops + endpoint: https://api.agentops.ai + - type: otlp + endpoint: localhost:4317 + context: + propagation: + - b3 + - w3c +``` + +## Benefits +1. **Enhanced Observability** + - Full distributed tracing + - Better debugging capabilities + - Cross-service correlation + +2. **Improved Architecture** + - Clear separation of concerns + - More flexible instrumentation + - Better extensibility + +3. 
**Better Performance** + - Optimized sampling + - Efficient context propagation + - Reduced overhead diff --git a/agentops/telemetry/README.md b/agentops/telemetry/README.md new file mode 100644 index 000000000..9be1aa641 --- /dev/null +++ b/agentops/telemetry/README.md @@ -0,0 +1,115 @@ +# AgentOps OpenTelemetry Integration + +## Architecture Overview + +```mermaid +flowchart TB + subgraph AgentOps + Client[AgentOps Client] + Session[Session] + Events[Events] + LogCapture[LogCapture] + TelemetryManager[Telemetry Manager] + end + + subgraph OpenTelemetry + TracerProvider[Tracer Provider] + EventProcessor[Event Processor] + LogProcessor[Log Processor] + EventToSpanEncoder[Event To Span Encoder] + BatchProcessor[Batch Processor] + SessionExporter[Session Exporter] + EventExporter[Event Exporter] + end + + Client --> Session + Session --> Events + Session --> LogCapture + LogCapture --> LogProcessor + Events --> TelemetryManager + + TelemetryManager --> TracerProvider + TracerProvider --> EventProcessor + EventProcessor --> EventToSpanEncoder + LogProcessor --> EventToSpanEncoder + EventToSpanEncoder --> BatchProcessor + BatchProcessor --> SessionExporter + BatchProcessor --> EventExporter +``` + +## Component Overview + +### TelemetryManager (`manager.py`) +- Central configuration and management of OpenTelemetry setup +- Handles TracerProvider lifecycle and sampling configuration +- Manages session-specific exporters and processors +- Coordinates telemetry initialization and shutdown +- Configures logging telemetry + +### EventProcessor (`processors.py`) +- Processes spans for AgentOps events +- Adds session context to spans +- Tracks event counts by type +- Handles error propagation +- Forwards spans to wrapped processor + +### SessionExporter & EventExporter (`exporters/`) +- Exports session spans and events +- Implements retry logic with exponential backoff +- Supports custom formatters +- Handles batched export +- Manages error handling and recovery + +### 
from .manager import TelemetryManager
from .config import OTELConfig

# __all__ must list *names* (strings), not the objects themselves.
# Listing the classes works by accident for `import *` resolution in
# CPython only when the objects' names match, and breaks tooling
# (linters, Sphinx, typing) that expects an iterable of strings.
__all__ = ["OTELConfig", "TelemetryManager"]
"time.end" + +# Common attributes (from Event base class) +EVENT_ID = "event.id" +EVENT_TYPE = "event.type" +EVENT_DATA = "event.data" +EVENT_START_TIME = "event.start_time" +EVENT_END_TIME = "event.end_time" +EVENT_PARAMS = "event.params" +EVENT_RETURNS = "event.returns" + +# Session attributes +SESSION_ID = "session.id" +SESSION_TAGS = "session.tags" + +# Agent attributes +AGENT_ID = "agent.id" + +# Thread attributes +THREAD_ID = "thread.id" + +# Error attributes +ERROR = "error" +ERROR_TYPE = "error.type" +ERROR_MESSAGE = "error.message" +ERROR_STACKTRACE = "error.stacktrace" +ERROR_DETAILS = "error.details" +ERROR_CODE = "error.code" +TRIGGER_EVENT_ID = "trigger_event.id" +TRIGGER_EVENT_TYPE = "trigger_event.type" + +# LLM attributes +LLM_MODEL = "llm.model" +LLM_PROMPT = "llm.prompt" +LLM_COMPLETION = "llm.completion" +LLM_TOKENS_TOTAL = "llm.tokens.total" +LLM_TOKENS_PROMPT = "llm.tokens.prompt" +LLM_TOKENS_COMPLETION = "llm.tokens.completion" +LLM_COST = "llm.cost" + +# Action attributes +ACTION_TYPE = "action.type" +ACTION_PARAMS = "action.params" +ACTION_RESULT = "action.result" +ACTION_LOGS = "action.logs" +ACTION_SCREENSHOT = "action.screenshot" + +# Tool attributes +TOOL_NAME = "tool.name" +TOOL_PARAMS = "tool.params" +TOOL_RESULT = "tool.result" +TOOL_LOGS = "tool.logs" + +# Execution attributes +EXECUTION_START_TIME = "execution.start_time" +EXECUTION_END_TIME = "execution.end_time" + +# Log attributes +LOG_SEVERITY = "log.severity" +LOG_MESSAGE = "log.message" +LOG_TIMESTAMP = "log.timestamp" +LOG_THREAD_ID = "log.thread_id" +LOG_SESSION_ID = "log.session_id" +LOG_CONTEXT = "log.context" diff --git a/agentops/telemetry/config.py b/agentops/telemetry/config.py new file mode 100644 index 000000000..0dba712c6 --- /dev/null +++ b/agentops/telemetry/config.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from typing import Callable, Dict, List, Optional + +from opentelemetry.sdk.trace.export import SpanExporter +from 
@dataclass
class OTELConfig:
    """Configuration for OpenTelemetry integration"""

    # Extra SpanExporters to run alongside the default session exporter.
    additional_exporters: Optional[List[SpanExporter]] = None
    # Attributes merged into the OTEL Resource for all emitted telemetry.
    resource_attributes: Optional[Dict] = None
    # Sampling strategy; None means the SDK default sampler.
    sampler: Optional[Sampler] = None
    # Export retry settings (e.g. {"retry_count": 3, "retry_delay": 1.0}).
    retry_config: Optional[Dict] = None
    # Callables applied to spans/payloads before export.
    custom_formatters: Optional[List[Callable]] = None
    # Whether to also configure OTEL metrics.
    enable_metrics: bool = False
    # MetricReader instances used when enable_metrics is True.
    metric_readers: Optional[List] = None
    # BatchSpanProcessor tuning: queue capacity, batch size, and max
    # wait before a batch is exported (milliseconds).
    max_queue_size: int = 512
    max_export_batch_size: int = 256
    max_wait_time: int = 5000
    # AgentOps backend endpoint and credential.
    endpoint: str = "https://api.agentops.ai"
    api_key: Optional[str] = None
class SpanDefinitions(Sequence[SpanDefinition]):
    """Immutable-by-convention, ordered collection of span definitions.

    Implements the read-only sequence protocol (len(), iteration and
    integer indexing) over the definitions passed at construction time.
    """

    def __init__(self, *spans: SpanDefinition):
        # Materialize the varargs once; the collection is never mutated
        # after construction.
        self._items = list(spans)

    def __len__(self) -> int:
        return len(self._items)

    def __iter__(self):
        yield from self._items

    def __getitem__(self, index: int) -> SpanDefinition:
        return self._items[index]
@classmethod + def _encode_action_event(cls, event: ActionEvent) -> SpanDefinitions: + action_span = SpanDefinition( + name="agent.action", + attributes={ + "action_type": event.action_type, + "params": json.dumps(event.params), + "returns": event.returns, + "logs": event.logs, + "event.start_time": event.init_timestamp, + SpanAttributes.CODE_NAMESPACE: event.__class__.__name__, + "event_type": "actions", + }, + ) + + execution_span = SpanDefinition( + name="action.execution", + parent_span_id=action_span.name, + attributes={"start_time": event.init_timestamp, "end_time": event.end_timestamp}, + ) + + return SpanDefinitions(action_span, execution_span) + + @classmethod + def _encode_tool_event(cls, event: ToolEvent) -> SpanDefinitions: + tool_span = SpanDefinition( + name="agent.tool", + attributes={ + "name": event.name, + "params": json.dumps(event.params), + "returns": json.dumps(event.returns), + "logs": json.dumps(event.logs), + SpanAttributes.CODE_NAMESPACE: event.__class__.__name__, + "event_type": "tools", + }, + ) + + execution_span = SpanDefinition( + name="tool.execution", + parent_span_id=tool_span.name, + attributes={"start_time": event.init_timestamp, "end_time": event.end_timestamp}, + ) + + return SpanDefinitions(tool_span, execution_span) + + @classmethod + def _encode_error_event(cls, event: ErrorEvent) -> SpanDefinitions: + error_span = SpanDefinition( + name="error", + attributes={ + "error": True, + "error_type": event.error_type, + "details": event.details, + "trigger_event": event.trigger_event, + SpanAttributes.CODE_NAMESPACE: event.__class__.__name__, + "event_type": "errors", + }, + ) + return SpanDefinitions(error_span) + + @classmethod + def _encode_generic_event(cls, event: Event) -> SpanDefinitions: + """Handle unknown event types with basic attributes.""" + span = SpanDefinition( + name="event", + attributes={ + SpanAttributes.CODE_NAMESPACE: event.__class__.__name__, + "event_type": getattr(event, "event_type", "unknown"), + }, + ) + 
return SpanDefinitions(span) diff --git a/agentops/telemetry/exporters/__init__.py b/agentops/telemetry/exporters/__init__.py new file mode 100644 index 000000000..9ee7b8068 --- /dev/null +++ b/agentops/telemetry/exporters/__init__.py @@ -0,0 +1 @@ +from .event import EventExporter diff --git a/agentops/telemetry/exporters/event.py b/agentops/telemetry/exporters/event.py new file mode 100644 index 000000000..db2cdfefa --- /dev/null +++ b/agentops/telemetry/exporters/event.py @@ -0,0 +1,160 @@ +import json +import threading +from typing import Callable, Dict, List, Optional, Sequence, Any, cast +from uuid import UUID, uuid4 + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.util.types import Attributes + +from agentops.api.session import SessionApiClient +from agentops.helpers import get_ISO_time +from agentops.log_config import logger +import agentops.telemetry.attributes as attrs + +EVENT_DATA = "event.data" +EVENT_ID = "event.id" +EVENT_START_TIME = "event.timestamp" +EVENT_END_TIME = "event.end_timestamp" +AGENT_ID = "agent.id" + + +class EventExporter(SpanExporter): + """ + Exports agentops.event.Event to AgentOps servers. 
+ """ + + def __init__( + self, + session_id: UUID, + endpoint: str, + jwt: str, + api_key: str, + retry_config: Optional[Dict] = None, + custom_formatters: Optional[List[Callable]] = None, + ): + self.session_id = session_id + self._api = SessionApiClient(endpoint=endpoint, session_id=session_id, api_key=api_key, jwt=jwt) + self._export_lock = threading.Lock() + self._shutdown = threading.Event() + self._wait_event = threading.Event() + self._wait_fn = self._wait_event.wait # Store the wait function + + # Allow custom retry configuration + retry_config = retry_config or {} + self._retry_count = retry_config.get("retry_count", 3) + self._retry_delay = retry_config.get("retry_delay", 1.0) + + # Support custom formatters + self._custom_formatters = custom_formatters or [] + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + """Export spans with retry logic and proper error handling""" + if self._shutdown.is_set(): + return SpanExportResult.SUCCESS + + with self._export_lock: + try: + if not spans: + return SpanExportResult.SUCCESS + + events = self._format_spans(spans) + if not events: # Skip if no events were formatted + return SpanExportResult.SUCCESS + + for attempt in range(self._retry_count): + try: + success = self._send_batch(events) + if success: + return SpanExportResult.SUCCESS + + # If not successful but not the last attempt, wait and retry + if attempt < self._retry_count - 1: + self._wait_before_retry(attempt) + continue + + except Exception as e: + logger.error(f"Export attempt {attempt + 1} failed: {e}") + if attempt < self._retry_count - 1: + self._wait_before_retry(attempt) + continue + return SpanExportResult.FAILURE + + # If we've exhausted all retries without success + return SpanExportResult.FAILURE + + except Exception as e: + logger.error(f"Error during span export: {e}") + return SpanExportResult.FAILURE + + def _format_spans(self, spans: Sequence[ReadableSpan]) -> List[Dict[str, Any]]: + """Format spans into AgentOps 
event format with custom formatters""" + events = [] + for span in spans: + try: + # Get base event data + attrs_dict = span.attributes or {} + event_data_str = attrs_dict.get(EVENT_DATA, "{}") + if isinstance(event_data_str, (str, bytes, bytearray)): + event_data = json.loads(event_data_str) + else: + event_data = {} + + # Ensure required fields + event = { + "id": attrs_dict.get(EVENT_ID) or str(uuid4()), + "event_type": span.name, + "init_timestamp": attrs_dict.get(EVENT_START_TIME) or get_ISO_time(), + "end_timestamp": attrs_dict.get(EVENT_END_TIME) or get_ISO_time(), + # Always include session_id from the exporter + "session_id": str(self.session_id), + } + + # Add agent ID if present + agent_id = attrs_dict.get(AGENT_ID) + if agent_id: + event["agent_id"] = agent_id + + # Add event-specific data, but ensure session_id isn't overwritten + event_data["session_id"] = str(self.session_id) + event.update(event_data) + + # Apply custom formatters + for formatter in self._custom_formatters: + try: + event = formatter(event) + # Ensure session_id isn't removed by formatters + event["session_id"] = str(self.session_id) + except Exception as e: + logger.error(f"Custom formatter failed: {e}") + + events.append(event) + except Exception as e: + logger.error(f"Error formatting span: {e}") + + return events + + def _send_batch(self, events: List[Dict[str, Any]]) -> bool: + """Send a batch of events to the AgentOps backend""" + try: + return self._api.create_events(events) + except Exception as e: + logger.error(f"Error sending batch: {str(e)}", exc_info=e) + return False + + def _wait_before_retry(self, attempt: int): + """Implement exponential backoff for retries""" + delay = self._retry_delay * (2**attempt) + self._wait_fn(delay) # Use the wait function + + def _set_wait_fn(self, wait_fn): + """Test helper to override wait behavior""" + self._wait_fn = wait_fn + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + """Force flush any pending exports""" 
+ return True + + def shutdown(self) -> None: + """Shutdown the exporter gracefully""" + self._shutdown.set() diff --git a/agentops/telemetry/exporters/session.py b/agentops/telemetry/exporters/session.py new file mode 100644 index 000000000..f3ad82a2c --- /dev/null +++ b/agentops/telemetry/exporters/session.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +import json +import threading +from typing import TYPE_CHECKING, Optional, Sequence +from uuid import UUID, uuid4 +import time + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult + +from agentops.helpers import filter_unjsonable, get_ISO_time +from agentops.api.base import ApiClient +from agentops.log_config import logger +from agentops.api.session import SessionApiClient + +if TYPE_CHECKING: + from agentops.session import Session + + +class SessionExporter(SpanExporter): + """Manages publishing events for Session""" + + def __init__( + self, + session: Optional[Session] = None, + session_id: Optional[UUID] = None, + endpoint: Optional[str] = None, + jwt: Optional[str] = None, + api_key: Optional[str] = None, + **kwargs, + ): + """Initialize SessionExporter with either a Session object or individual parameters. 
+ + Args: + session: Session object containing all required parameters + session_id: UUID for the session (if not using session object) + endpoint: API endpoint (if not using session object) + jwt: JWT token for authentication (if not using session object) + api_key: API key for authentication (if not using session object) + """ + self._shutdown = threading.Event() + self._export_lock = threading.Lock() + + if session: + self.session = session + self.session_id = session.session_id + self._api = session._api + else: + if not all([session_id, endpoint, api_key]): + raise ValueError("Must provide either session object or all individual parameters") + self.session = None + self.session_id = session_id + assert session_id is not None # for type checker + assert endpoint is not None # for type checker + assert api_key is not None # for type checker + self._api = SessionApiClient( + endpoint=endpoint, + session_id=session_id, + api_key=api_key, + jwt=jwt or "", # jwt can be empty string if not provided + ) + + super().__init__(**kwargs) + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + if self._shutdown.is_set(): + return SpanExportResult.SUCCESS + + with self._export_lock: + try: + if not spans: + return SpanExportResult.SUCCESS + + events = [] + for span in spans: + attrs = span.attributes or {} + event_data_str = attrs.get("event.data", "{}") + if isinstance(event_data_str, (str, bytes, bytearray)): + event_data = json.loads(event_data_str) + else: + event_data = {} + + # Format event data based on event type + if span.name == "actions": + formatted_data = { + "action_type": event_data.get("action_type", event_data.get("name", "unknown_action")), + "params": event_data.get("params", {}), + "returns": event_data.get("returns"), + } + elif span.name == "tools": + formatted_data = { + "name": event_data.get("name", event_data.get("tool_name", "unknown_tool")), + "params": event_data.get("params", {}), + "returns": event_data.get("returns"), + } + 
else: + formatted_data = event_data + + formatted_data = {**event_data, **formatted_data} + + # Get timestamps and ID, providing defaults + init_timestamp = attrs.get("event.timestamp") or get_ISO_time() + end_timestamp = attrs.get("event.end_timestamp") or get_ISO_time() + event_id = attrs.get("event.id") or str(uuid4()) + + events.append( + filter_unjsonable( + { + "id": event_id, + "event_type": span.name, + "init_timestamp": init_timestamp, + "end_timestamp": end_timestamp, + **formatted_data, + "session_id": str(self.session_id), + } + ) + ) + + # Only make HTTP request if we have events and not shutdown + if events: + retry_count = 3 # Match EventExporter retry count + for attempt in range(retry_count): + try: + success = self._api.create_events(events) + if success: + return SpanExportResult.SUCCESS + + # If not successful but not the last attempt, wait and retry + if attempt < retry_count - 1: + delay = 1.0 * (2**attempt) # Exponential backoff + time.sleep(delay) + continue + + except Exception as e: + logger.error(f"Export attempt {attempt + 1} failed: {e}") + if attempt < retry_count - 1: + delay = 1.0 * (2**attempt) # Exponential backoff + time.sleep(delay) + continue + return SpanExportResult.FAILURE + + # If we've exhausted all retries without success + return SpanExportResult.FAILURE + + return SpanExportResult.SUCCESS + + except Exception as e: + logger.error(f"Failed to export spans: {e}") + return SpanExportResult.FAILURE + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + return True + + def shutdown(self) -> None: + """Handle shutdown gracefully""" + self._shutdown.set() diff --git a/agentops/telemetry/manager.py b/agentops/telemetry/manager.py new file mode 100644 index 000000000..ffd2b6b87 --- /dev/null +++ b/agentops/telemetry/manager.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import logging +import sys +from typing import TYPE_CHECKING, Dict, List, Optional +from uuid import UUID + +from opentelemetry 
import trace +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import SpanProcessor, TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.sampling import ParentBased, Sampler, TraceIdRatioBased + +from .config import OTELConfig +from .exporters.session import SessionExporter +from .processors import EventProcessor + +if TYPE_CHECKING: + from opentelemetry.sdk._logs import LoggingHandler + + from agentops.client import Client + + +class TelemetryManager: + """Manages OpenTelemetry instrumentation for AgentOps. + + Responsibilities: + 1. Configure and manage TracerProvider + 2. Handle resource attributes and sampling + 3. Manage session-specific exporters and processors + 4. Coordinate telemetry lifecycle + 5. Handle logging setup and configuration + + Architecture: + TelemetryManager + | + |-- TracerProvider (configured with sampling) + |-- Resource (service info and attributes) + |-- SessionExporters (per session) + |-- EventProcessors (per session) + |-- LoggingHandler (OTLP logging) + """ + + def __init__(self, client: Optional[Client] = None) -> None: + self._provider: Optional[TracerProvider] = None + self._session_exporters: Dict[UUID, SessionExporter] = {} + self._processors: List[SpanProcessor] = [] + self._log_handler: Optional[LoggingHandler] = None + self.config: Optional[OTELConfig] = None + + if not client: + from agentops.client import Client + + client = Client() + self.client = client + + def set_log_handler(self, log_handler: Optional[LoggingHandler]) -> None: + """Set the OTLP log handler. + + Args: + log_handler: The logging handler to use for OTLP + """ + self._log_handler = log_handler + + def get_log_handler(self) -> Optional[LoggingHandler]: + """Get the current OTLP log handler. 
+ + Returns: + The current logging handler if set, None otherwise + """ + return self._log_handler + + def add_telemetry_log_handler(self, logger: logging.Logger) -> None: + """Add the OTLP log handler to the given logger if configured. + + Args: + logger: The logger to add the handler to + """ + if self._log_handler: + logger.addHandler(self._log_handler) + + def initialize(self, config: OTELConfig) -> None: + """Initialize telemetry infrastructure. + + Args: + config: OTEL configuration + + Raises: + ValueError: If config is None + """ + if not config: + raise ValueError("Config is required") + + self.config = config + + # Create resource with service info + resource = Resource.create({"service.name": "agentops", **(config.resource_attributes or {})}) + + # Create provider with sampling + sampler = config.sampler or ParentBased(TraceIdRatioBased(0.5)) + self._provider = TracerProvider(resource=resource, sampler=sampler) + + # Set up logging handler with the same resource + from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter + + # Set as global provider + trace.set_tracer_provider(self._provider) + + def create_session_tracer(self, session_id: UUID, jwt: str) -> trace.Tracer: + """Create tracer for a new session. 
+ + Args: + session_id: UUID for the session + jwt: JWT token for authentication + + Returns: + Configured tracer for the session + + Raises: + RuntimeError: If telemetry is not initialized + """ + if not self._provider: + raise RuntimeError("Telemetry not initialized") + if not self.config: + raise RuntimeError("Config not initialized") + + # Create exporters + session_exporter = SessionExporter( + session_id=session_id, endpoint=self.config.endpoint, jwt=jwt, api_key=self.config.api_key + ) + + # Create processors + batch_processor = BatchSpanProcessor( + session_exporter, + max_queue_size=self.config.max_queue_size, + max_export_batch_size=self.config.max_export_batch_size, + schedule_delay_millis=self.config.max_wait_time, + ) + + # Wrap with event processor + event_processor = EventProcessor(session_id=session_id, processor=batch_processor) + + # Add processors + self._provider.add_span_processor(event_processor) + self._processors.append(event_processor) + self._session_exporters[session_id] = session_exporter + + # Return session tracer + return self._provider.get_tracer(f"agentops.session.{session_id}") + + def cleanup_session(self, session_id: UUID) -> None: + """Clean up session telemetry resources. 
+ + Args: + session_id: UUID of session to clean up + """ + if session_id in self._session_exporters: + exporter = self._session_exporters[session_id] + exporter.shutdown() + del self._session_exporters[session_id] + + def shutdown(self) -> None: + """Shutdown all telemetry resources.""" + if self._provider: + self._provider.shutdown() + self._provider = None + for exporter in self._session_exporters.values(): + exporter.shutdown() + self._session_exporters.clear() + self._processors.clear() diff --git a/agentops/telemetry/processors.py b/agentops/telemetry/processors.py new file mode 100644 index 000000000..d8fe336b6 --- /dev/null +++ b/agentops/telemetry/processors.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional +from uuid import UUID, uuid4 + +from opentelemetry import trace +from opentelemetry.context import Context, attach, detach, set_value +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor, TracerProvider +from opentelemetry.trace import Status, StatusCode + +from agentops.event import ErrorEvent +from agentops.helpers import get_ISO_time + +from .encoders import EventToSpanEncoder + + +@dataclass +class EventProcessor(SpanProcessor): + """Processes spans for AgentOps events. + + Responsibilities: + 1. Add session context to spans + 2. Track event counts + 3. Handle error propagation + 4. Forward spans to wrapped processor + + Architecture: + EventProcessor + | + |-- Session Context + |-- Event Counting + |-- Error Handling + |-- Wrapped Processor + """ + + session_id: UUID + processor: SpanProcessor + event_counts: Dict[str, int] = field( + default_factory=lambda: {"llms": 0, "tools": 0, "actions": 0, "errors": 0, "apis": 0} + ) + + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: + """Process span start, adding session context and common attributes. 
+ + Args: + span: The span being started + parent_context: Optional parent context + """ + if not span.is_recording() or not hasattr(span, "context") or span.context is None: + return + + # Add session context + token = set_value("session.id", str(self.session_id)) + try: + token = attach(token) + + # Add common attributes + span.set_attributes( + { + "session.id": str(self.session_id), + "event.timestamp": get_ISO_time(), + } + ) + + # Update event counts if this is an AgentOps event + if hasattr(span, "attributes") and span.attributes is not None: + event_type = span.attributes.get("event.type") + if event_type in self.event_counts: + self.event_counts[event_type] += 1 + + # Forward to wrapped processor + self.processor.on_start(span, parent_context) + finally: + detach(token) + + def on_end(self, span: ReadableSpan) -> None: + """Process span end, handling error events and forwarding to wrapped processor. + + Args: + span: The span being ended + """ + # Check for None context first + if not span.context: + return + + if not span.context.trace_flags.sampled: + return + + # Handle error events by updating the current span + if hasattr(span, "attributes") and span.attributes is not None: + if "error" in span.attributes: + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + current_span.set_status(Status(StatusCode.ERROR)) + for key, value in span.attributes.items(): + if key.startswith("error."): + current_span.set_attribute(key, value) + + # Forward to wrapped processor + self.processor.on_end(span) + + def shutdown(self) -> None: + """Shutdown the processor.""" + self.processor.shutdown() + + def force_flush(self, timeout_millis: Optional[int] = 30000) -> bool: + """Force flush the processor. 
+ + Args: + timeout_millis: Optional timeout in milliseconds + + Returns: + bool: True if flush succeeded + """ + return self.processor.force_flush(timeout_millis) diff --git a/agentops/time_travel.py b/agentops/time_travel.py index 55ad66629..6ab5ff0bf 100644 --- a/agentops/time_travel.py +++ b/agentops/time_travel.py @@ -1,7 +1,7 @@ import json import yaml import os -from .http_client import HttpClient +from .api.base import ApiClient from .exceptions import ApiServerException from .singleton import singleton diff --git a/pyproject.toml b/pyproject.toml index 8c8598a19..d5330919f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,6 +128,7 @@ line-length = 120 [tool.ruff.lint] ignore = [ "F401", # Unused imports + "F403", # Import * used "E712", # Comparison to True/False "E711", # Comparison to None "E722", # Bare except diff --git a/tests/integration/test_log_capture_integration.py b/tests/integration/test_log_capture_integration.py new file mode 100644 index 000000000..af5b24436 --- /dev/null +++ b/tests/integration/test_log_capture_integration.py @@ -0,0 +1,207 @@ +import sys +import time +import threading +from uuid import uuid4 +from dataclasses import dataclass +from typing import Optional +import pytest +from io import StringIO + +from agentops.telemetry.config import OTELConfig +from agentops.telemetry.manager import TelemetryManager +from agentops.session.log_capture import LogCapture + + +@dataclass +class MockSession: + id: uuid4 + _telemetry: Optional[TelemetryManager] = None + + +@pytest.fixture +def telemetry_setup(): + """Setup and teardown telemetry manager with config""" + telemetry = TelemetryManager() + config = OTELConfig(resource_attributes={"test.attribute": "integration_test"}, endpoint="http://localhost:4317") + telemetry.initialize(config) + yield telemetry + telemetry.shutdown() + + +@pytest.fixture +def session(telemetry_setup): + """Create a session with telemetry""" + return MockSession(id=uuid4(), _telemetry=telemetry_setup) + 
+ +@pytest.fixture +def standalone_session(): + """Create a session without telemetry""" + return MockSession(id=uuid4()) + + +def test_basic_output_capture(session): + """Test basic stdout and stderr capture functionality. + + Verifies: + - Basic stdout message capture + - Basic stderr message capture + - Empty line handling (should be ignored) + - Proper stream restoration after capture stops + """ + original_stdout = sys.stdout + original_stderr = sys.stderr + + capture = LogCapture(session) + capture.start() + + try: + print("Test stdout message") + sys.stderr.write("Test stderr message\n") + + # Empty lines should be ignored + print("") + print("\n\n") + + finally: + capture.stop() + + # Verify stdout/stderr are restored to original + assert sys.stdout == original_stdout, "stdout was not properly restored after capture" + assert sys.stderr == original_stderr, "stderr was not properly restored after capture" + + +def test_concurrent_output(session): + """Test concurrent output capture from multiple threads. + + Verifies: + - Thread-safe capture of stdout/stderr + - Correct interleaving of messages from different threads + - Background thread output capture + - Main thread output capture + - Message ordering preservation + """ + capture = LogCapture(session) + capture.start() + + output_received = [] + + def background_task(): + for i in range(3): + time.sleep(0.1) + print(f"Background message {i}") + sys.stderr.write(f"Background error {i}\n") + output_received.append(i) + + try: + thread = threading.Thread(target=background_task) + thread.start() + + # Main thread output + for i in range(3): + time.sleep(0.15) + print(f"Main message {i}") + output_received.append(i) + + thread.join() + + finally: + capture.stop() + + assert len(output_received) == 6, ( + "Expected 6 messages (3 from each thread), but got " f"{len(output_received)} messages" + ) + + +def test_multiple_start_stop(session): + """Test multiple start/stop cycles of the LogCapture. 
+ + Verifies: + - Multiple start/stop cycles work correctly + - Streams are properly restored after each stop + - No resource leaks across cycles + - Consistent behavior across multiple captures + """ + original_stdout = sys.stdout + original_stderr = sys.stderr + + capture = LogCapture(session) + + for cycle in range(3): + capture.start() + print("Test message") + capture.stop() + + # Verify original streams are restored + assert sys.stdout == original_stdout, f"stdout not restored after cycle {cycle + 1}" + assert sys.stderr == original_stderr, f"stderr not restored after cycle {cycle + 1}" + + +def test_standalone_capture(standalone_session): + """Test LogCapture functionality without telemetry manager. + + Verifies: + - Capture works without telemetry manager + - Proper handler creation in standalone mode + - Resource cleanup after capture + - Handler and provider are properly cleaned up + """ + capture = LogCapture(standalone_session) + capture.start() + + try: + print("Standalone test message") + sys.stderr.write("Standalone error message\n") + finally: + capture.stop() + + # Verify handler cleanup + assert capture._handler is None, "LogHandler was not properly cleaned up after standalone capture" + assert capture._logger_provider is None, "LoggerProvider was not properly cleaned up after standalone capture" + + +def test_flush_functionality(session): + """Test the flush operation of LogCapture. + + Verifies: + - Flush operation works correctly + - Messages before and after flush are captured + - No data loss during flush + - Capture continues working after flush + """ + capture = LogCapture(session) + capture.start() + + try: + print("Message before flush") + capture.flush() + print("Message after flush") + finally: + capture.stop() + + +def test_nested_capture(session): + """Test nested LogCapture instances. 
+ + Verifies: + - Multiple capture instances can coexist + - Inner capture doesn't interfere with outer capture + - Proper cleanup of nested captures + - Correct message capture at different nesting levels + """ + outer_capture = LogCapture(session) + inner_capture = LogCapture(session) + + outer_capture.start() + try: + print("Outer message") + + inner_capture.start() + try: + print("Inner message") + finally: + inner_capture.stop() + + print("Back to outer") + finally: + outer_capture.stop() diff --git a/tests/unit/telemetry/conftest.py b/tests/unit/telemetry/conftest.py new file mode 100644 index 000000000..f6d42a46b --- /dev/null +++ b/tests/unit/telemetry/conftest.py @@ -0,0 +1,111 @@ +import pytest +from opentelemetry import trace as trace_api +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from agentops.event import ActionEvent, ErrorEvent, LLMEvent, ToolEvent + + +class InstrumentationTester: + """Helper class for testing OTEL instrumentation""" + + def __init__(self): + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + span_processor = SimpleSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(span_processor) + + # Reset and set global tracer provider + trace_api.set_tracer_provider(self.tracer_provider) + self.memory_exporter.clear() + + def get_finished_spans(self): + return self.memory_exporter.get_finished_spans() + + def clear(self): + """Clear captured spans""" + self.memory_exporter.clear() + + +@pytest.fixture +def instrumentation(): + """Fixture providing instrumentation testing utilities""" + return InstrumentationTester() + + +@pytest.fixture +def mock_llm_event(): + """Creates an LLMEvent for testing""" + return LLMEvent( + prompt="What is the meaning of life?", + completion="42", + model="gpt-4", + prompt_tokens=10, + 
completion_tokens=1, + cost=0.01, + ) + + +@pytest.fixture +def mock_action_event(): + """Creates an ActionEvent for testing""" + return ActionEvent( + action_type="process_data", + params={"input_file": "data.csv"}, + returns="100 rows processed", + logs="Successfully processed all rows", + ) + + +@pytest.fixture +def mock_tool_event(): + """Creates a ToolEvent for testing""" + return ToolEvent( + name="searchWeb", + params={"query": "python testing"}, + returns=["result1", "result2"], + logs={"status": "success"}, + ) + + +@pytest.fixture +def mock_error_event(): + """Creates an ErrorEvent for testing""" + trigger = ActionEvent(action_type="risky_action") + error = ValueError("Something went wrong") + return ErrorEvent(trigger_event=trigger, exception=error, error_type="ValueError", details="Detailed error info") + + +@pytest.fixture +def mock_span_exporter(): + """Creates an InMemorySpanExporter for testing""" + return InMemorySpanExporter() + + +@pytest.fixture +def tracer_provider(mock_span_exporter): + """Creates a TracerProvider with test exporter""" + provider = TracerProvider() + processor = SimpleSpanProcessor(mock_span_exporter) + provider.add_span_processor(processor) + return provider + + +@pytest.fixture(autouse=True) +def cleanup_telemetry(): + """Cleanup telemetry after each test""" + yield + # Clean up any active telemetry + from agentops import Client + + client = Client() + if hasattr(client, "telemetry"): + try: + if client.telemetry._tracer_provider: + client.telemetry._tracer_provider.shutdown() + if client.telemetry._otel_manager: + client.telemetry._otel_manager.shutdown() + client.telemetry.shutdown() + except Exception: + pass # Ensure cleanup continues even if one step fails diff --git a/tests/unit/telemetry/test_encoders.py b/tests/unit/telemetry/test_encoders.py new file mode 100644 index 000000000..8e276d64d --- /dev/null +++ b/tests/unit/telemetry/test_encoders.py @@ -0,0 +1,111 @@ +import json +import pytest +from opentelemetry.trace 
import SpanKind + +from agentops.event import Event +from agentops.telemetry.encoders import EventToSpanEncoder, SpanDefinition + + +class TestEventToSpanEncoder: + """Test the Event to Span conversion logic""" + + def test_llm_event_conversion(self, mock_llm_event): + """Test converting LLMEvent to spans""" + span_defs = EventToSpanEncoder.encode(mock_llm_event) + + # Verify we get exactly two spans for LLM events + assert len(span_defs) == 2, f"Expected 2 spans for LLM event, got {len(span_defs)}" + + # Find the spans by name + completion_span = next((s for s in span_defs if s.name == "llm.completion"), None) + api_span = next((s for s in span_defs if s.name == "llm.api.call"), None) + + assert completion_span is not None, "Missing llm.completion span" + assert api_span is not None, "Missing llm.api.call span" + + # Verify completion span attributes + assert completion_span.attributes["model"] == mock_llm_event.model + assert completion_span.attributes["prompt"] == mock_llm_event.prompt + assert completion_span.attributes["completion"] == mock_llm_event.completion + assert completion_span.attributes["prompt_tokens"] == 10 + assert completion_span.attributes["completion_tokens"] == 1 + assert completion_span.attributes["cost"] == 0.01 + assert completion_span.attributes["event.start_time"] == mock_llm_event.init_timestamp + assert completion_span.attributes["event.end_time"] == mock_llm_event.end_timestamp + + # Verify API span attributes and relationships + assert api_span.parent_span_id == completion_span.name + assert api_span.kind == SpanKind.CLIENT + assert api_span.attributes["model"] == mock_llm_event.model + assert api_span.attributes["start_time"] == mock_llm_event.init_timestamp + assert api_span.attributes["end_time"] == mock_llm_event.end_timestamp + + def test_action_event_conversion(self, mock_action_event): + """Test converting ActionEvent to spans""" + span_defs = EventToSpanEncoder.encode(mock_action_event) + + assert len(span_defs) == 2 + 
action_span = next((s for s in span_defs if s.name == "agent.action"), None) + execution_span = next((s for s in span_defs if s.name == "action.execution"), None) + + assert action_span is not None + assert execution_span is not None + + # Verify action span attributes + assert action_span.attributes["action_type"] == "process_data" + assert json.loads(action_span.attributes["params"]) == {"input_file": "data.csv"} + assert action_span.attributes["returns"] == "100 rows processed" + assert action_span.attributes["logs"] == "Successfully processed all rows" + assert action_span.attributes["event.start_time"] == mock_action_event.init_timestamp + + # Verify execution span + assert execution_span.parent_span_id == action_span.name + assert execution_span.attributes["start_time"] == mock_action_event.init_timestamp + assert execution_span.attributes["end_time"] == mock_action_event.end_timestamp + + def test_tool_event_conversion(self, mock_tool_event): + """Test converting ToolEvent to spans""" + span_defs = EventToSpanEncoder.encode(mock_tool_event) + + assert len(span_defs) == 2 + tool_span = next((s for s in span_defs if s.name == "agent.tool"), None) + execution_span = next((s for s in span_defs if s.name == "tool.execution"), None) + + assert tool_span is not None + assert execution_span is not None + + # Verify tool span attributes + assert tool_span.attributes["name"] == "searchWeb" + assert json.loads(tool_span.attributes["params"]) == {"query": "python testing"} + assert json.loads(tool_span.attributes["returns"]) == ["result1", "result2"] + assert json.loads(tool_span.attributes["logs"]) == {"status": "success"} + + # Verify execution span + assert execution_span.parent_span_id == tool_span.name + assert execution_span.attributes["start_time"] == mock_tool_event.init_timestamp + assert execution_span.attributes["end_time"] == mock_tool_event.end_timestamp + + def test_error_event_conversion(self, mock_error_event): + """Test converting ErrorEvent to spans""" 
+ span_defs = EventToSpanEncoder.encode(mock_error_event) + + assert len(span_defs) == 1 + error_span = span_defs[0] + + # Verify error span attributes + assert error_span.name == "error" + assert error_span.attributes["error"] is True + assert error_span.attributes["error_type"] == "ValueError" + assert error_span.attributes["details"] == "Detailed error info" + assert "trigger_event" in error_span.attributes + + def test_unknown_event_type(self): + """Test handling of unknown event types""" + + class UnknownEvent(Event): + pass + + # Should still work, just with generic event name + span_defs = EventToSpanEncoder.encode(UnknownEvent(event_type="unknown")) + assert len(span_defs) == 1 + assert span_defs[0].name == "event" diff --git a/tests/unit/telemetry/test_exporters.py b/tests/unit/telemetry/test_exporters.py new file mode 100644 index 000000000..ed90a8f72 --- /dev/null +++ b/tests/unit/telemetry/test_exporters.py @@ -0,0 +1,225 @@ +import json +import threading +import time +import uuid +from unittest.mock import Mock, patch + +import pytest +from uuid import UUID +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.trace import SpanKind, Status, StatusCode +from opentelemetry.sdk.trace.export import SpanExportResult + +from agentops.telemetry.exporters.event import EventExporter +from agentops.telemetry.exporters.session import SessionExporter + + +@pytest.fixture +def mock_span(): + span = Mock(spec=ReadableSpan) + span.name = "test_span" + span.attributes = { + "event.id": str(uuid.uuid4()), + "event.data": json.dumps({"test": "data"}), + "event.timestamp": "2024-01-01T00:00:00Z", + "event.end_timestamp": "2024-01-01T00:00:01Z", + } + return span + + +@pytest.fixture +def event_exporter(): + return EventExporter( + session_id=uuid.uuid4(), endpoint="http://test-endpoint/v2/create_events", jwt="test-jwt", api_key="test-key" + ) + + +class TestEventExporter: + """Test suite for EventExporter""" + + def test_initialization(self, 
event_exporter: EventExporter): + """Test exporter initialization""" + assert not event_exporter._shutdown.is_set() + assert isinstance(event_exporter._export_lock, type(threading.Lock())) + assert event_exporter._retry_count == 3 + assert event_exporter._retry_delay == 1.0 + + def test_export_empty_spans(self, event_exporter): + """Test exporting empty spans list""" + result = event_exporter.export([]) + assert result == SpanExportResult.SUCCESS + + def test_export_single_span(self, event_exporter, mock_span): + """Test exporting a single span""" + with patch("agentops.api.session.SessionApiClient.create_events") as mock_create: + mock_create.return_value = True + + result = event_exporter.export([mock_span]) + assert result == SpanExportResult.SUCCESS + + # Verify request + mock_create.assert_called_once() + call_args = mock_create.call_args[0] + events = call_args[0] + + assert len(events) == 1 + assert events[0]["event_type"] == "test_span" + + def test_export_multiple_spans(self, event_exporter, mock_span): + """Test exporting multiple spans""" + spans = [mock_span, mock_span] + + with patch("agentops.api.session.SessionApiClient.create_events") as mock_create: + mock_create.return_value = True + + result = event_exporter.export(spans) + assert result == SpanExportResult.SUCCESS + + # Verify request + mock_create.assert_called_once() + call_args = mock_create.call_args[0] + events = call_args[0] + + assert len(events) == 2 + + def test_export_failure_retry(self, event_exporter, mock_span): + """Test retry behavior on export failure""" + mock_wait = Mock() + event_exporter._wait_fn = mock_wait + + with patch("agentops.api.session.SessionApiClient.create_events") as mock_create: + # Create mock responses with proper return values + mock_create.side_effect = [False, False, True] + + result = event_exporter.export([mock_span]) + assert result == SpanExportResult.SUCCESS + assert mock_create.call_count == 3 + + # Verify exponential backoff delays + assert 
mock_wait.call_count == 2 + assert mock_wait.call_args_list[0][0][0] == 1.0 + assert mock_wait.call_args_list[1][0][0] == 2.0 + + def test_export_max_retries_exceeded(self, event_exporter, mock_span): + """Test behavior when max retries are exceeded""" + mock_wait = Mock() + event_exporter._wait_fn = mock_wait + + with patch("agentops.api.session.SessionApiClient.create_events") as mock_create: + # Mock consistently failing response + mock_create.return_value = False + + result = event_exporter.export([mock_span]) + assert result == SpanExportResult.FAILURE + assert mock_create.call_count == event_exporter._retry_count + + # Verify all retries waited + assert mock_wait.call_count == event_exporter._retry_count - 1 + + def test_shutdown_behavior(self, event_exporter, mock_span): + """Test exporter shutdown behavior""" + event_exporter.shutdown() + assert event_exporter._shutdown.is_set() + + # Should return success without exporting + result = event_exporter.export([mock_span]) + assert result == SpanExportResult.SUCCESS + + def test_retry_logic(self, event_exporter, mock_span): + """Verify retry behavior works as expected""" + with patch("agentops.api.session.SessionApiClient.create_events") as mock_create: + # Create mock responses with proper return values + mock_create.side_effect = [False, False, True] + + result = event_exporter.export([mock_span]) + assert result == SpanExportResult.SUCCESS + assert mock_create.call_count == 3 + + # Verify the events were sent correctly + for call in mock_create.call_args_list: + events = call[0][0] + assert len(events) == 1 + assert "event_type" in events[0] + assert events[0]["event_type"] == "test_span" + + +class TestSessionExporter: + """Test suite for SessionExporter""" + + @pytest.fixture + def test_span(self): + """Create a test span with required attributes""" + span = Mock(spec=ReadableSpan) + span.name = "test_span" + span.attributes = { + "event.id": str(uuid.uuid4()), + "event.data": json.dumps({"test": "data"}), 
+ "event.timestamp": "2024-01-01T00:00:00Z", + "event.end_timestamp": "2024-01-01T00:00:01Z", + } + return span + + @pytest.fixture + def session_exporter(self): + """Create a SessionExporter instance for testing""" + from agentops.session import Session + from agentops.api.session import SessionApiClient + + mock_config = Mock() + mock_config.endpoint = "http://test-endpoint" + mock_config.api_key = "test-key" + + mock_session = Mock(spec=Session) + mock_session.session_id = UUID("00000000-0000-0000-0000-000000000000") + mock_session.jwt = "test-jwt" + mock_session.config = mock_config + + # Create a real API client for the session + mock_session._api = SessionApiClient( + endpoint=mock_config.endpoint, + session_id=mock_session.session_id, + api_key=mock_config.api_key, + jwt=mock_session.jwt, + ) + + return SessionExporter(session=mock_session) + + def test_event_formatting(self, session_exporter, test_span): + """Verify events are formatted correctly""" + with patch("agentops.api.session.SessionApiClient.create_events") as mock_create: + mock_create.return_value = True + result = session_exporter.export([test_span]) + assert result == SpanExportResult.SUCCESS + + # Verify the formatted event + mock_create.assert_called_once() + call_args = mock_create.call_args[0] + events = call_args[0] + assert len(events) == 1 + event = events[0] + assert "id" in event + assert "event_type" in event + assert "session_id" in event + + def test_retry_logic(self, session_exporter, test_span): + """Verify retry behavior works as expected""" + with patch("agentops.api.session.SessionApiClient.create_events") as mock_create: + mock_create.side_effect = [False, False, True] + + result = session_exporter.export([test_span]) + assert result == SpanExportResult.SUCCESS + assert mock_create.call_count == 3 + + def test_batch_processing(self, session_exporter, test_span): + """Verify batch processing works correctly""" + with patch("agentops.api.session.SessionApiClient.create_events") 
as mock_create: + mock_create.return_value = True + spans = [test_span for _ in range(5)] + result = session_exporter.export(spans) + assert result == SpanExportResult.SUCCESS + + # Verify batch was sent correctly + mock_create.assert_called_once() + call_args = mock_create.call_args[0] + events = call_args[0] + assert len(events) == 5 diff --git a/tests/unit/telemetry/test_manager.py b/tests/unit/telemetry/test_manager.py new file mode 100644 index 000000000..bc83cc245 --- /dev/null +++ b/tests/unit/telemetry/test_manager.py @@ -0,0 +1,123 @@ +from unittest.mock import Mock, patch +from uuid import UUID, uuid4 + +import pytest +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased + +from agentops.telemetry.manager import TelemetryManager +from agentops.telemetry.config import OTELConfig +from agentops.telemetry.exporters.session import SessionExporter +from agentops.telemetry.processors import EventProcessor + + +@pytest.fixture +def config() -> OTELConfig: + """Create test config""" + return OTELConfig( + endpoint="https://test.agentops.ai", + api_key="test-key", + max_queue_size=100, + max_export_batch_size=50, + max_wait_time=1000, + ) + + +@pytest.fixture +def manager() -> TelemetryManager: + """Create test manager""" + return TelemetryManager() + + +class TestTelemetryManager: + def test_initialization(self, manager: TelemetryManager, config: OTELConfig) -> None: + """Test manager initialization""" + manager.initialize(config) + + assert manager.config == config + assert isinstance(manager._provider, TracerProvider) + assert isinstance(manager._provider.sampler, ParentBased) + + # Verify global provider was set + assert trace.get_tracer_provider() == manager._provider + + def test_initialization_with_custom_resource(self, manager: TelemetryManager) -> None: + """Test initialization with custom resource attributes""" + config = OTELConfig( + 
endpoint="https://test.agentops.ai", + api_key="test-key", + resource_attributes={"custom.attr": "value"}, + max_queue_size=100, + max_export_batch_size=50, + max_wait_time=1000, + ) + + manager.initialize(config) + assert manager._provider is not None + resource = manager._provider.resource + + assert resource.attributes["service.name"] == "agentops" + assert resource.attributes["custom.attr"] == "value" + + def test_create_session_tracer(self, manager: TelemetryManager, config: OTELConfig) -> None: + """Test session tracer creation""" + manager.initialize(config) + session_id = uuid4() + + tracer = manager.create_session_tracer(session_id, "test-jwt") + + # Verify exporter was created + assert session_id in manager._session_exporters + assert isinstance(manager._session_exporters[session_id], SessionExporter) + + # Verify processor was added + assert len(manager._processors) == 1 + assert isinstance(manager._processors[0], EventProcessor) + + # Skip tracer name verification since it's an implementation detail + # The important part is that the tracer is properly configured with exporters and processors + + def test_cleanup_session(self, manager: TelemetryManager, config: OTELConfig) -> None: + """Test session cleanup""" + manager.initialize(config) + session_id = uuid4() + + # Create session + manager.create_session_tracer(session_id, "test-jwt") + exporter = manager._session_exporters[session_id] + + # Clean up + with patch.object(exporter, "shutdown") as mock_shutdown: + manager.cleanup_session(session_id) + mock_shutdown.assert_called_once() + + assert session_id not in manager._session_exporters + + def test_shutdown(self, manager: TelemetryManager, config: OTELConfig) -> None: + """Test manager shutdown""" + manager.initialize(config) + session_id = uuid4() + + # Create session + manager.create_session_tracer(session_id, "test-jwt") + exporter = manager._session_exporters[session_id] + + # Shutdown + with patch.object(exporter, "shutdown") as mock_shutdown: 
+ manager.shutdown() + assert mock_shutdown.called + + assert not manager._session_exporters + assert not manager._processors + assert manager._provider is None + + def test_error_handling(self, manager: TelemetryManager) -> None: + """Test error handling""" + # Test initialization without config + with pytest.raises(ValueError, match="Config is required"): + manager.initialize(None) # type: ignore + + # Test creating tracer without initialization + with pytest.raises(RuntimeError, match="Telemetry not initialized"): + manager.create_session_tracer(uuid4(), "test-jwt") diff --git a/tests/unit/telemetry/test_processors.py b/tests/unit/telemetry/test_processors.py new file mode 100644 index 000000000..bf3f0081b --- /dev/null +++ b/tests/unit/telemetry/test_processors.py @@ -0,0 +1,189 @@ +from unittest.mock import Mock, patch +from typing import List, Any + +import pytest +from opentelemetry.sdk.trace import ReadableSpan, Span +from opentelemetry.trace import SpanContext, TraceFlags, Status, StatusCode +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.context import Context + +from agentops.telemetry.processors import EventProcessor + + +@pytest.fixture +def mock_span_exporter() -> Mock: + """Create a mock span exporter""" + return Mock() + + +def create_mock_span(span_id: int = 123) -> Mock: + """Helper to create consistent mock spans""" + span = Mock(spec=Span) + span.context = Mock(spec=SpanContext, span_id=span_id, trace_flags=TraceFlags(TraceFlags.SAMPLED)) + + # Set up attributes dict and methods + span.attributes = {} + + def set_attributes(attrs: dict) -> None: + span.attributes.update(attrs) + + def set_attribute(key: str, value: Any) -> None: + span.attributes[key] = value + + span.set_attributes = Mock(side_effect=set_attributes) + span.set_attribute = Mock(side_effect=set_attribute) + + span.is_recording.return_value = True + span.set_status = Mock() + + # Set up readable span + mock_readable = Mock(spec=ReadableSpan) + 
mock_readable.attributes = span.attributes + mock_readable.context = span.context + span._readable_span.return_value = mock_readable + + return span + + +@pytest.fixture +def mock_span() -> Mock: + """Create a mock span with proper attribute handling""" + return create_mock_span() + + +@pytest.fixture +def processor(mock_span_exporter) -> EventProcessor: + """Create a processor for testing""" + batch_processor = BatchSpanProcessor(mock_span_exporter) + return EventProcessor(session_id=123, processor=batch_processor) + + +class TestEventProcessor: + def test_initialization(self, processor: EventProcessor, mock_span_exporter: Mock) -> None: + """Test processor initialization""" + assert processor.session_id == 123 + assert isinstance(processor.processor, BatchSpanProcessor) + assert processor.event_counts == { + "llms": 0, + "tools": 0, + "actions": 0, + "errors": 0, + "apis": 0, + } + + def test_span_processing_lifecycle(self, processor: EventProcessor, mock_span: Mock) -> None: + """Test complete span lifecycle""" + mock_span.attributes["event.type"] = "llms" + + processor.on_start(mock_span) + + assert mock_span.set_attributes.called + assert mock_span.attributes["session.id"] == str(processor.session_id) + assert "event.timestamp" in mock_span.attributes + assert processor.event_counts["llms"] == 1 + + readable_span = mock_span._readable_span() + processor.on_end(readable_span) + + def test_unsampled_span_ignored(self, processor: EventProcessor) -> None: + """Test that unsampled spans are ignored""" + unsampled_span = Mock(spec=Span) + unsampled_span.context = Mock(spec=SpanContext, trace_flags=TraceFlags(TraceFlags.DEFAULT)) + unsampled_span.is_recording.return_value = False + + processor.on_start(unsampled_span) + assert not unsampled_span.set_attributes.called + + def test_span_without_context(self, processor: EventProcessor) -> None: + """Test handling of spans without context""" + span_without_context = Mock(spec=Span) + span_without_context.context = None + 
span_without_context.is_recording.return_value = True + span_without_context.attributes = {} + + # Should not raise exception and should not call wrapped processor + processor.on_start(span_without_context) + + # Create readable span without context + readable_span = Mock(spec=ReadableSpan) + readable_span.context = None + readable_span.attributes = span_without_context.attributes + + # Should not raise exception and should not call wrapped processor + with patch.object(processor.processor, "on_end") as mock_on_end: + processor.on_end(readable_span) + mock_on_end.assert_not_called() + + # Verify processor still works after handling None context + normal_span = create_mock_span() + with patch.object(processor.processor, "on_start") as mock_on_start: + processor.on_start(normal_span) + mock_on_start.assert_called_once_with(normal_span, None) + + with patch.object(processor.processor, "on_end") as mock_on_end: + processor.on_end(normal_span._readable_span()) + mock_on_end.assert_called_once_with(normal_span._readable_span()) + + def test_concurrent_spans(self, processor: EventProcessor) -> None: + """Test handling multiple spans concurrently""" + spans: List[Mock] = [create_mock_span(i) for i in range(3)] + + for span in spans: + processor.on_start(span) + assert span.attributes["session.id"] == str(processor.session_id) + + for span in reversed(spans): + processor.on_end(span._readable_span()) + + def test_error_span_handling(self, processor: EventProcessor) -> None: + """Test handling of error spans""" + # Create parent span with proper attribute handling + parent_span = create_mock_span(1) + + # Create error span + error_span = create_mock_span(2) + error_span.attributes.update({"error": True, "error.type": "ValueError", "error.message": "Test error"}) + + with patch("opentelemetry.trace.get_current_span", return_value=parent_span): + processor.on_end(error_span._readable_span()) + + # Verify status was set + assert parent_span.set_status.called + status_args = 
parent_span.set_status.call_args[0][0] + assert status_args.status_code == StatusCode.ERROR + + # Verify error attributes were set correctly + assert parent_span.set_attribute.call_args_list == [ + (("error.type", "ValueError"), {}), + (("error.message", "Test error"), {}), + ] + + def test_event_counting(self, processor: EventProcessor) -> None: + """Test event counting for different event types""" + for event_type in processor.event_counts.keys(): + span = create_mock_span() + span.attributes["event.type"] = event_type + + processor.on_start(span) + assert processor.event_counts[event_type] == 1 + + def test_processor_shutdown(self, processor: EventProcessor) -> None: + """Test processor shutdown""" + with patch.object(processor.processor, "shutdown") as mock_shutdown: + processor.shutdown() + mock_shutdown.assert_called_once() + + def test_force_flush(self, processor: EventProcessor) -> None: + """Test force flush""" + with patch.object(processor.processor, "force_flush") as mock_flush: + mock_flush.return_value = True + assert processor.force_flush() is True + mock_flush.assert_called_once() + + def test_span_attributes_preserved(self, processor: EventProcessor, mock_span: Mock) -> None: + """Test that existing span attributes are preserved""" + mock_span.attributes = {"custom.attr": "value"} + processor.on_start(mock_span) + + assert mock_span.attributes["custom.attr"] == "value" + assert mock_span.attributes["session.id"] == str(processor.session_id) diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 3d5d38dd5..a01ca2fd4 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -15,7 +15,7 @@ import agentops from agentops import ActionEvent, Client -from agentops.http_client import HttpClient +from agentops.api.base import ApiClient from agentops.singleton import clear_singletons