Skip to content

Commit 19c2098

Browse files
authored
Add BaggageSpanProcessor to populate baggage attributes into OTel spans (#391)
Add BaggageSpanProcessor support for AI Agent observability for AI Agent use cases - Add BaggageSpanProcessor to span pipeline when AGENT_OBSERVABILITY_ENABLED=true - Support propagating baggage attributes to spans Test updates: - Add test_customize_span_processors_with_agent_observability() to verify BaggageSpanProcessor integration - Update test_customize_span_processors() to test combinations of agent observability and application signals By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 671f5e2 commit 19c2098

File tree

3 files changed

+107
-0
lines changed

3 files changed

+107
-0
lines changed

aws-opentelemetry-distro/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dependencies = [
3434
"opentelemetry-sdk-extension-aws == 2.0.2",
3535
"opentelemetry-propagator-aws-xray == 1.0.1",
3636
"opentelemetry-distro == 0.48b0",
37+
"opentelemetry-processor-baggage == 0.48b0",
3738
"opentelemetry-propagator-ot-trace == 0.48b0",
3839
"opentelemetry-instrumentation == 0.48b0",
3940
"opentelemetry-instrumentation-aws-lambda == 0.48b0",

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
3434
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
3535
from opentelemetry.metrics import set_meter_provider
36+
from opentelemetry.processor.baggage import BaggageSpanProcessor
3637
from opentelemetry.sdk._configuration import (
3738
_get_exporter_names,
3839
_get_id_generator,
@@ -407,6 +408,15 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
407408
if _is_lambda_environment():
408409
provider.add_span_processor(AwsLambdaSpanProcessor())
409410

411+
# Add session.id baggage attribute to span attributes to support AI Agent use cases
412+
# enabling session ID tracking in spans.
413+
if is_agent_observability_enabled():
414+
415+
def session_id_predicate(baggage_key: str) -> bool:
416+
return baggage_key == "session.id"
417+
418+
provider.add_span_processor(BaggageSpanProcessor(session_id_predicate))
419+
410420
if not _is_application_signals_enabled():
411421
return
412422

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
5151
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
5252
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
53+
from opentelemetry.processor.baggage import BaggageSpanProcessor
5354
from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG
5455
from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader
5556
from opentelemetry.sdk.resources import Resource
@@ -340,6 +341,73 @@ def test_customize_span_exporter_with_agent_observability(self):
340341
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
341342
os.environ.pop(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, None)
342343

344+
def test_customize_span_processors_with_agent_observability(self):
345+
mock_tracer_provider: TracerProvider = MagicMock()
346+
347+
# Test that BaggageSpanProcessor is not added when agent observability is disabled
348+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
349+
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
350+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
351+
352+
# Reset mock for next test
353+
mock_tracer_provider.reset_mock()
354+
355+
# Test that BaggageSpanProcessor is added when agent observability is enabled
356+
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
357+
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
358+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)
359+
360+
# Verify the added processor is BaggageSpanProcessor
361+
added_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
362+
self.assertIsInstance(added_processor, BaggageSpanProcessor)
363+
364+
# Clean up
365+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
366+
367+
def test_baggage_span_processor_session_id_filtering(self):
368+
"""Test that BaggageSpanProcessor only set session.id filter by default"""
369+
370+
# Set up agent observability
371+
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
372+
373+
# Create a new tracer provider for this test
374+
tracer_provider = TracerProvider()
375+
376+
# Add our span processors
377+
_customize_span_processors(tracer_provider, Resource.get_empty())
378+
379+
# Verify that the BaggageSpanProcessor was added
380+
# The _active_span_processor is a composite processor containing all processors
381+
active_processor = tracer_provider._active_span_processor
382+
383+
# Check if it's a composite processor with multiple processors
384+
if hasattr(active_processor, "_span_processors"):
385+
processors = active_processor._span_processors
386+
else:
387+
# If it's a single processor, wrap it in a list
388+
processors = [active_processor]
389+
390+
baggage_processors = [
391+
processor for processor in processors if processor.__class__.__name__ == "BaggageSpanProcessor"
392+
]
393+
self.assertEqual(len(baggage_processors), 1)
394+
395+
# Verify the predicate function only accepts session.id
396+
baggage_processor = baggage_processors[0]
397+
predicate = baggage_processor._baggage_key_predicate
398+
399+
# Test the predicate function directly
400+
self.assertTrue(predicate("session.id"))
401+
self.assertFalse(predicate("user.id"))
402+
self.assertFalse(predicate("request.id"))
403+
self.assertFalse(predicate("other.key"))
404+
self.assertFalse(predicate(""))
405+
self.assertFalse(predicate("session"))
406+
self.assertFalse(predicate("id"))
407+
408+
# Clean up
409+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
410+
343411
def test_customize_span_exporter_sigv4(self):
344412

345413
traces_good_endpoints = [
@@ -574,9 +642,18 @@ def capture_exporter(*args, **kwargs):
574642

575643
def test_customize_span_processors(self):
576644
mock_tracer_provider: TracerProvider = MagicMock()
645+
# Clean up environment to ensure consistent test state
646+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
647+
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
648+
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None)
649+
577650
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
578651
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
579652

653+
# Reset mock for next test
654+
mock_tracer_provider.reset_mock()
655+
656+
# Test application signals only
580657
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
581658
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
582659
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
@@ -586,8 +663,27 @@ def test_customize_span_processors(self):
586663
second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
587664
self.assertIsInstance(second_processor, AwsSpanMetricsProcessor)
588665

666+
# Reset mock for next test
667+
mock_tracer_provider.reset_mock()
668+
669+
# Test both agent observability and application signals enabled
670+
os.environ.setdefault("AGENT_OBSERVABILITY_ENABLED", "true")
671+
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
672+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 3)
673+
674+
# Verify processors are added in the expected order
675+
processors = [call.args[0] for call in mock_tracer_provider.add_span_processor.call_args_list]
676+
self.assertIsInstance(processors[0], BaggageSpanProcessor) # Agent observability processor added first
677+
self.assertIsInstance(processors[1], AttributePropagatingSpanProcessor) # Application signals processors
678+
self.assertIsInstance(processors[2], AwsSpanMetricsProcessor)
679+
589680
def test_customize_span_processors_lambda(self):
590681
mock_tracer_provider: TracerProvider = MagicMock()
682+
# Clean up environment to ensure consistent test state
683+
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
684+
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
685+
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)
686+
591687
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
592688
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
593689

0 commit comments

Comments
 (0)