Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws-opentelemetry-distro/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"opentelemetry-sdk-extension-aws == 2.0.2",
"opentelemetry-propagator-aws-xray == 1.0.1",
"opentelemetry-distro == 0.48b0",
"opentelemetry-processor-baggage == 0.48b0",
"opentelemetry-propagator-ot-trace == 0.48b0",
"opentelemetry-instrumentation == 0.48b0",
"opentelemetry-instrumentation-aws-lambda == 0.48b0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.processor.baggage import BaggageSpanProcessor
from opentelemetry.sdk._configuration import (
_get_exporter_names,
_get_id_generator,
Expand Down Expand Up @@ -407,6 +408,15 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
if _is_lambda_environment():
provider.add_span_processor(AwsLambdaSpanProcessor())

# Add session.id baggage attribute to span attributes to support AI Agent use cases
# enabling session ID tracking in spans.
if is_agent_observability_enabled():

def session_id_predicate(baggage_key: str) -> bool:
return baggage_key == "session.id"

provider.add_span_processor(BaggageSpanProcessor(session_id_predicate))

if not _is_application_signals_enabled():
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.processor.baggage import BaggageSpanProcessor
from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG
from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
Expand Down Expand Up @@ -340,6 +341,73 @@ def test_customize_span_exporter_with_agent_observability(self):
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
os.environ.pop(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, None)

def test_customize_span_processors_with_agent_observability(self):
mock_tracer_provider: TracerProvider = MagicMock()

# Test that BaggageSpanProcessor is not added when agent observability is disabled
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

# Reset mock for next test
mock_tracer_provider.reset_mock()

# Test that BaggageSpanProcessor is added when agent observability is enabled
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)

# Verify the added processor is BaggageSpanProcessor
added_processor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
self.assertIsInstance(added_processor, BaggageSpanProcessor)

# Clean up
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)

def test_baggage_span_processor_session_id_filtering(self):
"""Test that BaggageSpanProcessor only set session.id filter by default"""

# Set up agent observability
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"

# Create a new tracer provider for this test
tracer_provider = TracerProvider()

# Add our span processors
_customize_span_processors(tracer_provider, Resource.get_empty())

# Verify that the BaggageSpanProcessor was added
# The _active_span_processor is a composite processor containing all processors
active_processor = tracer_provider._active_span_processor

# Check if it's a composite processor with multiple processors
if hasattr(active_processor, "_span_processors"):
processors = active_processor._span_processors
else:
# If it's a single processor, wrap it in a list
processors = [active_processor]

baggage_processors = [
processor for processor in processors if processor.__class__.__name__ == "BaggageSpanProcessor"
]
self.assertEqual(len(baggage_processors), 1)

# Verify the predicate function only accepts session.id
baggage_processor = baggage_processors[0]
predicate = baggage_processor._baggage_key_predicate

# Test the predicate function directly
self.assertTrue(predicate("session.id"))
self.assertFalse(predicate("user.id"))
self.assertFalse(predicate("request.id"))
self.assertFalse(predicate("other.key"))
self.assertFalse(predicate(""))
self.assertFalse(predicate("session"))
self.assertFalse(predicate("id"))

# Clean up
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)

def test_customize_span_exporter_sigv4(self):

traces_good_endpoints = [
Expand Down Expand Up @@ -574,9 +642,18 @@ def capture_exporter(*args, **kwargs):

def test_customize_span_processors(self):
mock_tracer_provider: TracerProvider = MagicMock()
# Clean up environment to ensure consistent test state
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None)

_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

# Reset mock for next test
mock_tracer_provider.reset_mock()

# Test application signals only
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
Expand All @@ -586,8 +663,27 @@ def test_customize_span_processors(self):
second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
self.assertIsInstance(second_processor, AwsSpanMetricsProcessor)

# Reset mock for next test
mock_tracer_provider.reset_mock()

# Test both agent observability and application signals enabled
os.environ.setdefault("AGENT_OBSERVABILITY_ENABLED", "true")
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 3)

# Verify processors are added in the expected order
processors = [call.args[0] for call in mock_tracer_provider.add_span_processor.call_args_list]
self.assertIsInstance(processors[0], BaggageSpanProcessor) # Agent observability processor added first
self.assertIsInstance(processors[1], AttributePropagatingSpanProcessor) # Application signals processors
self.assertIsInstance(processors[2], AwsSpanMetricsProcessor)

def test_customize_span_processors_lambda(self):
mock_tracer_provider: TracerProvider = MagicMock()
# Clean up environment to ensure consistent test state
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)

_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

Expand Down
Loading