Skip to content

Commit 288df57

Browse files
committed
Export unsampled span in AWS Lambda environment
1 parent f2b642e commit 288df57

File tree

6 files changed

+113
-5
lines changed

6 files changed

+113
-5
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
AWS_REMOTE_RESOURCE_IDENTIFIER: str = "aws.remote.resource.identifier"
1212
AWS_SDK_DESCENDANT: str = "aws.sdk.descendant"
1313
AWS_CONSUMER_PARENT_SPAN_KIND: str = "aws.consumer.parent.span.kind"
14+
AWS_TRACE_FLAG_UNSAMPLED: str = "aws.trace.flag.unsampled"
1415

1516
# AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions.
1617
# TODO:Move to Semantic Conventions when these attributes are added.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import logging
2+
from typing import Optional
3+
4+
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_TRACE_FLAG_UNSAMPLED
5+
from opentelemetry.context import Context
6+
from opentelemetry.sdk.trace import ReadableSpan, Span
7+
from opentelemetry.sdk.trace.export import BatchSpanProcessor as BaseBatchSpanProcessor
8+
9+
logger = logging.getLogger(__name__)
10+
11+
SPANUNSAMPLED_FLAG = "OTEL_AWS_APP_SIGNALS_ENABLED"
12+
13+
14+
class BatchUnsampledSpanProcessor(BaseBatchSpanProcessor):
15+
16+
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
17+
if not span.context.trace_flags.sampled:
18+
span.set_attribute(AWS_TRACE_FLAG_UNSAMPLED, "True")
19+
20+
def on_end(self, span: ReadableSpan) -> None:
21+
if span.context.trace_flags.sampled:
22+
return
23+
24+
if self.done:
25+
logger.warning("Already shutdown, dropping span.")
26+
return
27+
28+
if len(self.queue) == self.max_queue_size:
29+
if not self._spans_dropped:
30+
logger.warning("Queue is full, likely spans will be dropped.")
31+
self._spans_dropped = True
32+
33+
self.queue.appendleft(span)
34+
35+
if len(self.queue) >= self.max_export_batch_size:
36+
with self.condition:
37+
self.condition.notify()

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1313
AttributePropagatingSpanProcessorBuilder,
1414
)
15+
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
1516
from amazon.opentelemetry.distro.aws_metric_attributes_span_exporter_builder import (
1617
AwsMetricAttributesSpanExporterBuilder,
1718
)
@@ -153,6 +154,7 @@ def _init_tracing(
153154
span_exporter: SpanExporter = exporter_class(**exporter_args)
154155
span_exporter = _customize_exporter(span_exporter, resource)
155156
trace_provider.add_span_processor(BatchSpanProcessor(span_exporter))
157+
_export_unsampled_span_for_lambda(trace_provider, resource)
156158

157159
_customize_span_processors(trace_provider, resource)
158160

@@ -162,6 +164,21 @@ def _init_tracing(
162164
# END The OpenTelemetry Authors code
163165

164166

167+
def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource: Resource = None):
168+
if not _is_application_signals_enabled():
169+
return
170+
if not _is_lambda_environment():
171+
return
172+
173+
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
174+
175+
span_exporter = AwsMetricAttributesSpanExporterBuilder(
176+
OTLPUdpSpanExporter(endpoint=traces_endpoint, sampled=False), resource
177+
).build()
178+
179+
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter))
180+
181+
165182
def _is_defer_to_workers_enabled():
166183
return os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false").strip().lower() == "true"
167184

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_udp_exporter.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
DEFAULT_ENDPOINT = "127.0.0.1:2000"
2121
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
2222
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"
23-
FORMAT_OTEL_TRACES_BINARY_PREFIX = "T1"
23+
24+
# TODO: update sampled and unsampled prefix later
25+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1"
26+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1"
2427

2528
_logger: Logger = getLogger(__name__)
2629

@@ -98,15 +101,21 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
98101

99102

100103
class OTLPUdpSpanExporter(SpanExporter):
101-
def __init__(self, endpoint: Optional[str] = None):
104+
def __init__(self, endpoint: Optional[str] = None, sampled: bool = True):
102105
self._udp_exporter = UdpExporter(endpoint=endpoint)
106+
self._sampled = sampled
103107

104108
@override
105109
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
106110
serialized_data = encode_spans(spans).SerializeToString()
107111

108112
try:
109-
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_TRACES_BINARY_PREFIX)
113+
prefix = (
114+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
115+
if self._sampled
116+
else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
117+
)
118+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix)
110119
return SpanExportResult.SUCCESS
111120
except Exception as exc: # pylint: disable=broad-except
112121
_logger.error("Error exporting spans: %s", exc)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from unittest import TestCase
2+
from unittest.mock import MagicMock, patch
3+
4+
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_TRACE_FLAG_UNSAMPLED
5+
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
6+
from opentelemetry.trace import TraceFlags
7+
8+
9+
class TestBatchUnsampledSpanProcessor(TestCase):
10+
11+
def setUp(self):
12+
self.mock_exporter = MagicMock()
13+
self.processor = BatchUnsampledSpanProcessor(self.mock_exporter)
14+
15+
@patch("opentelemetry.sdk.trace.Span")
16+
def test_on_end_sampled(self, mock_span_class):
17+
trace_flags = TraceFlags(TraceFlags.SAMPLED)
18+
19+
mock_span = mock_span_class.return_value
20+
mock_span.context.trace_flags = trace_flags
21+
22+
self.processor.on_start(mock_span)
23+
self.processor.on_end(mock_span)
24+
25+
self.assertEqual(len(self.processor.queue), 0)
26+
mock_span.set_attribute.assert_not_called()
27+
28+
@patch("opentelemetry.sdk.trace.Span")
29+
def test_on_end_not_sampled(self, mock_span_class):
30+
31+
trace_flags = TraceFlags(0)
32+
mock_span = mock_span_class.return_value
33+
mock_span.context.trace_flags = trace_flags
34+
35+
self.processor.on_start(mock_span)
36+
self.processor.on_end(mock_span)
37+
38+
self.assertEqual(len(self.processor.queue), 1)
39+
self.assertIn(AWS_TRACE_FLAG_UNSAMPLED, mock_span.set_attribute.call_args_list[0][0][0])

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_udp_exporter.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
OTLPUdpMetricExporter,
1313
OTLPUdpSpanExporter,
1414
UdpExporter,
15+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX,
1516
)
1617
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
1718
from opentelemetry.sdk.trace.export import SpanExportResult
1819

20+
from build.python.amazon.opentelemetry.distro.otlp_udp_exporter import FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
21+
1922

2023
class TestUdpExporter(TestCase):
2124

@@ -102,9 +105,11 @@ def test_export(self, mock_udp_exporter, mock_encode_spans):
102105
mock_udp_exporter_instance = mock_udp_exporter.return_value
103106
mock_encoded_data = MagicMock()
104107
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
105-
exporter = OTLPUdpSpanExporter()
108+
exporter = OTLPUdpSpanExporter(sampled=False)
106109
result = exporter.export(MagicMock())
107-
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="T1")
110+
mock_udp_exporter_instance.send_data.assert_called_once_with(
111+
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
112+
)
108113
self.assertEqual(result, SpanExportResult.SUCCESS)
109114

110115
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")

0 commit comments

Comments
 (0)