Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,17 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource:
)


def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvider, resource: Resource = None):
    """Attach an unsampled-span export pipeline when agent observability is on.

    When ``is_agent_observability_enabled()`` is true, an ``OTLPAwsSpanExporter``
    is built from the ``OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`` environment value
    and the current logger provider, wrapped in a ``BatchUnsampledSpanProcessor``
    and registered on ``trace_provider``. Otherwise nothing is registered.

    Args:
        trace_provider: Provider that receives the new span processor.
        resource: Accepted for signature parity with callers; not read here.
    """
    if is_agent_observability_enabled():
        # The endpoint is passed through as-is; it is None when the variable is unset.
        exporter = OTLPAwsSpanExporter(
            endpoint=os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT),
            logger_provider=get_logger_provider(),
        )
        unsampled_processor = BatchUnsampledSpanProcessor(span_exporter=exporter)
        trace_provider.add_span_processor(unsampled_processor)


def _is_defer_to_workers_enabled():
    """Return True when the defer-to-workers setting is "true".

    The value is read from the environment variable named by
    ``OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG``; surrounding whitespace
    and letter case are ignored, and an unset variable counts as "false".
    """
    raw_setting = os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false")
    normalized = raw_setting.strip().lower()
    return normalized == "true"

Expand Down Expand Up @@ -416,9 +427,14 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
if _is_lambda_environment():
provider.add_span_processor(AwsLambdaSpanProcessor())

# We always send 100% of spans to the Genesis platform for agent observability because
# AI applications typically have low-throughput traffic patterns and require
# comprehensive monitoring to catch subtle failure modes, such as hallucinations
# and quality degradation, that sampling could miss.
# Add session.id baggage attribute to span attributes to support AI Agent use cases
# enabling session ID tracking in spans.
if is_agent_observability_enabled():
_export_unsampled_span_for_agent_observability(provider, resource)

def session_id_predicate(baggage_key: str) -> bool:
return baggage_key == "session.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
_customize_sampler,
_customize_span_exporter,
_customize_span_processors,
_export_unsampled_span_for_agent_observability,
_export_unsampled_span_for_lambda,
_init_logging,
_is_application_signals_enabled,
Expand Down Expand Up @@ -361,24 +362,21 @@ def test_customize_span_exporter_with_agent_observability(self):
def test_customize_span_processors_with_agent_observability(self):
    """Check which span processors are registered with agent observability off and on."""
    mock_tracer_provider: TracerProvider = MagicMock()

    # patch.dict snapshots os.environ and restores it on exit, so a failing
    # assertion cannot leak AGENT_OBSERVABILITY_ENABLED into later tests.
    with patch.dict(os.environ):
        # Disabled: no span processor is expected to be registered.
        os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
        _customize_span_processors(mock_tracer_provider, Resource.get_empty())
        self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

        mock_tracer_provider.reset_mock()

        # Enabled: the unsampled-span export processor is registered first,
        # followed by the baggage processor.
        os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
        _customize_span_processors(mock_tracer_provider, Resource.get_empty())
        self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 2)

        first_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
        self.assertIsInstance(first_processor, BatchUnsampledSpanProcessor)
        second_processor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
        self.assertIsInstance(second_processor, BaggageSpanProcessor)

def test_baggage_span_processor_session_id_filtering(self):
Expand Down Expand Up @@ -659,18 +657,15 @@ def capture_exporter(*args, **kwargs):

def test_customize_span_processors(self):
mock_tracer_provider: TracerProvider = MagicMock()
# Clean up environment to ensure consistent test state
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None)

_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

# Reset mock for next test
mock_tracer_provider.reset_mock()

# Test application signals only
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
Expand All @@ -680,19 +675,17 @@ def test_customize_span_processors(self):
second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
self.assertIsInstance(second_processor, AwsSpanMetricsProcessor)

# Reset mock for next test
mock_tracer_provider.reset_mock()

# Test both agent observability and application signals enabled
os.environ.setdefault("AGENT_OBSERVABILITY_ENABLED", "true")
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 3)
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 4)

