Skip to content

Commit 07badbc

Browse files
authored
Export unsampled span in AWS Lambda environment (#247)
*Description of changes:* When AppSignals is enabled in AWS Lambda, we export both sampled and unsampled spans to X-Ray to generate Application Signals metrics. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 51419cf commit 07badbc

File tree

7 files changed

+164
-5
lines changed

7 files changed

+164
-5
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
AWS_REMOTE_RESOURCE_IDENTIFIER: str = "aws.remote.resource.identifier"
1212
AWS_SDK_DESCENDANT: str = "aws.sdk.descendant"
1313
AWS_CONSUMER_PARENT_SPAN_KIND: str = "aws.consumer.parent.span.kind"
14+
AWS_TRACE_FLAG_UNSAMPLED: str = "aws.trace.flag.unsampled"
1415

1516
# AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions.
1617
# TODO:Move to Semantic Conventions when these attributes are added.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import logging
4+
from typing import Optional
5+
6+
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_TRACE_FLAG_UNSAMPLED
7+
from opentelemetry.context import Context
8+
from opentelemetry.sdk.trace import ReadableSpan, Span
9+
from opentelemetry.sdk.trace.export import BatchSpanProcessor as BaseBatchSpanProcessor
10+
11+
logger = logging.getLogger(__name__)
12+
13+
SPANUNSAMPLED_FLAG = "OTEL_AWS_APP_SIGNALS_ENABLED"
14+
15+
16+
class BatchUnsampledSpanProcessor(BaseBatchSpanProcessor):
17+
18+
# pylint: disable=no-self-use
19+
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
20+
if not span.context.trace_flags.sampled:
21+
span.set_attribute(AWS_TRACE_FLAG_UNSAMPLED, True)
22+
23+
def on_end(self, span: ReadableSpan) -> None:
24+
if span.context.trace_flags.sampled:
25+
return
26+
27+
if self.done:
28+
logger.warning("Already shutdown, dropping span.")
29+
return
30+
31+
if len(self.queue) == self.max_queue_size:
32+
# pylint: disable=access-member-before-definition
33+
if not self._spans_dropped:
34+
logger.warning("Queue is full, likely spans will be dropped.")
35+
# pylint: disable=attribute-defined-outside-init
36+
self._spans_dropped = True
37+
38+
self.queue.appendleft(span)
39+
40+
if len(self.queue) >= self.max_export_batch_size:
41+
with self.condition:
42+
self.condition.notify()

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1313
AttributePropagatingSpanProcessorBuilder,
1414
)
15+
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
1516
from amazon.opentelemetry.distro.aws_metric_attributes_span_exporter_builder import (
1617
AwsMetricAttributesSpanExporterBuilder,
1718
)
@@ -162,6 +163,21 @@ def _init_tracing(
162163
# END The OpenTelemetry Authors code
163164

164165

166+
def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource: Resource = None):
167+
if not _is_application_signals_enabled():
168+
return
169+
if not _is_lambda_environment():
170+
return
171+
172+
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
173+
174+
span_exporter = AwsMetricAttributesSpanExporterBuilder(
175+
OTLPUdpSpanExporter(endpoint=traces_endpoint, sampled=False), resource
176+
).build()
177+
178+
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter))
179+
180+
165181
def _is_defer_to_workers_enabled():
166182
return os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false").strip().lower() == "true"
167183

@@ -259,6 +275,8 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
259275
if not _is_application_signals_enabled():
260276
return
261277

278+
_export_unsampled_span_for_lambda(provider, resource)
279+
262280
# Construct and set local and remote attributes span processor
263281
provider.add_span_processor(AttributePropagatingSpanProcessorBuilder().build())
264282

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_udp_exporter.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
DEFAULT_ENDPOINT = "127.0.0.1:2000"
2121
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
2222
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"
23-
FORMAT_OTEL_TRACES_BINARY_PREFIX = "T1"
23+
24+
# TODO: update sampled and unsampled prefix later
25+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1"
26+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1"
2427

2528
_logger: Logger = getLogger(__name__)
2629

@@ -98,15 +101,21 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
98101

99102

100103
class OTLPUdpSpanExporter(SpanExporter):
101-
def __init__(self, endpoint: Optional[str] = None):
104+
def __init__(self, endpoint: Optional[str] = None, sampled: bool = True):
102105
self._udp_exporter = UdpExporter(endpoint=endpoint)
106+
self._sampled = sampled
103107

104108
@override
105109
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
106110
serialized_data = encode_spans(spans).SerializeToString()
107111

