From 73a82f6976f16003528a35fd19d0b407b4826815 Mon Sep 17 00:00:00 2001 From: Michael He Date: Wed, 11 Jun 2025 04:13:36 +0000 Subject: [PATCH 1/4] configure unsampled span pipeline for genesis agent observability --- .../distro/aws_opentelemetry_configurator.py | 18 +++++ .../test_aws_opentelementry_configurator.py | 73 +++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 91afbda1d..0f6873570 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -264,6 +264,17 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource: ) +def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvider, resource: Resource = None): + if not is_agent_observability_enabled(): + return + + traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) + + span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider()) + + trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter)) + + def _is_defer_to_workers_enabled(): return os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false").strip().lower() == "true" @@ -407,6 +418,13 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) -> if _is_lambda_environment(): provider.add_span_processor(AwsLambdaSpanProcessor()) + # We always send 100% spans to Genesis platform for agent observability because + # AI applications typically have low throughput traffic patterns and require + # comprehensive monitoring to catch subtle failure modes like hallucinations + # and quality degradation that sampling could miss. + if is_agent_observability_enabled(): + _export_unsampled_span_for_agent_observability(provider, resource) + if not _is_application_signals_enabled(): return diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 6d5cfac0f..58878249b 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -25,6 +25,7 @@ _customize_sampler, _customize_span_exporter, _customize_span_processors, + _export_unsampled_span_for_agent_observability, _export_unsampled_span_for_lambda, _init_logging, _is_application_signals_enabled, @@ -682,6 +683,78 @@ def test_export_unsampled_span_for_lambda(self): os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + def test_export_unsampled_span_for_agent_observability(self): + mock_tracer_provider: TracerProvider = MagicMock() + + # Test when agent observability is disabled (default) + _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0) + + # Test when agent observability is enabled with AWS endpoint (the default case) + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces" + _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1) + processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0] + self.assertIsInstance(processor, BatchUnsampledSpanProcessor) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) + + def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self): + """Test that OTLPAwsSpanExporter is used for AWS endpoints""" + mock_tracer_provider: TracerProvider = MagicMock() + + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.OTLPAwsSpanExporter" + ) as mock_aws_exporter: + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.BatchUnsampledSpanProcessor" + ) as mock_processor: + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider" + ) as mock_logger_provider: + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces" + + _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) + + # Verify OTLPAwsSpanExporter is created with correct parameters + mock_aws_exporter.assert_called_once_with( + endpoint="https://xray.us-east-1.amazonaws.com/v1/traces", + logger_provider=mock_logger_provider.return_value, + ) + # Verify BatchUnsampledSpanProcessor wraps the exporter + mock_processor.assert_called_once_with(span_exporter=mock_aws_exporter.return_value) + # Verify processor is added to tracer provider + mock_tracer_provider.add_span_processor.assert_called_once_with(mock_processor.return_value) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) + + def test_customize_span_processors_with_agent_observability(self): + """Test that _customize_span_processors calls _export_unsampled_span_for_agent_observability""" + mock_tracer_provider: TracerProvider = MagicMock() + + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._export_unsampled_span_for_agent_observability" + ) as mock_agent_observability: + # Test that agent observability function is NOT called when disabled + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) + mock_agent_observability.assert_not_called() + + # Test that agent observability function is called when enabled + mock_agent_observability.reset_mock() + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) + mock_agent_observability.assert_called_once_with(mock_tracer_provider, Resource.get_empty()) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + def test_customize_metric_exporter(self): metric_readers = [] views = [] From f3748726e5dcd478d78435fdcf3898ba2e9e0d25 Mon Sep 17 00:00:00 2001 From: Michael He Date: Wed, 11 Jun 2025 17:32:24 +0000 Subject: [PATCH 2/4] ignore no-self-use lint check for new test cases --- .../distro/test_aws_opentelementry_configurator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 58878249b..9cb2b58ea 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -683,6 +683,7 @@ def test_export_unsampled_span_for_lambda(self): os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + # pylint: disable=no-self-use def test_export_unsampled_span_for_agent_observability(self): mock_tracer_provider: TracerProvider = MagicMock() @@ -702,6 +703,7 @@ def test_export_unsampled_span_for_agent_observability(self): os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) + # pylint: disable=no-self-use def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self): """Test that OTLPAwsSpanExporter is used for AWS endpoints""" mock_tracer_provider: TracerProvider = MagicMock() @@ -734,6 +736,7 @@ def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self): os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) + # pylint: disable=no-self-use def test_customize_span_processors_with_agent_observability(self): """Test that _customize_span_processors calls _export_unsampled_span_for_agent_observability""" mock_tracer_provider: TracerProvider = MagicMock() From 36198a799299a43d81d902d6b89e3c590015604a Mon Sep 17 00:00:00 2001 From: Michael He Date: Tue, 17 Jun 2025 01:37:57 +0000 Subject: [PATCH 3/4] rename test case method since already taken by baggage session id changes --- .../distro/test_aws_opentelementry_configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 39398fd9e..6dce1946c 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -833,7 +833,7 @@ def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self): os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) # pylint: disable=no-self-use - def test_customize_span_processors_with_agent_observability(self): + def test_customize_span_processors_calls_export_unsampled_span(self): """Test that _customize_span_processors calls _export_unsampled_span_for_agent_observability""" mock_tracer_provider: TracerProvider = MagicMock() From bb41e83ffe1b1e4aef3ed743f5b5154585abd6b5 Mon Sep 17 00:00:00 2001 From: Michael He Date: Thu, 26 Jun 2025 04:12:05 +0000 Subject: [PATCH 4/4] update unit tests after merging in baggage processor support changes --- .../test_aws_opentelementry_configurator.py | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 4abb99d0c..66fdac0a4 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -362,24 +362,21 @@ def test_customize_span_exporter_with_agent_observability(self): def test_customize_span_processors_with_agent_observability(self): mock_tracer_provider: TracerProvider = MagicMock() - # Test that BaggageSpanProcessor is not added when agent observability is disabled os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) _customize_span_processors(mock_tracer_provider, Resource.get_empty()) self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0) - # Reset mock for next test mock_tracer_provider.reset_mock() - # Test that BaggageSpanProcessor is added when agent observability is enabled os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" _customize_span_processors(mock_tracer_provider, Resource.get_empty()) - self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 2) - # Verify the added processor is BaggageSpanProcessor - added_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0] - self.assertIsInstance(added_processor, BaggageSpanProcessor) + first_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0] + self.assertIsInstance(first_processor, BatchUnsampledSpanProcessor) + second_processor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0] + self.assertIsInstance(second_processor, BaggageSpanProcessor) - # Clean up os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) def test_baggage_span_processor_session_id_filtering(self): @@ -660,7 +657,6 @@ def capture_exporter(*args, **kwargs): def test_customize_span_processors(self): mock_tracer_provider: TracerProvider = MagicMock() - # Clean up environment to ensure consistent test state os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None) @@ -668,10 +664,8 @@ def test_customize_span_processors(self): _customize_span_processors(mock_tracer_provider, Resource.get_empty()) self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0) - # Reset mock for next test mock_tracer_provider.reset_mock() - # Test application signals only os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True") os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False") _customize_span_processors(mock_tracer_provider, Resource.get_empty()) @@ -681,19 +675,17 @@ def test_customize_span_processors(self): second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0] self.assertIsInstance(second_processor, AwsSpanMetricsProcessor) - # Reset mock for next test mock_tracer_provider.reset_mock() - # Test both agent observability and application signals enabled os.environ.setdefault("AGENT_OBSERVABILITY_ENABLED", "true") _customize_span_processors(mock_tracer_provider, Resource.get_empty()) - self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 3) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 4) - # Verify processors are added in the expected order processors = [call.args[0] for call in mock_tracer_provider.add_span_processor.call_args_list] - self.assertIsInstance(processors[0], BaggageSpanProcessor) # Agent observability processor added first - self.assertIsInstance(processors[1], AttributePropagatingSpanProcessor) # Application signals processors - self.assertIsInstance(processors[2], AwsSpanMetricsProcessor) + self.assertIsInstance(processors[0], BatchUnsampledSpanProcessor) + self.assertIsInstance(processors[1], BaggageSpanProcessor) + self.assertIsInstance(processors[2], AttributePropagatingSpanProcessor) + self.assertIsInstance(processors[3], AwsSpanMetricsProcessor) def test_customize_span_processors_lambda(self): mock_tracer_provider: TracerProvider = MagicMock() @@ -800,11 +792,11 @@ def test_export_unsampled_span_for_lambda(self): def test_export_unsampled_span_for_agent_observability(self): mock_tracer_provider: TracerProvider = MagicMock() - # Test when agent observability is disabled (default) _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0) - # Test when agent observability is enabled with AWS endpoint (the default case) + mock_tracer_provider.reset_mock() + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces" _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) @@ -812,7 +804,6 @@ def test_export_unsampled_span_for_agent_observability(self): processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0] self.assertIsInstance(processor, BatchUnsampledSpanProcessor) - # Clean up os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None)