Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
AWS_REMOTE_RESOURCE_IDENTIFIER: str = "aws.remote.resource.identifier"
AWS_SDK_DESCENDANT: str = "aws.sdk.descendant"
AWS_CONSUMER_PARENT_SPAN_KIND: str = "aws.consumer.parent.span.kind"
AWS_TRACE_FLAG_UNSAMPLED: str = "aws.trace.flag.unsampled"

# AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions.
# TODO:Move to Semantic Conventions when these attributes are added.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Optional

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_TRACE_FLAG_UNSAMPLED
from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor as BaseBatchSpanProcessor

logger = logging.getLogger(__name__)

SPANUNSAMPLED_FLAG = "OTEL_AWS_APP_SIGNALS_ENABLED"


class BatchUnsampledSpanProcessor(BaseBatchSpanProcessor):

# pylint: disable=no-self-use
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
if not span.context.trace_flags.sampled:
span.set_attribute(AWS_TRACE_FLAG_UNSAMPLED, True)

def on_end(self, span: ReadableSpan) -> None:
if span.context.trace_flags.sampled:
return

if self.done:
logger.warning("Already shutdown, dropping span.")
return

if len(self.queue) == self.max_queue_size:
# pylint: disable=access-member-before-definition
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
# pylint: disable=attribute-defined-outside-init
self._spans_dropped = True

self.queue.appendleft(span)

if len(self.queue) >= self.max_export_batch_size:
with self.condition:
self.condition.notify()
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
AttributePropagatingSpanProcessorBuilder,
)
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
from amazon.opentelemetry.distro.aws_metric_attributes_span_exporter_builder import (
AwsMetricAttributesSpanExporterBuilder,
)
Expand Down Expand Up @@ -162,6 +163,21 @@ def _init_tracing(
# END The OpenTelemetry Authors code


def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource: Resource = None):
if not _is_application_signals_enabled():
return
if not _is_lambda_environment():
return

traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")

span_exporter = AwsMetricAttributesSpanExporterBuilder(
OTLPUdpSpanExporter(endpoint=traces_endpoint, sampled=False), resource
).build()

trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter))


def _is_defer_to_workers_enabled():
return os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false").strip().lower() == "true"

Expand Down Expand Up @@ -259,6 +275,8 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
if not _is_application_signals_enabled():
return

_export_unsampled_span_for_lambda(provider, resource)

# Construct and set local and remote attributes span processor
provider.add_span_processor(AttributePropagatingSpanProcessorBuilder().build())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
DEFAULT_ENDPOINT = "127.0.0.1:2000"
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"
FORMAT_OTEL_TRACES_BINARY_PREFIX = "T1"

# TODO: update sampled and unsampled prefix later
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1"
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1"

_logger: Logger = getLogger(__name__)

Expand Down Expand Up @@ -98,15 +101,21 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:


class OTLPUdpSpanExporter(SpanExporter):
def __init__(self, endpoint: Optional[str] = None):
def __init__(self, endpoint: Optional[str] = None, sampled: bool = True):
self._udp_exporter = UdpExporter(endpoint=endpoint)
self._sampled = sampled

@override
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
serialized_data = encode_spans(spans).SerializeToString()

try:
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_TRACES_BINARY_PREFIX)
prefix = (
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
if self._sampled
else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
)
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix)
return SpanExportResult.SUCCESS
except Exception as exc: # pylint: disable=broad-except
_logger.error("Error exporting spans: %s", exc)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from unittest import TestCase
from unittest.mock import MagicMock, patch

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_TRACE_FLAG_UNSAMPLED
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
from opentelemetry.trace import TraceFlags


class TestBatchUnsampledSpanProcessor(TestCase):

def setUp(self):
self.mock_exporter = MagicMock()
self.processor = BatchUnsampledSpanProcessor(self.mock_exporter, max_queue_size=1, max_export_batch_size=1)

@patch("opentelemetry.sdk.trace.Span")
def test_on_end_sampled(self, mock_span_class):
trace_flags = TraceFlags(TraceFlags.SAMPLED)

mock_span = mock_span_class.return_value
mock_span.context.trace_flags = trace_flags

self.processor.on_start(mock_span)
self.processor.on_end(mock_span)

self.assertEqual(len(self.processor.queue), 0)
mock_span.set_attribute.assert_not_called()

