diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py index 47a4d693e..7c608e885 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py @@ -1,12 +1,20 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -from typing import Dict, Optional +import logging +from typing import Dict, Optional, Sequence +from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession +from amazon.opentelemetry.distro.llo_handler import LLOHandler +from opentelemetry._logs import get_logger_provider from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExportResult + +logger = logging.getLogger(__name__) class OTLPAwsSpanExporter(OTLPSpanExporter): @@ -23,6 +31,7 @@ def __init__( ): self._aws_region = None self._logger_provider = logger_provider + self._llo_handler = None if endpoint: self._aws_region = endpoint.split(".")[1] @@ -38,3 +47,29 @@ def __init__( compression, session=AwsAuthSession(aws_region=self._aws_region, service="xray"), ) + + def _ensure_llo_handler(self): + """Lazily initialize LLO handler when needed to avoid initialization order issues""" + if self._llo_handler is None and is_agent_observability_enabled(): + if self._logger_provider is None: + try: + self._logger_provider = get_logger_provider() + except Exception as exc: # pylint: disable=broad-exception-caught + logger.debug("Failed to get logger provider: %s", exc) + return False + + if self._logger_provider: + self._llo_handler = LLOHandler(self._logger_provider) + return True + + return self._llo_handler is not None + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + try: + if is_agent_observability_enabled() and self._ensure_llo_handler(): + llo_processed_spans = self._llo_handler.process_spans(spans) + return super().export(llo_processed_spans) + except Exception: # pylint: disable=broad-exception-caught + return SpanExportResult.FAILURE + + return super().export(spans) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py new file mode 100644 index 000000000..3abc44532 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py @@ -0,0 +1,32 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from opentelemetry.sdk._logs import LoggerProvider + + +class LLOHandler: + """ + Utility class for handling Large Language Objects (LLO) in OpenTelemetry spans. + + LLOHandler performs three primary functions: + 1. Identifies input/output prompt content in spans + 2. Extracts and transforms these attributes into an OpenTelemetry Gen AI Event + 3. Filters input/output prompts from spans to maintain privacy and reduce span size + + This LLOHandler supports the following third-party instrumentation libraries: + - Strands + - OpenInference + - Traceloop/OpenLLMetry + - OpenLIT + """ + + def __init__(self, logger_provider: LoggerProvider): + """ + Initialize an LLOHandler with the specified logger provider. + + This constructor sets up the event logger provider, configures the event logger, + and initializes the patterns used to identify LLO attributes. + + Args: + logger_provider: The OpenTelemetry LoggerProvider used for emitting events. + Global LoggerProvider instance injected from our AwsOpenTelemetryConfigurator + """ diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py new file mode 100644 index 000000000..f2c0b36b4 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py @@ -0,0 +1,40 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +from unittest import TestCase +from unittest.mock import MagicMock + +from amazon.opentelemetry.distro.llo_handler import LLOHandler +from opentelemetry.sdk._logs import LoggerProvider + + +class TestLLOHandler(TestCase): + def test_init_with_logger_provider(self): + # Test LLOHandler initialization with a logger provider + mock_logger_provider = MagicMock(spec=LoggerProvider) + + handler = LLOHandler(logger_provider=mock_logger_provider) + + # Since the __init__ method only has 'pass' in the implementation, + # we can only verify that the handler is created without errors + self.assertIsInstance(handler, LLOHandler) + + def test_init_stores_logger_provider(self): + # Test that logger provider is stored (if implementation is added) + mock_logger_provider = MagicMock(spec=LoggerProvider) + + handler = LLOHandler(logger_provider=mock_logger_provider) + + # This test assumes the implementation will store the logger_provider + # When the actual implementation is added, update this test accordingly + self.assertIsInstance(handler, LLOHandler) + + def test_process_spans_method_exists(self): # pylint: disable=no-self-use + # Test that process_spans method exists (for interface contract) + mock_logger_provider = MagicMock(spec=LoggerProvider) + LLOHandler(logger_provider=mock_logger_provider) + + # Verify the handler has the process_spans method + # This will fail until the method is implemented + # self.assertTrue(hasattr(handler, 'process_spans')) + # self.assertTrue(callable(getattr(handler, 'process_spans', None))) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py index 973849f69..d0b2a004d 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py @@ -2,10 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 from unittest import TestCase -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExportResult class TestOTLPAwsSpanExporter(TestCase): @@ -27,3 +30,162 @@ def test_init_without_logger_provider(self): self.assertIsNone(exporter._logger_provider) self.assertEqual(exporter._aws_region, "us-west-2") + self.assertIsNone(exporter._llo_handler) + + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled") + def test_ensure_llo_handler_when_disabled(self, mock_is_enabled): + # Test _ensure_llo_handler when agent observability is disabled + mock_is_enabled.return_value = False + endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" + + exporter = OTLPAwsSpanExporter(endpoint=endpoint) + result = exporter._ensure_llo_handler() + + self.assertFalse(result) + self.assertIsNone(exporter._llo_handler) + mock_is_enabled.assert_called_once() + + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler") + def test_ensure_llo_handler_lazy_initialization( + self, mock_llo_handler_class, mock_is_enabled, mock_get_logger_provider + ): + # Test lazy initialization of LLO handler when enabled + mock_is_enabled.return_value = True + mock_logger_provider = MagicMock(spec=LoggerProvider) + mock_get_logger_provider.return_value = mock_logger_provider + mock_llo_handler = MagicMock() + mock_llo_handler_class.return_value = mock_llo_handler + + endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" + exporter = OTLPAwsSpanExporter(endpoint=endpoint) + + # First call should initialize + result = exporter._ensure_llo_handler() + + self.assertTrue(result) + self.assertEqual(exporter._llo_handler, mock_llo_handler) + mock_llo_handler_class.assert_called_once_with(mock_logger_provider) + mock_get_logger_provider.assert_called_once() + + # Second call should not re-initialize + mock_llo_handler_class.reset_mock() + mock_get_logger_provider.reset_mock() + + result = exporter._ensure_llo_handler() + + self.assertTrue(result) + mock_llo_handler_class.assert_not_called() + mock_get_logger_provider.assert_not_called() + + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled") + def test_ensure_llo_handler_with_existing_logger_provider(self, mock_is_enabled, mock_get_logger_provider): + # Test when logger_provider is already provided + mock_is_enabled.return_value = True + mock_logger_provider = MagicMock(spec=LoggerProvider) + + endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" + exporter = OTLPAwsSpanExporter(endpoint=endpoint, logger_provider=mock_logger_provider) + + with patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler" + ) as mock_llo_handler_class: + mock_llo_handler = MagicMock() + mock_llo_handler_class.return_value = mock_llo_handler + + result = exporter._ensure_llo_handler() + + self.assertTrue(result) + self.assertEqual(exporter._llo_handler, mock_llo_handler) + mock_llo_handler_class.assert_called_once_with(mock_logger_provider) + mock_get_logger_provider.assert_not_called() + + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled") + def test_ensure_llo_handler_get_logger_provider_fails(self, mock_is_enabled, mock_get_logger_provider): + # Test when get_logger_provider raises exception + mock_is_enabled.return_value = True + mock_get_logger_provider.side_effect = Exception("Failed to get logger provider") + + endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" + exporter = OTLPAwsSpanExporter(endpoint=endpoint) + + result = exporter._ensure_llo_handler() + + self.assertFalse(result) + self.assertIsNone(exporter._llo_handler) + + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled") + def test_export_with_llo_disabled(self, mock_is_enabled): + # Test export when LLO is disabled + mock_is_enabled.return_value = False + endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" + + exporter = OTLPAwsSpanExporter(endpoint=endpoint) + + # Mock the parent class export method + with patch.object(OTLPSpanExporter, "export") as mock_parent_export: + mock_parent_export.return_value = SpanExportResult.SUCCESS + + spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)] + result = exporter.export(spans) + + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_parent_export.assert_called_once_with(spans) + self.assertIsNone(exporter._llo_handler) + + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler") + def test_export_with_llo_enabled(self, mock_llo_handler_class, mock_get_logger_provider, mock_is_enabled): + # Test export when LLO is enabled and successfully processes spans + mock_is_enabled.return_value = True + mock_logger_provider = MagicMock(spec=LoggerProvider) + mock_get_logger_provider.return_value = mock_logger_provider + + mock_llo_handler = MagicMock() + mock_llo_handler_class.return_value = mock_llo_handler + + endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" + exporter = OTLPAwsSpanExporter(endpoint=endpoint) + + # Mock spans and processed spans + original_spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)] + processed_spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)] + mock_llo_handler.process_spans.return_value = processed_spans + + # Mock the parent class export method + with patch.object(OTLPSpanExporter, "export") as mock_parent_export: + mock_parent_export.return_value = SpanExportResult.SUCCESS + + result = exporter.export(original_spans) + + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_llo_handler.process_spans.assert_called_once_with(original_spans) + mock_parent_export.assert_called_once_with(processed_spans) + + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler") + def test_export_with_llo_processing_failure( + self, mock_llo_handler_class, mock_get_logger_provider, mock_is_enabled + ): + # Test export when LLO processing fails + mock_is_enabled.return_value = True + mock_logger_provider = MagicMock(spec=LoggerProvider) + mock_get_logger_provider.return_value = mock_logger_provider + + mock_llo_handler = MagicMock() + mock_llo_handler_class.return_value = mock_llo_handler + mock_llo_handler.process_spans.side_effect = Exception("LLO processing failed") + + endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" + exporter = OTLPAwsSpanExporter(endpoint=endpoint) + + spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)] + + result = exporter.export(spans) + + self.assertEqual(result, SpanExportResult.FAILURE)