Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.sdk._configuration import (
Expand Down Expand Up @@ -62,6 +63,8 @@
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT"
METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL"
DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0
AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"
AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS"

_logger: Logger = getLogger(__name__)

Expand Down Expand Up @@ -219,6 +222,8 @@ def _customize_sampler(sampler: Sampler) -> Sampler:
def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
if not _is_application_signals_enabled():
return span_exporter
if _is_lambda_environment():
return AwsMetricAttributesSpanExporterBuilder(OTLPUdpSpanExporter(), resource).build()
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


Expand Down Expand Up @@ -262,6 +267,11 @@ def _is_application_signals_enabled():
)


def _is_lambda_environment():
# detect if running in AWS Lambda environment
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


class ApplicationSignalsExporterProvider:
_instance: ClassVar["ApplicationSignalsExporterProvider"] = None

Expand Down Expand Up @@ -289,6 +299,11 @@ def create_exporter(self):
]:
temporality_dict[typ] = AggregationTemporality.DELTA

if _is_lambda_environment():
# When running in Lambda, export Application Signals metrics over UDP
application_signals_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
return OTLPUdpMetricExporter(endpoint=application_signals_endpoint, preferred_temporality=temporality_dict)

if protocol == "http/protobuf":
application_signals_endpoint = os.environ.get(
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
import socket
from logging import Logger, getLogger
from typing import Dict, Optional, Sequence, Tuple

from typing_extensions import override

from opentelemetry.exporter.otlp.proto.common.metrics_encoder import encode_metrics
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
from opentelemetry.sdk.metrics._internal.aggregation import AggregationTemporality
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
from opentelemetry.sdk.metrics._internal.point import MetricsData
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.view import Aggregation
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

DEFAULT_ENDPOINT = "127.0.0.1:2000"
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"
FORMAT_OTEL_TRACES_BINARY_PREFIX = "T1"

_logger: Logger = getLogger(__name__)


class UdpExporter:
def __init__(self, endpoint: Optional[str] = None):
self._endpoint = endpoint or DEFAULT_ENDPOINT
self._host, self._port = self._parse_endpoint(self._endpoint)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._socket.setblocking(False)

def send_data(self, data: bytes, signal_format_prefix: str):
# base64 encoding and then converting to string with utf-8
base64_encoded_string: str = base64.b64encode(data).decode("utf-8")
message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}"

try:
_logger.debug("Sending UDP data: %s", message)
self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port)))
except Exception as exc: # pylint: disable=broad-except
_logger.error("Error sending UDP data: %s", exc)
raise

def shutdown(self):
self._socket.close()

# pylint: disable=no-self-use
def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]:
try:
vals = endpoint.split(":")
host = vals[0]
port = int(vals[1])
except Exception as exc: # pylint: disable=broad-except
raise ValueError(f"Invalid endpoint: {endpoint}") from exc

return host, port


class OTLPUdpMetricExporter(MetricExporter):
def __init__(
self,
endpoint: Optional[str] = None,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, Aggregation] = None,
):
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)
self._udp_exporter = UdpExporter(endpoint=endpoint)

@override
def export(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
serialized_data = encode_metrics(metrics_data).SerializeToString()

try:
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_METRICS_BINARY_PREFIX)
return MetricExportResult.SUCCESS
except Exception as exc: # pylint: disable=broad-except
_logger.error("Error exporting metrics: %s", exc)
return MetricExportResult.FAILURE

# pylint: disable=no-self-use
def force_flush(self, timeout_millis: float = 10_000) -> bool:
# TODO: implement force flush
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._udp_exporter.shutdown()


class OTLPUdpSpanExporter(SpanExporter):
def __init__(self, endpoint: Optional[str] = None):
self._udp_exporter = UdpExporter(endpoint=endpoint)

@override
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
serialized_data = encode_spans(spans).SerializeToString()

try:
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_TRACES_BINARY_PREFIX)
return SpanExportResult.SUCCESS
except Exception as exc: # pylint: disable=broad-except
_logger.error("Error exporting spans: %s", exc)
return SpanExportResult.FAILURE

# pylint: disable=no-self-use
@override
def force_flush(self, timeout_millis: int = 30000) -> bool:
# TODO: implement force flush
return True

@override
def shutdown(self) -> None:
self._udp_exporter.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
)
from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro
from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler._aws_xray_sampling_client import _AwsXRaySamplingClient
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from opentelemetry.environment_variables import OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, OTEL_TRACES_EXPORTER
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import OTLPMetricExporterMixin
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as OTLPGrpcOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG
from opentelemetry.sdk.metrics._internal.export import MetricExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter
Expand Down Expand Up @@ -254,6 +256,16 @@ def test_customize_exporter(self):
self.assertEqual(mock_exporter, customized_exporter._delegate)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)

