| 
 | 1 | +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.  | 
 | 2 | +# SPDX-License-Identifier: Apache-2.0  | 
 | 3 | +import base64  | 
 | 4 | +import socket  | 
 | 5 | +from logging import Logger, getLogger  | 
 | 6 | +from typing import Dict, Optional, Sequence, Tuple  | 
 | 7 | + | 
 | 8 | +from typing_extensions import override  | 
 | 9 | + | 
 | 10 | +from opentelemetry.exporter.otlp.proto.common.metrics_encoder import encode_metrics  | 
 | 11 | +from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans  | 
 | 12 | +from opentelemetry.sdk.metrics._internal.aggregation import AggregationTemporality  | 
 | 13 | +from opentelemetry.sdk.metrics._internal.export import MetricExportResult  | 
 | 14 | +from opentelemetry.sdk.metrics._internal.point import MetricsData  | 
 | 15 | +from opentelemetry.sdk.metrics.export import MetricExporter  | 
 | 16 | +from opentelemetry.sdk.metrics.view import Aggregation  | 
 | 17 | +from opentelemetry.sdk.trace import ReadableSpan  | 
 | 18 | +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult  | 
 | 19 | + | 
 | 20 | +DEFAULT_ENDPOINT = "127.0.0.1:2000"  | 
 | 21 | +PROTOCOL_HEADER = '{"format":"json","version":1}\n'  | 
 | 22 | +FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"  | 
 | 23 | + | 
 | 24 | +FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S"  | 
 | 25 | +FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U"  | 
 | 26 | + | 
 | 27 | +_logger: Logger = getLogger(__name__)  | 
 | 28 | + | 
 | 29 | + | 
 | 30 | +class UdpExporter:  | 
 | 31 | +    def __init__(self, endpoint: Optional[str] = None):  | 
 | 32 | +        self._endpoint = endpoint or DEFAULT_ENDPOINT  | 
 | 33 | +        self._host, self._port = self._parse_endpoint(self._endpoint)  | 
 | 34 | +        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  | 
 | 35 | +        self._socket.setblocking(False)  | 
 | 36 | + | 
 | 37 | +    def send_data(self, data: bytes, signal_format_prefix: str):  | 
 | 38 | +        # base64 encoding and then converting to string with utf-8  | 
 | 39 | +        base64_encoded_string: str = base64.b64encode(data).decode("utf-8")  | 
 | 40 | +        message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}"  | 
 | 41 | + | 
 | 42 | +        try:  | 
 | 43 | +            _logger.debug("Sending UDP data: %s", message)  | 
 | 44 | +            self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port)))  | 
 | 45 | +        except Exception as exc:  # pylint: disable=broad-except  | 
 | 46 | +            _logger.error("Error sending UDP data: %s", exc)  | 
 | 47 | +            raise  | 
 | 48 | + | 
 | 49 | +    def shutdown(self):  | 
 | 50 | +        self._socket.close()  | 
 | 51 | + | 
 | 52 | +    # pylint: disable=no-self-use  | 
 | 53 | +    def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]:  | 
 | 54 | +        try:  | 
 | 55 | +            vals = endpoint.split(":")  | 
 | 56 | +            host = vals[0]  | 
 | 57 | +            port = int(vals[1])  | 
 | 58 | +        except Exception as exc:  # pylint: disable=broad-except  | 
 | 59 | +            raise ValueError(f"Invalid endpoint: {endpoint}") from exc  | 
 | 60 | + | 
 | 61 | +        return host, port  | 
 | 62 | + | 
 | 63 | + | 
 | 64 | +class OTLPUdpMetricExporter(MetricExporter):  | 
 | 65 | +    def __init__(  | 
 | 66 | +        self,  | 
 | 67 | +        endpoint: Optional[str] = None,  | 
 | 68 | +        preferred_temporality: Dict[type, AggregationTemporality] = None,  | 
 | 69 | +        preferred_aggregation: Dict[type, Aggregation] = None,  | 
 | 70 | +    ):  | 
 | 71 | +        super().__init__(  | 
 | 72 | +            preferred_temporality=preferred_temporality,  | 
 | 73 | +            preferred_aggregation=preferred_aggregation,  | 
 | 74 | +        )  | 
 | 75 | +        self._udp_exporter = UdpExporter(endpoint=endpoint)  | 
 | 76 | + | 
 | 77 | +    @override  | 
 | 78 | +    def export(  | 
 | 79 | +        self,  | 
 | 80 | +        metrics_data: MetricsData,  | 
 | 81 | +        timeout_millis: float = 10_000,  | 
 | 82 | +        **kwargs,  | 
 | 83 | +    ) -> MetricExportResult:  | 
 | 84 | +        serialized_data = encode_metrics(metrics_data).SerializeToString()  | 
 | 85 | + | 
 | 86 | +        try:  | 
 | 87 | +            self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_METRICS_BINARY_PREFIX)  | 
 | 88 | +            return MetricExportResult.SUCCESS  | 
 | 89 | +        except Exception as exc:  # pylint: disable=broad-except  | 
 | 90 | +            _logger.error("Error exporting metrics: %s", exc)  | 
 | 91 | +            return MetricExportResult.FAILURE  | 
 | 92 | + | 
 | 93 | +    # pylint: disable=no-self-use  | 
 | 94 | +    def force_flush(self, timeout_millis: float = 10_000) -> bool:  | 
 | 95 | +        # TODO: implement force flush  | 
 | 96 | +        return True  | 
 | 97 | + | 
 | 98 | +    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:  | 
 | 99 | +        self._udp_exporter.shutdown()  | 
 | 100 | + | 
 | 101 | + | 
 | 102 | +class OTLPUdpSpanExporter(SpanExporter):  | 
 | 103 | +    def __init__(self, endpoint: Optional[str] = None, sampled: bool = True):  | 
 | 104 | +        self._udp_exporter = UdpExporter(endpoint=endpoint)  | 
 | 105 | +        self._sampled = sampled  | 
 | 106 | + | 
 | 107 | +    @override  | 
 | 108 | +    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:  | 
 | 109 | +        serialized_data = encode_spans(spans).SerializeToString()  | 
 | 110 | + | 
 | 111 | +        try:  | 
 | 112 | +            prefix = (  | 
 | 113 | +                FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX  | 
 | 114 | +                if self._sampled  | 
 | 115 | +                else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX  | 
 | 116 | +            )  | 
 | 117 | +            self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix)  | 
 | 118 | +            return SpanExportResult.SUCCESS  | 
 | 119 | +        except Exception as exc:  # pylint: disable=broad-except  | 
 | 120 | +            _logger.error("Error exporting spans: %s", exc)  | 
 | 121 | +            return SpanExportResult.FAILURE  | 
 | 122 | + | 
 | 123 | +    # pylint: disable=no-self-use  | 
 | 124 | +    @override  | 
 | 125 | +    def force_flush(self, timeout_millis: int = 30000) -> bool:  | 
 | 126 | +        # TODO: implement force flush  | 
 | 127 | +        return True  | 
 | 128 | + | 
 | 129 | +    @override  | 
 | 130 | +    def shutdown(self) -> None:  | 
 | 131 | +        self._udp_exporter.shutdown()  | 
 | 132 | + | 
0 commit comments