diff --git a/agentops/__init__.py b/agentops/__init__.py index 4150f839a..657ff63be 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -1,6 +1,6 @@ # agentops/__init__.py import sys -from typing import Optional, List, Union +from typing import Any, Optional, List, Union from .client import Client from .event import Event, ActionEvent, LLMEvent, ToolEvent, ErrorEvent @@ -48,6 +48,9 @@ def init( auto_start_session: Optional[bool] = None, inherited_session_id: Optional[str] = None, skip_auto_end_session: Optional[bool] = None, + exporter: Optional[Any] = None, + exporter_endpoint: Optional[str] = None, + **kwargs, ) -> Union[Session, None]: """ Initializes the AgentOps singleton pattern. @@ -69,6 +72,10 @@ def init( inherited_session_id (optional, str): Init Agentops with an existing Session skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision-making (i.e. Crew determining when tasks are complete and ending the session) + exporter (Any, optional): Custom OpenTelemetry exporter to use instead of the default SessionExporter. + exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, key will + be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable. + **kwargs: Additional configuration parameters to be passed to the client. Attributes: """ Client().unsuppress_logs() @@ -94,6 +101,9 @@ def init( instrument_llm_calls=instrument_llm_calls, auto_start_session=auto_start_session, skip_auto_end_session=skip_auto_end_session, + exporter=exporter, + exporter_endpoint=exporter_endpoint, + **kwargs, ) if inherited_session_id is not None: @@ -119,6 +129,9 @@ def configure( instrument_llm_calls: Optional[bool] = None, auto_start_session: Optional[bool] = None, skip_auto_end_session: Optional[bool] = None, + exporter: Optional[Any] = None, + exporter_endpoint: Optional[str] = None, + **kwargs, ): """ Configure the AgentOps Client @@ -134,6 +147,10 @@ def configure( auto_start_session (bool, optional): Whether to start a session automatically when the client is created. skip_auto_end_session (bool, optional): Don't automatically end session based on your framework's decision-making (i.e. Crew determining when tasks are complete and ending the session) + exporter (Any, optional): Custom OpenTelemetry exporter to use instead of the default SessionExporter. + exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, key will + be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable. + **kwargs: Additional configuration parameters to be passed to the client. """ Client().configure( api_key=api_key, @@ -145,6 +162,9 @@ def configure( instrument_llm_calls=instrument_llm_calls, auto_start_session=auto_start_session, skip_auto_end_session=skip_auto_end_session, + exporter=exporter, + exporter_endpoint=exporter_endpoint, + **kwargs, ) diff --git a/agentops/client.py b/agentops/client.py index fb3e17937..c0015c14c 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -15,7 +15,7 @@ import traceback from decimal import Decimal from functools import cached_property -from typing import List, Optional, Tuple, Union +from typing import Any, List, Optional, Tuple, Union from uuid import UUID, uuid4 from termcolor import colored @@ -60,6 +60,9 @@ def configure( auto_start_session: Optional[bool] = None, skip_auto_end_session: Optional[bool] = None, env_data_opt_out: Optional[bool] = None, + exporter: Optional[Any] = None, + exporter_endpoint: Optional[str] = None, + **kwargs, ): if self.has_sessions: return logger.warning( @@ -78,6 +81,9 @@ def configure( auto_start_session=auto_start_session, skip_auto_end_session=skip_auto_end_session, env_data_opt_out=env_data_opt_out, + exporter=exporter, + exporter_endpoint=exporter_endpoint, + **kwargs, ) def initialize(self) -> Union[Session, None]: diff --git a/agentops/config.py b/agentops/config.py index 7dfb574d2..f42530363 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -1,4 +1,5 @@ -from typing import List, Optional +import os +from typing import Any, List, Optional from uuid import UUID from .log_config import logger @@ -16,6 +17,8 @@ def __init__(self): self.auto_start_session: bool = True self.skip_auto_end_session: bool = False self.env_data_opt_out: bool = False + self.exporter: Optional[Any] = None + self.exporter_endpoint: Optional[str] = None def configure( self, @@ -30,6 +33,9 @@ def configure( auto_start_session: Optional[bool] = None, skip_auto_end_session: Optional[bool] = None, env_data_opt_out: Optional[bool] = None, + exporter: Optional[Any] = None, + exporter_endpoint: Optional[str] = None, + **kwargs, ): if api_key is not None: try: @@ -72,3 +78,11 @@ def configure( if env_data_opt_out is not None: self.env_data_opt_out = env_data_opt_out + + if exporter is not None: + self.exporter = exporter + + if exporter_endpoint is not None: + self.exporter_endpoint = exporter_endpoint + elif os.environ.get("AGENTOPS_EXPORTER_ENDPOINT") is not None: + self.exporter_endpoint = os.environ.get("AGENTOPS_EXPORTER_ENDPOINT") diff --git a/agentops/session.py b/agentops/session.py index 95d1fba15..80daf768c 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -88,6 +88,9 @@ def __init__(self, session: Session, **kwargs): @property def endpoint(self): + # Use exporter_endpoint if provided, otherwise use default endpoint + if self.session.config.exporter_endpoint is not None: + return f"{self.session.config.exporter_endpoint}/v2/create_events" return f"{self.session.config.endpoint}/v2/create_events" def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: @@ -249,7 +252,12 @@ def __init__( self._otel_tracer = self._tracer_provider.get_tracer( f"agentops.session.{str(session_id)}", ) - self._otel_exporter = SessionExporter(session=self) + + # Use custom exporter if provided, otherwise use default SessionExporter + if self.config.exporter is not None: + self._otel_exporter = self.config.exporter + else: + self._otel_exporter = SessionExporter(session=self) # Use smaller batch size and shorter delay to reduce buffering self._span_processor = BatchSpanProcessor( diff --git a/tests/unit/test_custom_exporter.py b/tests/unit/test_custom_exporter.py new file mode 100644 index 000000000..198bb17d6 --- /dev/null +++ b/tests/unit/test_custom_exporter.py @@ -0,0 +1,128 @@ +import os +import pytest +from unittest.mock import MagicMock, patch +from opentelemetry.sdk.trace.export import SpanExporter + +import agentops +from agentops import Client +from agentops.singleton import clear_singletons + + +class TestCustomExporterConfig: + def setup_method(self): + self.api_key = "11111111-1111-4111-8111-111111111111" + clear_singletons() + + def teardown_method(self): + """Clean up after each test""" + agentops.end_all_sessions() + clear_singletons() + + def test_custom_exporter(self, mock_req): + """Test that a custom exporter can be used""" + # Create a mock exporter + mock_exporter = MagicMock(spec=SpanExporter) + + # Initialize agentops with the mock exporter + # This will fail until the implementation is complete + agentops.init(api_key=self.api_key, exporter=mock_exporter, auto_start_session=True) + + # Verify that the mock exporter was used + session = Client()._safe_get_session() + assert session is not None + + # This assertion will fail until the implementation is complete + # The expected behavior is that the custom exporter is used instead of the default + assert session._otel_exporter == mock_exporter + + # Clean up + agentops.end_session("Success") + + def test_exporter_endpoint(self, mock_req): + """Test that exporter_endpoint is correctly configured""" + # Initialize agentops with a custom exporter_endpoint + custom_endpoint = "https://custom.endpoint/api" + + # This will fail until the implementation is complete + agentops.init(api_key=self.api_key, exporter_endpoint=custom_endpoint, auto_start_session=True) + + # Verify that the exporter_endpoint was correctly configured + session = Client()._safe_get_session() + assert session is not None + + # These assertions will fail until the implementation is complete + # The expected behavior is that the custom endpoint is stored in the config + assert session.config.exporter_endpoint == custom_endpoint + + # The SessionExporter should use the custom endpoint for its endpoint property + # The endpoint property in SessionExporter should be updated to use config.exporter_endpoint if set + full_endpoint = f"{custom_endpoint}/v2/create_events" + assert session._otel_exporter.endpoint == full_endpoint + + # Clean up + agentops.end_session("Success") + + def test_environment_variable_exporter_endpoint(self, mock_req, monkeypatch): + """Test that exporter_endpoint from environment variable is correctly configured""" + # Set environment variable + custom_endpoint = "https://env.endpoint/api" + monkeypatch.setenv("AGENTOPS_EXPORTER_ENDPOINT", custom_endpoint) + + # Initialize agentops without explicitly setting exporter_endpoint + # This will fail until the implementation is complete + agentops.init(api_key=self.api_key, auto_start_session=True) + + # Verify that the exporter_endpoint from env var was correctly configured + session = Client()._safe_get_session() + assert session is not None + + # These assertions will fail until the implementation is complete + # The expected behavior is that the environment variable is used for the endpoint + assert session.config.exporter_endpoint == custom_endpoint + + # The SessionExporter should use the environment variable endpoint + full_endpoint = f"{custom_endpoint}/v2/create_events" + assert session._otel_exporter.endpoint == full_endpoint + + # Clean up + agentops.end_session("Success") + + def test_kwargs_passing(self, mock_req): + """Test that additional kwargs are passed through the initialization chain""" + # Initialize agentops with additional kwargs + # This will fail until the implementation is complete + agentops.init(api_key=self.api_key, auto_start_session=True, custom_param1="value1", custom_param2=42) + + # Verify session was created + session = Client()._safe_get_session() + assert session is not None + + # The expected behavior is that the kwargs are passed through the initialization chain + # and stored in the configuration object + + # Clean up + agentops.end_session("Success") + + def test_custom_exporter_with_endpoint(self, mock_req): + """Test that a custom exporter can be used with a custom endpoint""" + # Create a mock exporter + mock_exporter = MagicMock(spec=SpanExporter) + custom_endpoint = "https://custom.endpoint/api" + + # Initialize agentops with both custom exporter and endpoint + # This will fail until the implementation is complete + agentops.init( + api_key=self.api_key, exporter=mock_exporter, exporter_endpoint=custom_endpoint, auto_start_session=True + ) + + # Verify that the mock exporter was used and endpoint was set + session = Client()._safe_get_session() + assert session is not None + + # These assertions will fail until the implementation is complete + # The expected behavior is that the custom exporter is used and endpoint is set + assert session._otel_exporter == mock_exporter + assert session.config.exporter_endpoint == custom_endpoint + + # Clean up + agentops.end_session("Success")