# when Application Signals is enabled and running in lambda
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
customized_exporter = _customize_exporter(mock_exporter, Resource.get_empty())
self.assertNotEqual(mock_exporter, customized_exporter)
self.assertIsInstance(customized_exporter, AwsMetricAttributesSpanExporter)
self.assertIsInstance(customized_exporter._delegate, OTLPUdpSpanExporter)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)

def test_customize_span_processors(self):
mock_tracer_provider: TracerProvider = MagicMock()
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
Expand Down Expand Up @@ -286,6 +298,13 @@ def test_application_signals_exporter_provider(self):
self.assertIsInstance(exporter, OTLPHttpOTLPMetricExporter)
self.assertEqual("http://localhost:4316/v1/metrics", exporter._endpoint)

# When in Lambda, exporter should be UDP.
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
exporter: MetricExporter = ApplicationSignalsExporterProvider().create_exporter()
self.assertIsInstance(exporter, OTLPUdpMetricExporter)
self.assertEqual("127.0.0.1:2000", exporter._udp_exporter._endpoint)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)


def validate_distro_environ():
tc: TestCase = TestCase()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
import socket
import unittest
from unittest import TestCase
from unittest.mock import MagicMock, patch

from amazon.opentelemetry.distro.otlp_udp_exporter import (
DEFAULT_ENDPOINT,
PROTOCOL_HEADER,
OTLPUdpMetricExporter,
OTLPUdpSpanExporter,
UdpExporter,
)
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
from opentelemetry.sdk.trace.export import SpanExportResult


class TestUdpExporter(TestCase):

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_udp_exporter_init_default(self, mock_socket):
exporter = UdpExporter()
self.assertEqual(exporter._endpoint, DEFAULT_ENDPOINT)
self.assertEqual(exporter._host, "127.0.0.1")
self.assertEqual(exporter._port, 2000)
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
mock_socket().setblocking.assert_called_once_with(False)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_udp_exporter_init_with_endpoint(self, mock_socket):
exporter = UdpExporter(endpoint="localhost:5000")
self.assertNotEqual(exporter._endpoint, DEFAULT_ENDPOINT)
self.assertEqual(exporter._host, "localhost")
self.assertEqual(exporter._port, 5000)
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
mock_socket().setblocking.assert_called_once_with(False)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_udp_exporter_init_invalid_endpoint(self, mock_socket):
with self.assertRaises(ValueError):
UdpExporter(endpoint="invalidEndpoint:port")

# pylint: disable=no-self-use
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_send_data(self, mock_socket):
mock_socket_instance = mock_socket.return_value
exporter = UdpExporter()
input_bytes: bytes = b"hello"
encoded_bytes: bytes = base64.b64encode(input_bytes)
exporter.send_data(input_bytes, "signal_prefix")
expected_message = PROTOCOL_HEADER + "signal_prefix" + encoded_bytes.decode("utf-8")
mock_socket_instance.sendto.assert_called_once_with(expected_message.encode("utf-8"), ("127.0.0.1", 2000))

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_shutdown(self, mock_socket):
mock_socket_instance = mock_socket.return_value
exporter = UdpExporter()
exporter.shutdown()
mock_socket_instance.close.assert_called_once()


class TestOTLPUdpMetricExporter(unittest.TestCase):

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_metrics")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export(self, mock_udp_exporter, mock_encode_metrics):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data
exporter = OTLPUdpMetricExporter()
result = exporter.export(MagicMock())
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="M1")
self.assertEqual(result, MetricExportResult.SUCCESS)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_metrics")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export_with_exception(self, mock_udp_exporter, mock_encode_metrics):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data
mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong")
exporter = OTLPUdpMetricExporter()
result = exporter.export(MagicMock())
self.assertEqual(result, MetricExportResult.FAILURE)

# pylint: disable=no-self-use
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_shutdown(self, mock_udp_exporter):
mock_udp_exporter_instance = mock_udp_exporter.return_value
exporter = OTLPUdpMetricExporter()
exporter.shutdown()
mock_udp_exporter_instance.shutdown.assert_called_once()


class TestOTLPUdpSpanExporter(unittest.TestCase):

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export(self, mock_udp_exporter, mock_encode_spans):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
exporter = OTLPUdpSpanExporter()
result = exporter.export(MagicMock())
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="T1")
self.assertEqual(result, SpanExportResult.SUCCESS)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export_with_exception(self, mock_udp_exporter, mock_encode_spans):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong")
exporter = OTLPUdpSpanExporter()
result = exporter.export(MagicMock())
self.assertEqual(result, SpanExportResult.FAILURE)

# pylint: disable=no-self-use
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_shutdown(self, mock_udp_exporter):
mock_udp_exporter_instance = mock_udp_exporter.return_value
exporter = OTLPUdpSpanExporter()
exporter.shutdown()
mock_udp_exporter_instance.shutdown.assert_called_once()
Loading