1
- from typing import Optional , Sequence , Dict
1
+ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
+ # SPDX-License-Identifier: Apache-2.0
3
+ import socket
4
+ from typing import Dict , Optional , Sequence
5
+
6
+ from typing_extensions import override
2
7
8
+ from opentelemetry .exporter .otlp .proto .common .metrics_encoder import encode_metrics
9
+ from opentelemetry .exporter .otlp .proto .common .trace_encoder import encode_spans
3
10
from opentelemetry .sdk .metrics ._internal .aggregation import AggregationTemporality
11
+ from opentelemetry .sdk .metrics ._internal .export import MetricExportResult
12
+ from opentelemetry .sdk .metrics ._internal .point import MetricsData
13
+ from opentelemetry .sdk .metrics .export import MetricExporter
4
14
from opentelemetry .sdk .metrics .view import Aggregation
5
15
from opentelemetry .sdk .trace import ReadableSpan
6
16
from opentelemetry .sdk .trace .export import SpanExporter , SpanExportResult
7
- from opentelemetry .sdk .metrics .export import MetricExporter
8
- from typing_extensions import override
9
- from opentelemetry .sdk .metrics ._internal .point import MetricsData
10
- from opentelemetry .sdk .metrics ._internal .export import MetricExportResult
11
- from opentelemetry .exporter .otlp .proto .common .metrics_encoder import (
12
- encode_metrics ,
13
- )
14
- from opentelemetry .exporter .otlp .proto .common .trace_encoder import (
15
- encode_spans ,
16
- )
17
17
18
- import socket
18
+ from logging import getLogger , Logger
19
19
20
20
DEFAULT_ENDPOINT = "127.0.0.1:2000"
21
21
PROTOCOL_HEADER = '{"format":"json","version":1}\n '
22
- PROTOCOL_DELIMITER = ' \n '
22
+ PROTOCOL_DELIMITER = " \n "
23
23
24
+ _logger : Logger = getLogger (__name__ )
24
25
25
26
class UdpExporter :
26
- def __init__ (
27
- self ,
28
- endpoint : Optional [str ] = None
29
- ):
30
- self ._endpoint = endpoint or DEFAULT_ENDPOINT
27
+ def __init__ (self , endpoint : Optional [str ] = None ):
28
+ self ._endpoint = endpoint or DEFAULT_ENDPOINT # TODO: read from some env var??
31
29
self ._host , self ._port = self ._parse_endpoint (self ._endpoint )
32
30
self ._socket = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
33
31
self ._socket .setblocking (False )
34
32
35
- def send_data (self , data : str , format : str ):
36
- udp_data = f'{{"format":"{ format } ","data":{ data } }}'
33
+ def send_data (self , data : str , signal_format : str ):
34
+ udp_data = f'{{"format":"{ signal_format } ","data":{ data } }}'
37
35
message = PROTOCOL_HEADER + udp_data
38
36
39
37
try :
40
38
print ("Sending UDP data: " , message ) # TODO: remove
41
- self ._socket .sendto (message .encode (' utf-8' ), (self ._host , int (self ._port )))
42
- except Exception as e :
43
- print ("Error sending UDP data: " , e )
39
+ self ._socket .sendto (message .encode (" utf-8" ), (self ._host , int (self ._port )))
40
+ except Exception as exc : # pylint: disable=broad-except
41
+ _logger . error ("Error sending UDP data: %s " , exc )
44
42
45
43
def shutdown (self ):
46
44
self ._socket .close ()
47
45
46
+ # pylint: disable=no-self-use
48
47
def _parse_endpoint (self , endpoint : str ) -> tuple [str , int ]:
49
48
try :
50
49
vals = endpoint .split (":" )
51
50
host = vals [0 ]
52
51
port = int (vals [1 ])
53
- except Exception as e :
54
- raise ValueError (f"Invalid endpoint: { endpoint } " ) from e
52
+ except Exception as exc : # pylint: disable=broad-except
53
+ raise ValueError (f"Invalid endpoint: { endpoint } " ) from exc
55
54
56
55
return host , port
57
56
58
57
59
58
class OtlpUdpMetricExporter (MetricExporter ):
60
59
def __init__ (
61
- self ,
62
- endpoint : Optional [str ] = None ,
63
- preferred_temporality : Dict [type , AggregationTemporality ] = None ,
64
- preferred_aggregation : Dict [type , Aggregation ] = None
60
+ self ,
61
+ endpoint : Optional [str ] = None ,
62
+ preferred_temporality : Dict [type , AggregationTemporality ] = None ,
63
+ preferred_aggregation : Dict [type , Aggregation ] = None ,
65
64
):
66
65
super ().__init__ (
67
66
preferred_temporality = preferred_temporality ,
@@ -71,13 +70,13 @@ def __init__(
71
70
72
71
@override
73
72
def export (
74
- self ,
75
- metrics_data : MetricsData ,
76
- timeout_millis : float = 10_000 ,
77
- ** kwargs ,
73
+ self ,
74
+ metrics_data : MetricsData ,
75
+ timeout_millis : float = 10_000 ,
76
+ ** kwargs ,
78
77
) -> MetricExportResult :
79
78
serialized_data = encode_metrics (metrics_data ).SerializeToString ()
80
- self ._udp_exporter .send_data (data = serialized_data , format = "OTEL_V1_METRICS" ) # TODO: Convert to constant
79
+ self ._udp_exporter .send_data (data = serialized_data , signal_format = "OTEL_V1_METRICS" ) # TODO: Convert to constant
81
80
return MetricExportResult .SUCCESS # TODO: send appropriate status back. Need to??
82
81
83
82
def force_flush (self , timeout_millis : float = 10_000 ) -> bool :
@@ -94,7 +93,7 @@ def __init__(self, endpoint: Optional[str] = None):
94
93
@override
95
94
def export (self , spans : Sequence [ReadableSpan ]) -> SpanExportResult :
96
95
serialized_data = encode_spans (spans ).SerializeToString ()
97
- self ._udp_exporter .send_data (data = serialized_data , format = "OTEL_V1_TRACES" ) # TODO: Convert to constant
96
+ self ._udp_exporter .send_data (data = serialized_data , signal_format = "OTEL_V1_TRACES" ) # TODO: Convert to constant
98
97
return SpanExportResult .SUCCESS # TODO: send appropriate status back. Need to??
99
98
100
99
@override
0 commit comments