Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions agentops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def record(event):
def init(
api_key: Optional[str] = None,
endpoint: Optional[str] = None,
app_url: Optional[str] = None,
max_wait_time: Optional[int] = None,
max_queue_size: Optional[int] = None,
tags: Optional[List[str]] = None,
Expand All @@ -50,6 +51,8 @@ def init(
be read from the AGENTOPS_API_KEY environment variable.
endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will
be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'.
app_url (str, optional): The dashboard URL for the AgentOps app. If none is provided, key will
be read from the AGENTOPS_APP_URL environment variable. Defaults to 'https://app.agentops.ai'.
max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue.
Defaults to 5,000 (5 seconds)
max_queue_size (int, optional): The maximum size of the event queue. Defaults to 512.
Expand Down Expand Up @@ -79,6 +82,7 @@ def init(
return _client.init(
api_key=api_key,
endpoint=endpoint,
app_url=app_url,
max_wait_time=max_wait_time,
max_queue_size=max_queue_size,
default_tags=merged_tags,
Expand All @@ -101,6 +105,7 @@ def configure(**kwargs):
**kwargs: Configuration parameters. Supported parameters include:
- api_key: API Key for AgentOps services
- endpoint: The endpoint for the AgentOps service
- app_url: The dashboard URL for the AgentOps app
- max_wait_time: Maximum time to wait in milliseconds before flushing the queue
- max_queue_size: Maximum size of the event queue
- default_tags: Default tags for the sessions
Expand All @@ -118,6 +123,7 @@ def configure(**kwargs):
valid_params = {
"api_key",
"endpoint",
"app_url",
"max_wait_time",
"max_queue_size",
"default_tags",
Expand Down
11 changes: 11 additions & 0 deletions agentops/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
class ConfigDict(TypedDict):
api_key: Optional[str]
endpoint: Optional[str]
app_url: Optional[str]
max_wait_time: Optional[int]
export_flush_interval: Optional[int]
max_queue_size: Optional[int]
Expand All @@ -45,6 +46,11 @@ class Config:
metadata={"description": "Base URL for the AgentOps API"},
)

app_url: str = field(
default_factory=lambda: os.getenv("AGENTOPS_APP_URL", "https://app.agentops.ai"),
metadata={"description": "Dashboard URL for the AgentOps application"},
)

max_wait_time: int = field(
default_factory=lambda: get_env_int("AGENTOPS_MAX_WAIT_TIME", 5000),
metadata={"description": "Maximum time in milliseconds to wait for API responses"},
Expand Down Expand Up @@ -124,6 +130,7 @@ def configure(
self,
api_key: Optional[str] = None,
endpoint: Optional[str] = None,
app_url: Optional[str] = None,
max_wait_time: Optional[int] = None,
export_flush_interval: Optional[int] = None,
max_queue_size: Optional[int] = None,
Expand Down Expand Up @@ -151,6 +158,9 @@ def configure(

if endpoint is not None:
self.endpoint = endpoint

if app_url is not None:
self.app_url = app_url

if max_wait_time is not None:
self.max_wait_time = max_wait_time
Expand Down Expand Up @@ -211,6 +221,7 @@ def dict(self):
return {
"api_key": self.api_key,
"endpoint": self.endpoint,
"app_url": self.app_url,
"max_wait_time": self.max_wait_time,
"export_flush_interval": self.export_flush_interval,
"max_queue_size": self.max_queue_size,
Expand Down
43 changes: 43 additions & 0 deletions agentops/helpers/dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
Helpers for interacting with the AgentOps dashboard.
"""
from typing import Union
from termcolor import colored
from opentelemetry.sdk.trace import Span, ReadableSpan
from agentops.logging import logger


def get_trace_url(span: Union[Span, ReadableSpan]) -> str:
"""
Generate a trace URL for a direct link to the session on the AgentOps dashboard.

Args:
span: The span to generate the URL for.

Returns:
The session URL.
"""
trace_id: Union[int, str] = span.context.trace_id

# Convert trace_id to hex string if it's not already
# We don't add dashes to this to format it as a UUID since the dashboard doesn't either
if isinstance(trace_id, int):
trace_id = format(trace_id, "032x")

# Get the app_url from the config - import here to avoid circular imports
from agentops import get_client
app_url = get_client().config.app_url

return f"{app_url}/sessions?trace_id={trace_id}"


def log_trace_url(span: Union[Span, ReadableSpan]) -> None:
"""
Log the trace URL for the AgentOps dashboard.

Args:
span: The span to log the URL for.
"""
session_url = get_trace_url(span)
logger.info(colored(f"\x1b[34mSession Replay: {session_url}\x1b[0m", "blue"))

68 changes: 11 additions & 57 deletions agentops/sdk/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@
This module contains processors for OpenTelemetry spans.
"""

import copy
import threading
import time
from threading import Event, Lock, Thread
from typing import Any, Dict, List, Optional
from typing import Dict, Optional

from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter
from termcolor import colored

import agentops.semconv as semconv
from agentops.logging import logger
from agentops.sdk.converters import trace_id_to_uuid, uuid_to_int16
from agentops.helpers.dashboard import log_trace_url
from agentops.semconv.core import CoreAttributes


Expand Down Expand Up @@ -89,14 +86,7 @@ class InternalSpanProcessor(SpanProcessor):
For session spans, it prints a URL to the AgentOps dashboard.
"""

def __init__(self, app_url: str = "https://app.agentops.ai"):
"""
Initialize the PrintSpanProcessor.

Args:
app_url: The base URL for the AgentOps dashboard.
"""
self.app_url = app_url
_root_span_id: Optional[Span] = None

def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
"""
Expand All @@ -110,29 +100,10 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None
if not span.context or not span.context.trace_flags.sampled:
return

# Get the span kind from attributes
span_kind = (
span.attributes.get(semconv.SpanAttributes.AGENTOPS_SPAN_KIND, "unknown") if span.attributes else "unknown"
)

# Print basic information about the span
logger.debug(f"Started span: {span.name} (kind: {span_kind})")

# Special handling for session spans
if span_kind == semconv.SpanKind.SESSION:
trace_id = span.context.trace_id
# Convert trace_id to hex string if it's not already
if isinstance(trace_id, int):
session_url = f"{self.app_url}/drilldown?session_id={trace_id_to_uuid(trace_id)}"
logger.info(
colored(
f"\x1b[34mSession started: {session_url}\x1b[0m",
"light_green",
)
)
else:
# Print basic information for other span kinds
logger.debug(f"Ended span: {span.name} (kind: {span_kind})")
if not self._root_span_id:
self._root_span_id = span.context.span_id
logger.debug(f"[agentops.InternalSpanProcessor] Found root span: {span.name}")
log_trace_url(span)

def on_end(self, span: ReadableSpan) -> None:
"""
Expand All @@ -145,30 +116,13 @@ def on_end(self, span: ReadableSpan) -> None:
if not span.context or not span.context.trace_flags.sampled:
return

# Get the span kind from attributes
span_kind = (
span.attributes.get(semconv.SpanAttributes.AGENTOPS_SPAN_KIND, "unknown") if span.attributes else "unknown"
)

# Special handling for session spans
if span_kind == semconv.SpanKind.SESSION:
trace_id = span.context.trace_id
# Convert trace_id to hex string if it's not already
if isinstance(trace_id, int):
session_url = f"{self.app_url}/drilldown?session_id={trace_id_to_uuid(trace_id)}"
logger.info(
colored(
f"\x1b[34mSession Replay: {session_url}\x1b[0m",
"blue",
)
)
else:
# Print basic information for other span kinds
logger.debug(f"Ended span: {span.name} (kind: {span_kind})")
if self._root_span_id and (span.context.span_id is self._root_span_id):
logger.debug(f"[agentops.InternalSpanProcessor] Ending root span: {span.name}")
log_trace_url(span)

def shutdown(self) -> None:
"""Shutdown the processor."""
pass
self._root_span_id = None

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Force flush the processor."""
Expand Down
80 changes: 80 additions & 0 deletions tests/unit/helpers/test_dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Unit tests for dashboard URL generation and logging.
"""

import unittest
from unittest.mock import patch, MagicMock

from agentops.helpers.dashboard import get_trace_url, log_trace_url


class TestDashboardHelpers(unittest.TestCase):
"""Tests for dashboard URL generation and logging functions."""

@patch('agentops.get_client')
def test_get_trace_url_with_hex_trace_id(self, mock_get_client):
"""Test get_trace_url with a hexadecimal trace ID."""
# Mock the config's app_url
mock_client = MagicMock()
mock_client.config.app_url = "https://test-app.agentops.ai"
mock_get_client.return_value = mock_client

# Create a mock span with a hex string trace ID (using a full 32-character trace ID)
mock_span = MagicMock()
mock_span.context.trace_id = "1234567890abcdef1234567890abcdef"

# Call get_trace_url
url = get_trace_url(mock_span)

# Assert that the URL is correctly formed with the config's app_url
self.assertEqual(url, "https://test-app.agentops.ai/sessions?trace_id=1234567890abcdef1234567890abcdef")

@patch('agentops.get_client')
def test_get_trace_url_with_int_trace_id(self, mock_get_client):
"""Test get_trace_url with an integer trace ID."""
# Mock the config's app_url
mock_client = MagicMock()
mock_client.config.app_url = "https://test-app.agentops.ai"
mock_get_client.return_value = mock_client

# Create a mock span with an int trace ID
mock_span = MagicMock()
mock_span.context.trace_id = 12345

# Call get_trace_url
url = get_trace_url(mock_span)

# Assert that the URL follows the expected format with a 32-character hex string
self.assertTrue(url.startswith("https://test-app.agentops.ai/sessions?trace_id="))

# Verify the format is a 32-character hex string (no dashes)
hex_part = url.split("trace_id=")[1]
self.assertRegex(hex_part, r"^[0-9a-f]{32}$")

# Verify the value is correctly formatted from the integer 12345
expected_hex = format(12345, "032x")
self.assertEqual(hex_part, expected_hex)

@patch('agentops.helpers.dashboard.logger')
@patch('agentops.get_client')
def test_log_trace_url(self, mock_get_client, mock_logger):
"""Test log_trace_url includes the session URL in the log message."""
# Mock the config's app_url
mock_client = MagicMock()
mock_client.config.app_url = "https://test-app.agentops.ai"
mock_get_client.return_value = mock_client

# Create a mock span
mock_span = MagicMock()
mock_span.context.trace_id = "test-trace-id"

# Mock get_trace_url to return a known value that uses the app_url
expected_url = "https://test-app.agentops.ai/sessions?trace_id=test-trace-id"
with patch('agentops.helpers.dashboard.get_trace_url', return_value=expected_url):
# Call log_trace_url
log_trace_url(mock_span)

# Assert that logger.info was called with a message containing the URL
mock_logger.info.assert_called_once()
log_message = mock_logger.info.call_args[0][0]
self.assertIn(expected_url, log_message)
Loading
Loading