# Verify processors are added in the expected order
processors = [call.args[0] for call in mock_tracer_provider.add_span_processor.call_args_list]
self.assertIsInstance(processors[0], BaggageSpanProcessor) # Agent observability processor added first
self.assertIsInstance(processors[1], AttributePropagatingSpanProcessor) # Application signals processors
self.assertIsInstance(processors[2], AwsSpanMetricsProcessor)
self.assertIsInstance(processors[0], BatchUnsampledSpanProcessor)
self.assertIsInstance(processors[1], BaggageSpanProcessor)
self.assertIsInstance(processors[2], AttributePropagatingSpanProcessor)
self.assertIsInstance(processors[3], AwsSpanMetricsProcessor)

def test_customize_span_processors_lambda(self):
mock_tracer_provider: TracerProvider = MagicMock()
Expand Down Expand Up @@ -795,6 +788,80 @@ def test_export_unsampled_span_for_lambda(self):
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)

# pylint: disable=no-self-use
def test_export_unsampled_span_for_agent_observability(self):
    """Check that the unsampled-span processor is registered only when agent observability is enabled."""
    mock_tracer_provider: TracerProvider = MagicMock()

    # patch.dict restores os.environ on exit, even when an assertion fails.
    with patch.dict(os.environ):
        # Start from a known "disabled" state instead of relying on whatever
        # value a previous test may have left behind.
        os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
        _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())
        self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

        mock_tracer_provider.reset_mock()

        os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
        os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces"
        _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())
        self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)
        processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
        self.assertIsInstance(processor, BatchUnsampledSpanProcessor)

# pylint: disable=no-self-use
def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self):
    """Test that OTLPAwsSpanExporter is used for AWS endpoints"""
    mock_tracer_provider: TracerProvider = MagicMock()

    configurator = "amazon.opentelemetry.distro.aws_opentelemetry_configurator"
    traces_endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"
    agent_env = {
        "AGENT_OBSERVABILITY_ENABLED": "true",
        "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": traces_endpoint,
    }

    # patch.dict applies the variables for the duration of the block and
    # restores os.environ afterwards, even when an assertion fails.
    with patch.dict(os.environ, agent_env):
        with patch(f"{configurator}.OTLPAwsSpanExporter") as mock_aws_exporter, patch(
            f"{configurator}.BatchUnsampledSpanProcessor"
        ) as mock_processor, patch(f"{configurator}.get_logger_provider") as mock_logger_provider:
            _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())

            # Verify OTLPAwsSpanExporter is created with correct parameters
            mock_aws_exporter.assert_called_once_with(
                endpoint=traces_endpoint,
                logger_provider=mock_logger_provider.return_value,
            )
            # Verify BatchUnsampledSpanProcessor wraps the exporter
            mock_processor.assert_called_once_with(span_exporter=mock_aws_exporter.return_value)
            # Verify processor is added to tracer provider
            mock_tracer_provider.add_span_processor.assert_called_once_with(mock_processor.return_value)

# pylint: disable=no-self-use
def test_customize_span_processors_calls_export_unsampled_span(self):
    """Test that _customize_span_processors calls _export_unsampled_span_for_agent_observability"""
    mock_tracer_provider: TracerProvider = MagicMock()

    # patch.dict restores os.environ on exit, so the flag set below cannot
    # leak into other tests if an assertion fails.
    with patch.dict(os.environ), patch(
        "amazon.opentelemetry.distro.aws_opentelemetry_configurator._export_unsampled_span_for_agent_observability"
    ) as mock_agent_observability:
        # Test that agent observability function is NOT called when disabled
        os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
        _customize_span_processors(mock_tracer_provider, Resource.get_empty())
        mock_agent_observability.assert_not_called()

        # Test that agent observability function is called when enabled
        mock_agent_observability.reset_mock()
        os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
        _customize_span_processors(mock_tracer_provider, Resource.get_empty())
        mock_agent_observability.assert_called_once_with(mock_tracer_provider, Resource.get_empty())

def test_customize_metric_exporter(self):
metric_readers = []
views = []
Expand Down
Loading