Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from typing import Dict, Optional
from typing import Dict, Optional, Sequence

from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
from amazon.opentelemetry.distro.llo_handler import LLOHandler
from opentelemetry._logs import get_logger_provider
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult


class OTLPAwsSpanExporter(OTLPSpanExporter):
Expand All @@ -23,6 +28,7 @@ def __init__(
):
self._aws_region = None
self._logger_provider = logger_provider
self._llo_handler = None

if endpoint:
self._aws_region = endpoint.split(".")[1]
Expand All @@ -38,3 +44,28 @@ def __init__(
compression,
session=AwsAuthSession(aws_region=self._aws_region, service="xray"),
)

def _ensure_llo_handler(self):
"""Lazily initialize LLO handler when needed to avoid initialization order issues"""
if self._llo_handler is None and is_agent_observability_enabled():
if self._logger_provider is None:
try:
self._logger_provider = get_logger_provider()
except Exception: # pylint: disable=broad-exception-caught
return False

if self._logger_provider:
self._llo_handler = LLOHandler(self._logger_provider)
return True

return self._llo_handler is not None

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
try:
if is_agent_observability_enabled() and self._ensure_llo_handler():
llo_processed_spans = self._llo_handler.process_spans(spans)
return super().export(llo_processed_spans)
except Exception: # pylint: disable=broad-exception-caught
return SpanExportResult.FAILURE

return super().export(spans)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from opentelemetry.sdk._logs import LoggerProvider


class LLOHandler:
"""
Utility class for handling Large Language Objects (LLO) in OpenTelemetry spans.

LLOHandler performs three primary functions:
1. Identifies input/output prompt content in spans
2. Extracts and transforms these attributes into an OpenTelemetry Gen AI Event
3. Filters input/output prompts from spans to maintain privacy and reduce span size

This LLOHandler supports the following third-party instrumentation libraries:
- Strands
- OpenInference
- Traceloop/OpenLLMetry
- OpenLIT
"""

