Skip to content

Commit e1149f5

Browse files
authored
Configure Unsampled Span Pipeline for Genesis (#390)
## What does this pull request do? Configures unsampled span pipeline when `AGENT_OBSERVABILITY_ENABLED` is enabled. We always send 100% spans to Genesis platform for agent observability because AI applications typically have low throughput traffic patterns and require comprehensive monitoring to catch subtle failure modes like hallucinations. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent aed584f commit e1149f5

File tree

2 files changed

+101
-18
lines changed

2 files changed

+101
-18
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,17 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource:
273273
)
274274

275275

276+
def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvider, resource: Resource = None):
277+
if not is_agent_observability_enabled():
278+
return
279+
280+
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
281+
282+
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
283+
284+
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))
285+
286+
276287
def _is_defer_to_workers_enabled():
277288
return os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false").strip().lower() == "true"
278289

@@ -416,9 +427,14 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
416427
if _is_lambda_environment():
417428
provider.add_span_processor(AwsLambdaSpanProcessor())
418429

430+
# We always send 100% spans to Genesis platform for agent observability because
431+
# AI applications typically have low throughput traffic patterns and require
432+
# comprehensive monitoring to catch subtle failure modes like hallucinations
433+
# and quality degradation that sampling could miss.
419434
# Add session.id baggage attribute to span attributes to support AI Agent use cases
420435
# enabling session ID tracking in spans.
421436
if is_agent_observability_enabled():
437+
_export_unsampled_span_for_agent_observability(provider, resource)
422438

423439
def session_id_predicate(baggage_key: str) -> bool:
424440
return baggage_key == "session.id"

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py

Lines changed: 85 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
_customize_sampler,
2626
_customize_span_exporter,
2727
_customize_span_processors,
28+
_export_unsampled_span_for_agent_observability,
2829
_export_unsampled_span_for_lambda,
2930
_init_logging,
3031
_is_application_signals_enabled,
@@ -361,24 +362,21 @@ def test_customize_span_exporter_with_agent_observability(self):
361362
def test_customize_span_processors_with_agent_observability(self):
362363
mock_tracer_provider: TracerProvider = MagicMock()
363364

364-
# Test that BaggageSpanProcessor is not added when agent observability is disabled
365365
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
366366
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
367367
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
368368

369-
# Reset mock for next test
370369
mock_tracer_provider.reset_mock()
371370

372-
# Test that BaggageSpanProcessor is added when agent observability is enabled
373371
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
374372
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
375-
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)
373+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 2)
376374

377-
# Verify the added processor is BaggageSpanProcessor
378-
added_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
379-
self.assertIsInstance(added_processor, BaggageSpanProcessor)
375+
first_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
376+
self.assertIsInstance(first_processor, BatchUnsampledSpanProcessor)
377+
second_processor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
378+
self.assertIsInstance(second_processor, BaggageSpanProcessor)
380379

381-
# Clean up
382380
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
383381

384382
def test_baggage_span_processor_session_id_filtering(self):
@@ -659,18 +657,15 @@ def capture_exporter(*args, **kwargs):
659657

660658
def test_customize_span_processors(self):
661659
mock_tracer_provider: TracerProvider = MagicMock()
662-
# Clean up environment to ensure consistent test state
663660
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
664661
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
665662
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None)
666663

667664
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
668665
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
669666

670-
# Reset mock for next test
671667
mock_tracer_provider.reset_mock()
672668

673-
# Test application signals only
674669
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
675670
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
676671
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
@@ -680,19 +675,17 @@ def test_customize_span_processors(self):
680675
second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
681676
self.assertIsInstance(second_processor, AwsSpanMetricsProcessor)
682677

683-
# Reset mock for next test
684678
mock_tracer_provider.reset_mock()
685679

686-
# Test both agent observability and application signals enabled
687680
os.environ.setdefault("AGENT_OBSERVABILITY_ENABLED", "true")
688681
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
689-
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 3)
682+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 4)
690683

