1
1
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
2
# SPDX-License-Identifier: Apache-2.0
3
3
import socket
4
- from typing import Dict , Optional , Sequence
4
+ from logging import Logger , getLogger
5
+ from typing import Dict , Optional , Sequence , Tuple
5
6
6
7
from typing_extensions import override
7
8
15
16
from opentelemetry .sdk .trace import ReadableSpan
16
17
from opentelemetry .sdk .trace .export import SpanExporter , SpanExportResult
17
18
18
- from logging import getLogger , Logger
19
-
20
19
DEFAULT_ENDPOINT = "127.0.0.1:2000"
21
20
PROTOCOL_HEADER = '{"format":"json","version":1}\n '
22
21
PROTOCOL_DELIMITER = "\n "
23
22
24
23
_logger : Logger = getLogger (__name__ )
25
24
25
+
26
26
class UdpExporter :
27
27
def __init__ (self , endpoint : Optional [str ] = None ):
28
- self ._endpoint = endpoint or DEFAULT_ENDPOINT # TODO: read from some env var??
28
+ self ._endpoint = endpoint or DEFAULT_ENDPOINT # TODO: read from some env var??
29
29
self ._host , self ._port = self ._parse_endpoint (self ._endpoint )
30
30
self ._socket = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
31
31
self ._socket .setblocking (False )
@@ -44,7 +44,7 @@ def shutdown(self):
44
44
self ._socket .close ()
45
45
46
46
# pylint: disable=no-self-use
47
- def _parse_endpoint (self , endpoint : str ) -> tuple [str , int ]:
47
+ def _parse_endpoint (self , endpoint : str ) -> Tuple [str , int ]:
48
48
try :
49
49
vals = endpoint .split (":" )
50
50
host = vals [0 ]
@@ -79,7 +79,9 @@ def export(
79
79
self ._udp_exporter .send_data (data = serialized_data , signal_format = "OTEL_V1_METRICS" ) # TODO: Convert to constant
80
80
return MetricExportResult .SUCCESS # TODO: send appropriate status back. Need to??
81
81
82
+ # pylint: disable=no-self-use
82
83
def force_flush (self , timeout_millis : float = 10_000 ) -> bool :
84
+ # TODO: implement force flush
83
85
return True
84
86
85
87
def shutdown (self , timeout_millis : float = 30_000 , ** kwargs ) -> None :
@@ -96,8 +98,10 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
96
98
self ._udp_exporter .send_data (data = serialized_data , signal_format = "OTEL_V1_TRACES" ) # TODO: Convert to constant
97
99
return SpanExportResult .SUCCESS # TODO: send appropriate status back. Need to??
98
100
101
+ # pylint: disable=no-self-use
99
102
@override
100
103
def force_flush (self , timeout_millis : int = 30000 ) -> bool :
104
+ # TODO: implement force flush
101
105
return True
102
106
103
107
@override
0 commit comments