diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index db86703b7..188a25842 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -16,6 +16,7 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder +from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter from opentelemetry.sdk._configuration import ( @@ -62,6 +63,8 @@ APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT" METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL" DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0 +AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME" +AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS" _logger: Logger = getLogger(__name__) @@ -219,6 +222,8 @@ def _customize_sampler(sampler: Sampler) -> Sampler: def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter: if not _is_application_signals_enabled(): return span_exporter + if _is_lambda_environment(): + return AwsMetricAttributesSpanExporterBuilder(OTLPUdpSpanExporter(), resource).build() return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build() @@ -262,6 +267,11 @@ def _is_application_signals_enabled(): ) +def _is_lambda_environment(): + # detect if running in AWS Lambda environment + return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ + + class ApplicationSignalsExporterProvider: _instance: ClassVar["ApplicationSignalsExporterProvider"] = None @@ -289,6 +299,11 @@ def create_exporter(self): ]: temporality_dict[typ] = AggregationTemporality.DELTA + if _is_lambda_environment(): + # When running in Lambda, export Application Signals metrics over UDP + application_signals_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000") + return OTLPUdpMetricExporter(endpoint=application_signals_endpoint, preferred_temporality=temporality_dict) + if protocol == "http/protobuf": application_signals_endpoint = os.environ.get( APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG, diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_udp_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_udp_exporter.py new file mode 100644 index 000000000..0b7f8f8c0 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_udp_exporter.py @@ -0,0 +1,123 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import base64 +import socket +from logging import Logger, getLogger +from typing import Dict, Optional, Sequence, Tuple + +from typing_extensions import override + +from opentelemetry.exporter.otlp.proto.common.metrics_encoder import encode_metrics +from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans +from opentelemetry.sdk.metrics._internal.aggregation import AggregationTemporality +from opentelemetry.sdk.metrics._internal.export import MetricExportResult +from opentelemetry.sdk.metrics._internal.point import MetricsData +from opentelemetry.sdk.metrics.export import MetricExporter +from opentelemetry.sdk.metrics.view import Aggregation +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult + +DEFAULT_ENDPOINT = "127.0.0.1:2000" +PROTOCOL_HEADER = '{"format":"json","version":1}\n' +FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1" +FORMAT_OTEL_TRACES_BINARY_PREFIX = "T1" + +_logger: Logger = getLogger(__name__) + + +class UdpExporter: + def __init__(self, endpoint: Optional[str] = None): + self._endpoint = endpoint or DEFAULT_ENDPOINT + self._host, self._port = self._parse_endpoint(self._endpoint) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._socket.setblocking(False) + + def send_data(self, data: bytes, signal_format_prefix: str): + # base64 encoding and then converting to string with utf-8 + base64_encoded_string: str = base64.b64encode(data).decode("utf-8") + message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}" + + try: + _logger.debug("Sending UDP data: %s", message) + self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port))) + except Exception as exc: # pylint: disable=broad-except + _logger.error("Error sending UDP data: %s", exc) + raise + + def shutdown(self): + self._socket.close() + + # pylint: disable=no-self-use + def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]: + try: + vals = endpoint.split(":") + host = vals[0] + port = int(vals[1]) + except Exception as exc: # pylint: disable=broad-except + raise ValueError(f"Invalid endpoint: {endpoint}") from exc + + return host, port + + +class OTLPUdpMetricExporter(MetricExporter): + def __init__( + self, + endpoint: Optional[str] = None, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, Aggregation] = None, + ): + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) + self._udp_exporter = UdpExporter(endpoint=endpoint) + + @override + def export( + self, + metrics_data: MetricsData, + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + serialized_data = encode_metrics(metrics_data).SerializeToString() + + try: + self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_METRICS_BINARY_PREFIX) + return MetricExportResult.SUCCESS + except Exception as exc: # pylint: disable=broad-except + _logger.error("Error exporting metrics: %s", exc) + return MetricExportResult.FAILURE + + # pylint: disable=no-self-use + def force_flush(self, timeout_millis: float = 10_000) -> bool: + # TODO: implement force flush + return True + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + self._udp_exporter.shutdown() + + +class OTLPUdpSpanExporter(SpanExporter): + def __init__(self, endpoint: Optional[str] = None): + self._udp_exporter = UdpExporter(endpoint=endpoint) + + @override + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + serialized_data = encode_spans(spans).SerializeToString() + + try: + self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_TRACES_BINARY_PREFIX) + return SpanExportResult.SUCCESS + except Exception as exc: # pylint: disable=broad-except + _logger.error("Error exporting spans: %s", exc) + return SpanExportResult.FAILURE + + # pylint: disable=no-self-use + @override + def force_flush(self, timeout_millis: int = 30000) -> bool: + # TODO: implement force flush + return True + + @override + def shutdown(self) -> None: + self._udp_exporter.shutdown() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 62d1cc380..080ee7c7e 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -19,6 +19,7 @@ ) from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor +from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler._aws_xray_sampling_client import _AwsXRaySamplingClient from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from opentelemetry.environment_variables import OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, OTEL_TRACES_EXPORTER @@ -26,6 +27,7 @@ from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as OTLPGrpcOTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG +from opentelemetry.sdk.metrics._internal.export import MetricExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer, TracerProvider from opentelemetry.sdk.trace.export import SpanExporter @@ -254,6 +256,16 @@ def test_customize_exporter(self): self.assertEqual(mock_exporter, customized_exporter._delegate) os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) + # when Application Signals is enabled and running in lambda + os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True") + os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc") + customized_exporter = _customize_exporter(mock_exporter, Resource.get_empty()) + self.assertNotEqual(mock_exporter, customized_exporter) + self.assertIsInstance(customized_exporter, AwsMetricAttributesSpanExporter) + self.assertIsInstance(customized_exporter._delegate, OTLPUdpSpanExporter) + os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + def test_customize_span_processors(self): mock_tracer_provider: TracerProvider = MagicMock() _customize_span_processors(mock_tracer_provider, Resource.get_empty()) @@ -286,6 +298,13 @@ def test_application_signals_exporter_provider(self): self.assertIsInstance(exporter, OTLPHttpOTLPMetricExporter) self.assertEqual("http://localhost:4316/v1/metrics", exporter._endpoint) + # When in Lambda, exporter should be UDP. + os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc") + exporter: MetricExporter = ApplicationSignalsExporterProvider().create_exporter() + self.assertIsInstance(exporter, OTLPUdpMetricExporter) + self.assertEqual("127.0.0.1:2000", exporter._udp_exporter._endpoint) + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + def validate_distro_environ(): tc: TestCase = TestCase() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_udp_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_udp_exporter.py new file mode 100644 index 000000000..a97e02321 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_udp_exporter.py @@ -0,0 +1,127 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import base64 +import socket +import unittest +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from amazon.opentelemetry.distro.otlp_udp_exporter import ( + DEFAULT_ENDPOINT, + PROTOCOL_HEADER, + OTLPUdpMetricExporter, + OTLPUdpSpanExporter, + UdpExporter, +) +from opentelemetry.sdk.metrics._internal.export import MetricExportResult +from opentelemetry.sdk.trace.export import SpanExportResult + + +class TestUdpExporter(TestCase): + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket") + def test_udp_exporter_init_default(self, mock_socket): + exporter = UdpExporter() + self.assertEqual(exporter._endpoint, DEFAULT_ENDPOINT) + self.assertEqual(exporter._host, "127.0.0.1") + self.assertEqual(exporter._port, 2000) + mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM) + mock_socket().setblocking.assert_called_once_with(False) + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket") + def test_udp_exporter_init_with_endpoint(self, mock_socket): + exporter = UdpExporter(endpoint="localhost:5000") + self.assertNotEqual(exporter._endpoint, DEFAULT_ENDPOINT) + self.assertEqual(exporter._host, "localhost") + self.assertEqual(exporter._port, 5000) + mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM) + mock_socket().setblocking.assert_called_once_with(False) + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket") + def test_udp_exporter_init_invalid_endpoint(self, mock_socket): + with self.assertRaises(ValueError): + UdpExporter(endpoint="invalidEndpoint:port") + + # pylint: disable=no-self-use + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket") + def test_send_data(self, mock_socket): + mock_socket_instance = mock_socket.return_value + exporter = UdpExporter() + input_bytes: bytes = b"hello" + encoded_bytes: bytes = base64.b64encode(input_bytes) + exporter.send_data(input_bytes, "signal_prefix") + expected_message = PROTOCOL_HEADER + "signal_prefix" + encoded_bytes.decode("utf-8") + mock_socket_instance.sendto.assert_called_once_with(expected_message.encode("utf-8"), ("127.0.0.1", 2000)) + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket") + def test_shutdown(self, mock_socket): + mock_socket_instance = mock_socket.return_value + exporter = UdpExporter() + exporter.shutdown() + mock_socket_instance.close.assert_called_once() + + +class TestOTLPUdpMetricExporter(unittest.TestCase): + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_metrics") + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter") + def test_export(self, mock_udp_exporter, mock_encode_metrics): + mock_udp_exporter_instance = mock_udp_exporter.return_value + mock_encoded_data = MagicMock() + mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data + exporter = OTLPUdpMetricExporter() + result = exporter.export(MagicMock()) + mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="M1") + self.assertEqual(result, MetricExportResult.SUCCESS) + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_metrics") + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter") + def test_export_with_exception(self, mock_udp_exporter, mock_encode_metrics): + mock_udp_exporter_instance = mock_udp_exporter.return_value + mock_encoded_data = MagicMock() + mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data + mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong") + exporter = OTLPUdpMetricExporter() + result = exporter.export(MagicMock()) + self.assertEqual(result, MetricExportResult.FAILURE) + + # pylint: disable=no-self-use + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter") + def test_shutdown(self, mock_udp_exporter): + mock_udp_exporter_instance = mock_udp_exporter.return_value + exporter = OTLPUdpMetricExporter() + exporter.shutdown() + mock_udp_exporter_instance.shutdown.assert_called_once() + + +class TestOTLPUdpSpanExporter(unittest.TestCase): + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans") + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter") + def test_export(self, mock_udp_exporter, mock_encode_spans): + mock_udp_exporter_instance = mock_udp_exporter.return_value + mock_encoded_data = MagicMock() + mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data + exporter = OTLPUdpSpanExporter() + result = exporter.export(MagicMock()) + mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="T1") + self.assertEqual(result, SpanExportResult.SUCCESS) + + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans") + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter") + def test_export_with_exception(self, mock_udp_exporter, mock_encode_spans): + mock_udp_exporter_instance = mock_udp_exporter.return_value + mock_encoded_data = MagicMock() + mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data + mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong") + exporter = OTLPUdpSpanExporter() + result = exporter.export(MagicMock()) + self.assertEqual(result, SpanExportResult.FAILURE) + + # pylint: disable=no-self-use + @patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter") + def test_shutdown(self, mock_udp_exporter): + mock_udp_exporter_instance = mock_udp_exporter.return_value + exporter = OTLPUdpSpanExporter() + exporter.shutdown() + mock_udp_exporter_instance.shutdown.assert_called_once()