@patch("opentelemetry.sdk.trace.Span")
def test_on_end_not_sampled(self, mock_span_class):

trace_flags = TraceFlags(0)
mock_span1 = mock_span_class.return_value
mock_span1.context.trace_flags = trace_flags

self.processor.on_start(mock_span1)
self.processor.on_end(mock_span1)

mock_span2 = mock_span_class.return_value
mock_span2.context.trace_flags = trace_flags
self.processor.on_start(mock_span2)
self.processor.on_end(mock_span2)

self.assertEqual(len(self.processor.queue), 1)
self.assertIn(AWS_TRACE_FLAG_UNSAMPLED, mock_span1.set_attribute.call_args_list[0][0][0])

self.processor.shutdown()
mock_span2 = mock_span_class.return_value
mock_span2.context.trace_flags = trace_flags
self.processor.on_start(mock_span2)
self.processor.on_end(mock_span2)

self.assertEqual(len(self.processor.queue), 0)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
from amazon.opentelemetry.distro.aws_metric_attributes_span_exporter import AwsMetricAttributesSpanExporter
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import (
ApplicationSignalsExporterProvider,
Expand All @@ -15,6 +16,7 @@
_customize_exporter,
_customize_sampler,
_customize_span_processors,
_export_unsampled_span_for_lambda,
_is_application_signals_enabled,
_is_defer_to_workers_enabled,
_is_wsgi_master_process,
Expand All @@ -37,6 +39,7 @@
from opentelemetry.trace import get_tracer_provider


# pylint: disable=too-many-public-methods
class TestAwsOpenTelemetryConfigurator(TestCase):
"""Tests AwsOpenTelemetryConfigurator and AwsOpenTelemetryDistro

Expand Down Expand Up @@ -351,6 +354,19 @@ def test_initialize_components_called_when_deferred_disabled(self, mock_initiali
mock_initialize_components.assert_called_once()
os.environ.pop("IS_WSGI_MASTER_PROCESS_ALREADY_SEEN", None)

def test_export_unsampled_span_for_lambda(self):
mock_tracer_provider: TracerProvider = MagicMock()
_export_unsampled_span_for_lambda(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myfunction")
_export_unsampled_span_for_lambda(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)
first_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
self.assertIsInstance(first_processor, BatchUnsampledSpanProcessor)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)


def validate_distro_environ():
tc: TestCase = TestCase()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from amazon.opentelemetry.distro.otlp_udp_exporter import (
DEFAULT_ENDPOINT,
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX,
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX,
PROTOCOL_HEADER,
OTLPUdpMetricExporter,
OTLPUdpSpanExporter,
Expand Down Expand Up @@ -90,6 +92,7 @@ def test_export_with_exception(self, mock_udp_exporter, mock_encode_metrics):
def test_shutdown(self, mock_udp_exporter):
mock_udp_exporter_instance = mock_udp_exporter.return_value
exporter = OTLPUdpMetricExporter()
exporter.force_flush()
exporter.shutdown()
mock_udp_exporter_instance.shutdown.assert_called_once()

Expand All @@ -98,13 +101,28 @@ class TestOTLPUdpSpanExporter(unittest.TestCase):

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export(self, mock_udp_exporter, mock_encode_spans):
def test_export_unsampled_span(self, mock_udp_exporter, mock_encode_spans):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
exporter = OTLPUdpSpanExporter(sampled=False)
result = exporter.export(MagicMock())
mock_udp_exporter_instance.send_data.assert_called_once_with(
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
)
self.assertEqual(result, SpanExportResult.SUCCESS)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export_sampled_span(self, mock_udp_exporter, mock_encode_spans):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
exporter = OTLPUdpSpanExporter()
result = exporter.export(MagicMock())
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="T1")
mock_udp_exporter_instance.send_data.assert_called_once_with(
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
)
self.assertEqual(result, SpanExportResult.SUCCESS)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
Expand All @@ -124,4 +142,5 @@ def test_shutdown(self, mock_udp_exporter):
mock_udp_exporter_instance = mock_udp_exporter.return_value
exporter = OTLPUdpSpanExporter()
exporter.shutdown()
exporter.force_flush()
mock_udp_exporter_instance.shutdown.assert_called_once()
Loading