def __init__(self, logger_provider: LoggerProvider):
"""
Initialize an LLOHandler with the specified logger provider.

This constructor sets up the event logger provider, configures the event logger,
and initializes the patterns used to identify LLO attributes.

Args:
logger_provider: The OpenTelemetry LoggerProvider used for emitting events.
Global LoggerProvider instance injected from our AwsOpenTelemetryConfigurator
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from unittest import TestCase
from unittest.mock import MagicMock

from amazon.opentelemetry.distro.llo_handler import LLOHandler
from opentelemetry.sdk._logs import LoggerProvider


class TestLLOHandler(TestCase):
def test_init_with_logger_provider(self):
# Test LLOHandler initialization with a logger provider
mock_logger_provider = MagicMock(spec=LoggerProvider)

handler = LLOHandler(logger_provider=mock_logger_provider)

# Since the __init__ method only has 'pass' in the implementation,
# we can only verify that the handler is created without errors
self.assertIsInstance(handler, LLOHandler)

def test_init_stores_logger_provider(self):
# Test that logger provider is stored (if implementation is added)
mock_logger_provider = MagicMock(spec=LoggerProvider)

handler = LLOHandler(logger_provider=mock_logger_provider)

# This test assumes the implementation will store the logger_provider
# When the actual implementation is added, update this test accordingly
self.assertIsInstance(handler, LLOHandler)

def test_process_spans_method_exists(self): # pylint: disable=no-self-use
# Test that process_spans method exists (for interface contract)
mock_logger_provider = MagicMock(spec=LoggerProvider)
LLOHandler(logger_provider=mock_logger_provider)

# Verify the handler has the process_spans method
# This will fail until the method is implemented
# self.assertTrue(hasattr(handler, 'process_spans'))
# self.assertTrue(callable(getattr(handler, 'process_spans', None)))
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
# SPDX-License-Identifier: Apache-2.0

from unittest import TestCase
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult


class TestOTLPAwsSpanExporter(TestCase):
Expand All @@ -27,3 +30,162 @@ def test_init_without_logger_provider(self):

self.assertIsNone(exporter._logger_provider)
self.assertEqual(exporter._aws_region, "us-west-2")
self.assertIsNone(exporter._llo_handler)

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled")
def test_ensure_llo_handler_when_disabled(self, mock_is_enabled):
# Test _ensure_llo_handler when agent observability is disabled
mock_is_enabled.return_value = False
endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"

exporter = OTLPAwsSpanExporter(endpoint=endpoint)
result = exporter._ensure_llo_handler()

self.assertFalse(result)
self.assertIsNone(exporter._llo_handler)
mock_is_enabled.assert_called_once()

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler")
def test_ensure_llo_handler_lazy_initialization(
self, mock_llo_handler_class, mock_is_enabled, mock_get_logger_provider
):
# Test lazy initialization of LLO handler when enabled
mock_is_enabled.return_value = True
mock_logger_provider = MagicMock(spec=LoggerProvider)
mock_get_logger_provider.return_value = mock_logger_provider
mock_llo_handler = MagicMock()
mock_llo_handler_class.return_value = mock_llo_handler

endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"
exporter = OTLPAwsSpanExporter(endpoint=endpoint)

# First call should initialize
result = exporter._ensure_llo_handler()

self.assertTrue(result)
self.assertEqual(exporter._llo_handler, mock_llo_handler)
mock_llo_handler_class.assert_called_once_with(mock_logger_provider)
mock_get_logger_provider.assert_called_once()

# Second call should not re-initialize
mock_llo_handler_class.reset_mock()
mock_get_logger_provider.reset_mock()

result = exporter._ensure_llo_handler()

self.assertTrue(result)
mock_llo_handler_class.assert_not_called()
mock_get_logger_provider.assert_not_called()

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled")
def test_ensure_llo_handler_with_existing_logger_provider(self, mock_is_enabled, mock_get_logger_provider):
# Test when logger_provider is already provided
mock_is_enabled.return_value = True
mock_logger_provider = MagicMock(spec=LoggerProvider)

endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"
exporter = OTLPAwsSpanExporter(endpoint=endpoint, logger_provider=mock_logger_provider)

with patch(
"amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler"
) as mock_llo_handler_class:
mock_llo_handler = MagicMock()
mock_llo_handler_class.return_value = mock_llo_handler

result = exporter._ensure_llo_handler()

self.assertTrue(result)
self.assertEqual(exporter._llo_handler, mock_llo_handler)
mock_llo_handler_class.assert_called_once_with(mock_logger_provider)
mock_get_logger_provider.assert_not_called()

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled")
def test_ensure_llo_handler_get_logger_provider_fails(self, mock_is_enabled, mock_get_logger_provider):
# Test when get_logger_provider raises exception
mock_is_enabled.return_value = True
mock_get_logger_provider.side_effect = Exception("Failed to get logger provider")

endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"
exporter = OTLPAwsSpanExporter(endpoint=endpoint)

result = exporter._ensure_llo_handler()

self.assertFalse(result)
self.assertIsNone(exporter._llo_handler)

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled")
def test_export_with_llo_disabled(self, mock_is_enabled):
# Test export when LLO is disabled
mock_is_enabled.return_value = False
endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"

exporter = OTLPAwsSpanExporter(endpoint=endpoint)

# Mock the parent class export method
with patch.object(OTLPSpanExporter, "export") as mock_parent_export:
mock_parent_export.return_value = SpanExportResult.SUCCESS

spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)]
result = exporter.export(spans)

self.assertEqual(result, SpanExportResult.SUCCESS)
mock_parent_export.assert_called_once_with(spans)
self.assertIsNone(exporter._llo_handler)

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler")
def test_export_with_llo_enabled(self, mock_llo_handler_class, mock_get_logger_provider, mock_is_enabled):
# Test export when LLO is enabled and successfully processes spans
mock_is_enabled.return_value = True
mock_logger_provider = MagicMock(spec=LoggerProvider)
mock_get_logger_provider.return_value = mock_logger_provider

mock_llo_handler = MagicMock()
mock_llo_handler_class.return_value = mock_llo_handler

endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"
exporter = OTLPAwsSpanExporter(endpoint=endpoint)

# Mock spans and processed spans
original_spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)]
processed_spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)]
mock_llo_handler.process_spans.return_value = processed_spans

# Mock the parent class export method
with patch.object(OTLPSpanExporter, "export") as mock_parent_export:
mock_parent_export.return_value = SpanExportResult.SUCCESS

result = exporter.export(original_spans)

self.assertEqual(result, SpanExportResult.SUCCESS)
mock_llo_handler.process_spans.assert_called_once_with(original_spans)
mock_parent_export.assert_called_once_with(processed_spans)

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.is_agent_observability_enabled")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.get_logger_provider")
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler")
def test_export_with_llo_processing_failure(
self, mock_llo_handler_class, mock_get_logger_provider, mock_is_enabled
):
# Test export when LLO processing fails
mock_is_enabled.return_value = True
mock_logger_provider = MagicMock(spec=LoggerProvider)
mock_get_logger_provider.return_value = mock_logger_provider

mock_llo_handler = MagicMock()
mock_llo_handler_class.return_value = mock_llo_handler
mock_llo_handler.process_spans.side_effect = Exception("LLO processing failed")

endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"
exporter = OTLPAwsSpanExporter(endpoint=endpoint)

spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)]

result = exporter.export(spans)

self.assertEqual(result, SpanExportResult.FAILURE)
Loading