diff --git a/aws-opentelemetry-distro/pyproject.toml b/aws-opentelemetry-distro/pyproject.toml index e49fffbb0..3d8eadbc1 100644 --- a/aws-opentelemetry-distro/pyproject.toml +++ b/aws-opentelemetry-distro/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "opentelemetry-sdk-extension-aws == 2.0.2", "opentelemetry-propagator-aws-xray == 1.0.1", "opentelemetry-distro == 0.48b0", + "opentelemetry-processor-baggage == 0.48b0", "opentelemetry-propagator-ot-trace == 0.48b0", "opentelemetry-instrumentation == 0.48b0", "opentelemetry-instrumentation-aws-lambda == 0.48b0", diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 91afbda1d..a08374bbe 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -33,6 +33,7 @@ from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.metrics import set_meter_provider +from opentelemetry.processor.baggage import BaggageSpanProcessor from opentelemetry.sdk._configuration import ( _get_exporter_names, _get_id_generator, @@ -407,6 +408,15 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) -> if _is_lambda_environment(): provider.add_span_processor(AwsLambdaSpanProcessor()) + # Add session.id baggage attribute to span attributes to support AI Agent use cases + # enabling session ID tracking in spans. + if is_agent_observability_enabled(): + + def session_id_predicate(baggage_key: str) -> bool: + return baggage_key == "session.id" + + provider.add_span_processor(BaggageSpanProcessor(session_id_predicate)) + if not _is_application_signals_enabled(): return diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 6d5cfac0f..13397a0d5 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -50,6 +50,7 @@ from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.processor.baggage import BaggageSpanProcessor from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -340,6 +341,73 @@ def test_customize_span_exporter_with_agent_observability(self): os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) os.environ.pop(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, None) + def test_customize_span_processors_with_agent_observability(self): + mock_tracer_provider: TracerProvider = MagicMock() + + # Test that BaggageSpanProcessor is not added when agent observability is disabled + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0) + + # Reset mock for next test + mock_tracer_provider.reset_mock() + + # Test that BaggageSpanProcessor is added when agent observability is enabled + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1) + + # Verify the added processor is BaggageSpanProcessor + added_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0] + self.assertIsInstance(added_processor, BaggageSpanProcessor) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + + def test_baggage_span_processor_session_id_filtering(self): + """Test that BaggageSpanProcessor only set session.id filter by default""" + + # Set up agent observability + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + + # Create a new tracer provider for this test + tracer_provider = TracerProvider() + + # Add our span processors + _customize_span_processors(tracer_provider, Resource.get_empty()) + + # Verify that the BaggageSpanProcessor was added + # The _active_span_processor is a composite processor containing all processors + active_processor = tracer_provider._active_span_processor + + # Check if it's a composite processor with multiple processors + if hasattr(active_processor, "_span_processors"): + processors = active_processor._span_processors + else: + # If it's a single processor, wrap it in a list + processors = [active_processor] + + baggage_processors = [ + processor for processor in processors if processor.__class__.__name__ == "BaggageSpanProcessor" + ] + self.assertEqual(len(baggage_processors), 1) + + # Verify the predicate function only accepts session.id + baggage_processor = baggage_processors[0] + predicate = baggage_processor._baggage_key_predicate + + # Test the predicate function directly + self.assertTrue(predicate("session.id")) + self.assertFalse(predicate("user.id")) + self.assertFalse(predicate("request.id")) + self.assertFalse(predicate("other.key")) + self.assertFalse(predicate("")) + self.assertFalse(predicate("session")) + self.assertFalse(predicate("id")) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + def test_customize_span_exporter_sigv4(self): traces_good_endpoints = [ @@ -574,9 +642,18 @@ def capture_exporter(*args, **kwargs): def test_customize_span_processors(self): mock_tracer_provider: TracerProvider = MagicMock() + # Clean up environment to ensure consistent test state + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) + os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None) + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0) + # Reset mock for next test + mock_tracer_provider.reset_mock() + + # Test application signals only os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True") os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False") _customize_span_processors(mock_tracer_provider, Resource.get_empty()) @@ -586,8 +663,27 @@ def test_customize_span_processors(self): second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0] self.assertIsInstance(second_processor, AwsSpanMetricsProcessor) + # Reset mock for next test + mock_tracer_provider.reset_mock() + + # Test both agent observability and application signals enabled + os.environ.setdefault("AGENT_OBSERVABILITY_ENABLED", "true") + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 3) + + # Verify processors are added in the expected order + processors = [call.args[0] for call in mock_tracer_provider.add_span_processor.call_args_list] + self.assertIsInstance(processors[0], BaggageSpanProcessor) # Agent observability processor added first + self.assertIsInstance(processors[1], AttributePropagatingSpanProcessor) # Application signals processors + self.assertIsInstance(processors[2], AwsSpanMetricsProcessor) + def test_customize_span_processors_lambda(self): mock_tracer_provider: TracerProvider = MagicMock() + # Clean up environment to ensure consistent test state + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)