108112
try:
109-
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_TRACES_BINARY_PREFIX)
113+
prefix = (
114+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
115+
if self._sampled
116+
else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
117+
)
118+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix)
110119
return SpanExportResult.SUCCESS
111120
except Exception as exc: # pylint: disable=broad-except
112121
_logger.error("Error exporting spans: %s", exc)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from unittest import TestCase
4+
from unittest.mock import MagicMock, patch
5+
6+
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_TRACE_FLAG_UNSAMPLED
7+
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
8+
from opentelemetry.trace import TraceFlags
9+
10+
11+
class TestBatchUnsampledSpanProcessor(TestCase):
12+
13+
def setUp(self):
14+
self.mock_exporter = MagicMock()
15+
self.processor = BatchUnsampledSpanProcessor(self.mock_exporter, max_queue_size=1, max_export_batch_size=1)
16+
17+
@patch("opentelemetry.sdk.trace.Span")
18+
def test_on_end_sampled(self, mock_span_class):
19+
trace_flags = TraceFlags(TraceFlags.SAMPLED)
20+
21+
mock_span = mock_span_class.return_value
22+
mock_span.context.trace_flags = trace_flags
23+
24+
self.processor.on_start(mock_span)
25+
self.processor.on_end(mock_span)
26+
27+
self.assertEqual(len(self.processor.queue), 0)
28+
mock_span.set_attribute.assert_not_called()
29+
30+
@patch("opentelemetry.sdk.trace.Span")
31+
def test_on_end_not_sampled(self, mock_span_class):
32+
33+
trace_flags = TraceFlags(0)
34+
mock_span1 = mock_span_class.return_value
35+
mock_span1.context.trace_flags = trace_flags
36+
37+
self.processor.on_start(mock_span1)
38+
self.processor.on_end(mock_span1)
39+
40+
mock_span2 = mock_span_class.return_value
41+
mock_span2.context.trace_flags = trace_flags
42+
self.processor.on_start(mock_span2)
43+
self.processor.on_end(mock_span2)
44+
45+
self.assertEqual(len(self.processor.queue), 1)
46+
self.assertIn(AWS_TRACE_FLAG_UNSAMPLED, mock_span1.set_attribute.call_args_list[0][0][0])
47+
48+
self.processor.shutdown()
49+
mock_span2 = mock_span_class.return_value
50+
mock_span2.context.trace_flags = trace_flags
51+
self.processor.on_start(mock_span2)
52+
self.processor.on_end(mock_span2)
53+
54+
self.assertEqual(len(self.processor.queue), 0)

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
99
from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor
10+
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
1011
from amazon.opentelemetry.distro.aws_metric_attributes_span_exporter import AwsMetricAttributesSpanExporter
1112
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import (
1213
ApplicationSignalsExporterProvider,
@@ -15,6 +16,7 @@
1516
_customize_exporter,
1617
_customize_sampler,
1718
_customize_span_processors,
19+
_export_unsampled_span_for_lambda,
1820
_is_application_signals_enabled,
1921
_is_defer_to_workers_enabled,
2022
_is_wsgi_master_process,
@@ -37,6 +39,7 @@
3739
from opentelemetry.trace import get_tracer_provider
3840

3941

42+
# pylint: disable=too-many-public-methods
4043
class TestAwsOpenTelemetryConfigurator(TestCase):
4144
"""Tests AwsOpenTelemetryConfigurator and AwsOpenTelemetryDistro
4245
@@ -351,6 +354,19 @@ def test_initialize_components_called_when_deferred_disabled(self, mock_initiali
351354
mock_initialize_components.assert_called_once()
352355
os.environ.pop("IS_WSGI_MASTER_PROCESS_ALREADY_SEEN", None)
353356

357+
def test_export_unsampled_span_for_lambda(self):
358+
mock_tracer_provider: TracerProvider = MagicMock()
359+
_export_unsampled_span_for_lambda(mock_tracer_provider, Resource.get_empty())
360+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
361+
362+
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
363+
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myfunction")
364+
_export_unsampled_span_for_lambda(mock_tracer_provider, Resource.get_empty())
365+
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)
366+
first_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
367+
self.assertIsInstance(first_processor, BatchUnsampledSpanProcessor)
368+
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
369+
354370

355371
def validate_distro_environ():
356372
tc: TestCase = TestCase()

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_udp_exporter.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
from amazon.opentelemetry.distro.otlp_udp_exporter import (
1010
DEFAULT_ENDPOINT,
11+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX,
12+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX,
1113
PROTOCOL_HEADER,
1214
OTLPUdpMetricExporter,
1315
OTLPUdpSpanExporter,
@@ -90,6 +92,7 @@ def test_export_with_exception(self, mock_udp_exporter, mock_encode_metrics):
9092
def test_shutdown(self, mock_udp_exporter):
9193
mock_udp_exporter_instance = mock_udp_exporter.return_value
9294
exporter = OTLPUdpMetricExporter()
95+
exporter.force_flush()
9396
exporter.shutdown()
9497
mock_udp_exporter_instance.shutdown.assert_called_once()
9598

@@ -98,13 +101,28 @@ class TestOTLPUdpSpanExporter(unittest.TestCase):
98101

99102
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
100103
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
101-
def test_export(self, mock_udp_exporter, mock_encode_spans):
104+
def test_export_unsampled_span(self, mock_udp_exporter, mock_encode_spans):
105+
mock_udp_exporter_instance = mock_udp_exporter.return_value
106+
mock_encoded_data = MagicMock()
107+
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
108+
exporter = OTLPUdpSpanExporter(sampled=False)
109+
result = exporter.export(MagicMock())
110+
mock_udp_exporter_instance.send_data.assert_called_once_with(
111+
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
112+
)
113+
self.assertEqual(result, SpanExportResult.SUCCESS)
114+
115+
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
116+
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
117+
def test_export_sampled_span(self, mock_udp_exporter, mock_encode_spans):
102118
mock_udp_exporter_instance = mock_udp_exporter.return_value
103119
mock_encoded_data = MagicMock()
104120
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
105121
exporter = OTLPUdpSpanExporter()
106122
result = exporter.export(MagicMock())
107-
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="T1")
123+
mock_udp_exporter_instance.send_data.assert_called_once_with(
124+
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
125+
)
108126
self.assertEqual(result, SpanExportResult.SUCCESS)
109127

110128
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
@@ -124,4 +142,5 @@ def test_shutdown(self, mock_udp_exporter):
124142
mock_udp_exporter_instance = mock_udp_exporter.return_value
125143
exporter = OTLPUdpSpanExporter()
126144
exporter.shutdown()
145+
exporter.force_flush()
127146
mock_udp_exporter_instance.shutdown.assert_called_once()

0 commit comments

Comments
 (0)