691-
# Verify processors are added in the expected order
692684
processors = [call.args[0] for call in mock_tracer_provider.add_span_processor.call_args_list]
693-
self.assertIsInstance(processors[0], BaggageSpanProcessor) # Agent observability processor added first
694-
self.assertIsInstance(processors[1], AttributePropagatingSpanProcessor) # Application signals processors
695-
self.assertIsInstance(processors[2], AwsSpanMetricsProcessor)
685+
self.assertIsInstance(processors[0], BatchUnsampledSpanProcessor)
686+
self.assertIsInstance(processors[1], BaggageSpanProcessor)
687+
self.assertIsInstance(processors[2], AttributePropagatingSpanProcessor)
688+
self.assertIsInstance(processors[3], AwsSpanMetricsProcessor)
696689

697690
def test_customize_span_processors_lambda(self):
698691
mock_tracer_provider: TracerProvider = MagicMock()
@@ -795,6 +788,80 @@ def test_export_unsampled_span_for_lambda(self):
795788
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
796789
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)
797790

791+
# pylint: disable=no-self-use
792+
def test_export_unsampled_span_for_agent_observability(self):
793+
mock_tracer_provider: TracerProvider = MagicMock()
794+
795+
_export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())
796+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
797+
798+
mock_tracer_provider.reset_mock()
799+
800+
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
801+
os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces"
802+
_export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())
803+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)
804+
processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
805+
self.assertIsInstance(processor, BatchUnsampledSpanProcessor)
806+
807+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
808+
os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None)
809+
810+
# pylint: disable=no-self-use
811+
def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self):
812+
"""Test that OTLPAwsSpanExporter is used for AWS endpoints"""
813+
mock_tracer_provider: TracerProvider = MagicMock()
814+
815+
with patch(
816+
"amazon.opentelemetry.distro.aws_opentelemetry_configurator.OTLPAwsSpanExporter"
817+
) as mock_aws_exporter:
818+
with patch(
819+
"amazon.opentelemetry.distro.aws_opentelemetry_configurator.BatchUnsampledSpanProcessor"
820+
) as mock_processor:
821+
with patch(
822+
"amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider"
823+
) as mock_logger_provider:
824+
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
825+
os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces"
826+
827+
_export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())
828+
829+
# Verify OTLPAwsSpanExporter is created with correct parameters
830+
mock_aws_exporter.assert_called_once_with(
831+
endpoint="https://xray.us-east-1.amazonaws.com/v1/traces",
832+
logger_provider=mock_logger_provider.return_value,
833+
)
834+
# Verify BatchUnsampledSpanProcessor wraps the exporter
835+
mock_processor.assert_called_once_with(span_exporter=mock_aws_exporter.return_value)
836+
# Verify processor is added to tracer provider
837+
mock_tracer_provider.add_span_processor.assert_called_once_with(mock_processor.return_value)
838+
839+
# Clean up
840+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
841+
os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None)
842+
843+
# pylint: disable=no-self-use
844+
def test_customize_span_processors_calls_export_unsampled_span(self):
845+
"""Test that _customize_span_processors calls _export_unsampled_span_for_agent_observability"""
846+
mock_tracer_provider: TracerProvider = MagicMock()
847+
848+
with patch(
849+
"amazon.opentelemetry.distro.aws_opentelemetry_configurator._export_unsampled_span_for_agent_observability"
850+
) as mock_agent_observability:
851+
# Test that agent observability function is NOT called when disabled
852+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
853+
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
854+
mock_agent_observability.assert_not_called()
855+
856+
# Test that agent observability function is called when enabled
857+
mock_agent_observability.reset_mock()
858+
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
859+
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
860+
mock_agent_observability.assert_called_once_with(mock_tracer_provider, Resource.get_empty())
861+
862+
# Clean up
863+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
864+
798865
def test_customize_metric_exporter(self):
799866
metric_readers = []
800867
views = []

0 commit comments

Comments
 (0)