diff --git a/exporters/aws-otel-otlp-udp-exporter/README.rst b/exporters/aws-otel-otlp-udp-exporter/README.rst new file mode 100644 index 000000000..6d5e3d723 --- /dev/null +++ b/exporters/aws-otel-otlp-udp-exporter/README.rst @@ -0,0 +1,17 @@ +AWS OpenTelemetry OTLP UDP Exporter +=================================== + +Installation +------------ + +:: + + pip install aws-otel-otlp-udp-exporter + + +This package provides a UDP exporter for OpenTelemetry. + +References +---------- + +* `OpenTelemetry Project `_ diff --git a/exporters/aws-otel-otlp-udp-exporter/pyproject.toml b/exporters/aws-otel-otlp-udp-exporter/pyproject.toml new file mode 100644 index 000000000..bff54cbf7 --- /dev/null +++ b/exporters/aws-otel-otlp-udp-exporter/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "aws-otel-otlp-udp-exporter" +version = "0.1.0" +description = "OTLP UDP Exporter for OpenTelemetry" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.8" +authors = [ + { name = "Amazon Web Services" } +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", +] + +dependencies = [ + "opentelemetry-sdk == 1.27.0", + "opentelemetry-exporter-otlp-proto-common == 1.27.0", +] + +[project.urls] +Homepage = "https://github.com/aws-observability/aws-otel-python-instrumentation/tree/main/exporters/aws-otel-otlp-udp-exporter" + +[tool.hatch.build.targets.wheel] +packages = ["src/amazon"] diff --git a/exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/otlp/udp/__init__.py b/exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/otlp/udp/__init__.py new file mode 100644 index 000000000..1872b41ec --- /dev/null +++ b/exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/otlp/udp/__init__.py @@ -0,0 +1,20 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +from .exporter import ( + DEFAULT_ENDPOINT, + FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX, + FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX, + PROTOCOL_HEADER, + OTLPUdpSpanExporter, + UdpExporter, +) + +__all__ = [ + "UdpExporter", + "OTLPUdpSpanExporter", + "DEFAULT_ENDPOINT", + "FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX", + "FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX", + "PROTOCOL_HEADER", +] diff --git a/exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/otlp/udp/exporter.py b/exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/otlp/udp/exporter.py new file mode 100644 index 000000000..095d40904 --- /dev/null +++ b/exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/otlp/udp/exporter.py @@ -0,0 +1,91 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import base64 +import os +import socket +from logging import Logger, getLogger +from typing import Optional, Sequence, Tuple + +from typing_extensions import override + +from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult + +DEFAULT_ENDPOINT = "127.0.0.1:2000" +PROTOCOL_HEADER = '{"format":"json","version":1}\n' + +FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S" +FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U" + +_logger: Logger = getLogger(__name__) + + +class UdpExporter: + def __init__(self, endpoint: Optional[str] = None): + if endpoint is None and "AWS_LAMBDA_FUNCTION_NAME" in os.environ: + # If in an AWS Lambda Environment, `AWS_XRAY_DAEMON_ADDRESS` will be defined + endpoint = os.environ.get("AWS_XRAY_DAEMON_ADDRESS", DEFAULT_ENDPOINT) + + self._endpoint = endpoint or DEFAULT_ENDPOINT + self._host, self._port = self._parse_endpoint(self._endpoint) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._socket.setblocking(False) + + def send_data(self, data: bytes, signal_format_prefix: str): + # base64 encoding and then converting to string with utf-8 + base64_encoded_string: str = base64.b64encode(data).decode("utf-8") + message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}" + + try: + _logger.debug("Sending UDP data: %s", message) + self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port))) + except Exception as exc: # pylint: disable=broad-except + _logger.error("Error sending UDP data: %s", exc) + raise + + def shutdown(self): + self._socket.close() + + # pylint: disable=no-self-use + def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]: + try: + vals = endpoint.split(":") + host = vals[0] + port = int(vals[1]) + except Exception as exc: # pylint: disable=broad-except + raise ValueError(f"Invalid endpoint: {endpoint}") from exc + + return host, port + + +class OTLPUdpSpanExporter(SpanExporter): + def __init__(self, endpoint: Optional[str] = None, sampled: bool = True): + self._udp_exporter = UdpExporter(endpoint=endpoint) + self._sampled = sampled + + @override + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + serialized_data = encode_spans(spans).SerializeToString() + + try: + prefix = ( + FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX + if self._sampled + else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX + ) + self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix) + return SpanExportResult.SUCCESS + except Exception as exc: # pylint: disable=broad-except + _logger.error("Error exporting spans: %s", exc) + return SpanExportResult.FAILURE + + # pylint: disable=no-self-use + @override + def force_flush(self, timeout_millis: int = 30000) -> bool: + # TODO: implement force flush + return True + + @override + def shutdown(self) -> None: + self._udp_exporter.shutdown() diff --git a/exporters/aws-otel-otlp-udp-exporter/tests/test_exporter.py b/exporters/aws-otel-otlp-udp-exporter/tests/test_exporter.py new file mode 100644 index 000000000..15514e14a --- /dev/null +++ b/exporters/aws-otel-otlp-udp-exporter/tests/test_exporter.py @@ -0,0 +1,110 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import base64 +import socket +import unittest +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from amazon.opentelemetry.exporters.otlp.udp import ( + DEFAULT_ENDPOINT, + FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX, + FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX, + PROTOCOL_HEADER, + OTLPUdpSpanExporter, + UdpExporter, +) +from opentelemetry.sdk.trace.export import SpanExportResult + + +class TestUdpExporter(TestCase): + + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket") + def test_udp_exporter_init_default(self, mock_socket): + exporter = UdpExporter() + self.assertEqual(exporter._endpoint, DEFAULT_ENDPOINT) + self.assertEqual(exporter._host, "127.0.0.1") + self.assertEqual(exporter._port, 2000) + mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM) + mock_socket().setblocking.assert_called_once_with(False) + + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket") + def test_udp_exporter_init_with_endpoint(self, mock_socket): + exporter = UdpExporter(endpoint="localhost:5000") + self.assertNotEqual(exporter._endpoint, DEFAULT_ENDPOINT) + self.assertEqual(exporter._host, "localhost") + self.assertEqual(exporter._port, 5000) + mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM) + mock_socket().setblocking.assert_called_once_with(False) + + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket") + def test_udp_exporter_init_invalid_endpoint(self, mock_socket): + with self.assertRaises(ValueError): + UdpExporter(endpoint="invalidEndpoint:port") + + # pylint: disable=no-self-use + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket") + def test_send_data(self, mock_socket): + mock_socket_instance = mock_socket.return_value + exporter = UdpExporter() + input_bytes: bytes = b"hello" + encoded_bytes: bytes = base64.b64encode(input_bytes) + exporter.send_data(input_bytes, "signal_prefix") + expected_message = PROTOCOL_HEADER + "signal_prefix" + encoded_bytes.decode("utf-8") + mock_socket_instance.sendto.assert_called_once_with(expected_message.encode("utf-8"), ("127.0.0.1", 2000)) + + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket") + def test_shutdown(self, mock_socket): + mock_socket_instance = mock_socket.return_value + exporter = UdpExporter() + exporter.shutdown() + mock_socket_instance.close.assert_called_once() + + +class TestOTLPUdpSpanExporter(unittest.TestCase): + + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans") + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter") + def test_export_unsampled_span(self, mock_udp_exporter, mock_encode_spans): + mock_udp_exporter_instance = mock_udp_exporter.return_value + mock_encoded_data = MagicMock() + mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data + exporter = OTLPUdpSpanExporter(sampled=False) + result = exporter.export(MagicMock()) + mock_udp_exporter_instance.send_data.assert_called_once_with( + data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX + ) + self.assertEqual(result, SpanExportResult.SUCCESS) + + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans") + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter") + def test_export_sampled_span(self, mock_udp_exporter, mock_encode_spans): + mock_udp_exporter_instance = mock_udp_exporter.return_value + mock_encoded_data = MagicMock() + mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data + exporter = OTLPUdpSpanExporter() + result = exporter.export(MagicMock()) + mock_udp_exporter_instance.send_data.assert_called_once_with( + data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX + ) + self.assertEqual(result, SpanExportResult.SUCCESS) + + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans") + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter") + def test_export_with_exception(self, mock_udp_exporter, mock_encode_spans): + mock_udp_exporter_instance = mock_udp_exporter.return_value + mock_encoded_data = MagicMock() + mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data + mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong") + exporter = OTLPUdpSpanExporter() + result = exporter.export(MagicMock()) + self.assertEqual(result, SpanExportResult.FAILURE) + + # pylint: disable=no-self-use + @patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter") + def test_shutdown(self, mock_udp_exporter): + mock_udp_exporter_instance = mock_udp_exporter.return_value + exporter = OTLPUdpSpanExporter() + exporter.shutdown() + exporter.force_flush() + mock_udp_exporter_instance.shutdown.assert_called_once()