27
27
Dict ,
28
28
Generic ,
29
29
List ,
30
+ Literal ,
31
+ NewType ,
30
32
Optional ,
31
33
Tuple ,
34
+ Type ,
32
35
TypeVar ,
33
36
Union ,
34
37
)
53
56
from opentelemetry .exporter .otlp .proto .grpc import (
54
57
_OTLP_GRPC_CHANNEL_OPTIONS ,
55
58
)
59
+ from opentelemetry .proto .collector .logs .v1 .logs_service_pb2 import (
60
+ ExportLogsServiceRequest ,
61
+ )
62
+ from opentelemetry .proto .collector .logs .v1 .logs_service_pb2_grpc import (
63
+ LogsServiceStub ,
64
+ )
65
+ from opentelemetry .proto .collector .metrics .v1 .metrics_service_pb2 import (
66
+ ExportMetricsServiceRequest ,
67
+ )
68
+ from opentelemetry .proto .collector .metrics .v1 .metrics_service_pb2_grpc import (
69
+ MetricsServiceStub ,
70
+ )
71
+ from opentelemetry .proto .collector .trace .v1 .trace_service_pb2 import (
72
+ ExportTraceServiceRequest ,
73
+ )
74
+ from opentelemetry .proto .collector .trace .v1 .trace_service_pb2_grpc import (
75
+ TraceServiceStub ,
76
+ )
56
77
from opentelemetry .proto .common .v1 .common_pb2 import ( # noqa: F401
57
78
AnyValue ,
58
79
ArrayValue ,
59
80
KeyValue ,
60
81
)
61
82
from opentelemetry .proto .resource .v1 .resource_pb2 import Resource # noqa: F401
83
+ from opentelemetry .sdk ._logs import LogData
84
+ from opentelemetry .sdk ._logs .export import LogExportResult
62
85
from opentelemetry .sdk ._shared_internal import DuplicateFilter
63
86
from opentelemetry .sdk .environment_variables import (
64
87
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER ,
71
94
OTEL_EXPORTER_OTLP_INSECURE ,
72
95
OTEL_EXPORTER_OTLP_TIMEOUT ,
73
96
)
74
- from opentelemetry .sdk .metrics .export import MetricsData
97
+ from opentelemetry .sdk .metrics .export import MetricExportResult , MetricsData
75
98
from opentelemetry .sdk .resources import Resource as SDKResource
76
99
from opentelemetry .sdk .trace import ReadableSpan
100
+ from opentelemetry .sdk .trace .export import SpanExportResult
77
101
from opentelemetry .util ._importlib_metadata import entry_points
78
102
from opentelemetry .util .re import parse_env_headers
79
103
92
116
logger = getLogger (__name__ )
93
117
# This prevents logs generated when a log fails to be written to generate another log which fails to be written etc. etc.
94
118
logger .addFilter (DuplicateFilter ())
95
- SDKDataT = TypeVar ("SDKDataT" )
119
+ SDKDataT = TypeVar (
120
+ "SDKDataT" ,
121
+ TypingSequence [LogData ],
122
+ MetricsData ,
123
+ TypingSequence [ReadableSpan ],
124
+ )
96
125
ResourceDataT = TypeVar ("ResourceDataT" )
97
126
TypingResourceT = TypeVar ("TypingResourceT" )
98
- ExportServiceRequestT = TypeVar ("ExportServiceRequestT" )
99
- ExportResultT = TypeVar ("ExportResultT" )
127
+ ExportServiceRequestT = TypeVar (
128
+ "ExportServiceRequestT" ,
129
+ ExportTraceServiceRequest ,
130
+ ExportMetricsServiceRequest ,
131
+ ExportLogsServiceRequest ,
132
+ )
133
+ ExportResultT = TypeVar (
134
+ "ExportResultT" ,
135
+ LogExportResult ,
136
+ MetricExportResult ,
137
+ SpanExportResult ,
138
+ )
139
+ ExportStubT = TypeVar (
140
+ "ExportStubT" , TraceServiceStub , MetricsServiceStub , LogsServiceStub
141
+ )
100
142
101
143
_ENVIRON_TO_COMPRESSION = {
102
144
None : None ,
@@ -119,7 +161,10 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]:
119
161
if environ_key in environ
120
162
else None
121
163
)
122
- if environ_value not in _ENVIRON_TO_COMPRESSION :
164
+ if (
165
+ environ_value not in _ENVIRON_TO_COMPRESSION
166
+ and environ_value is not None
167
+ ):
123
168
raise InvalidCompressionValueException (environ_key , environ_value )
124
169
return _ENVIRON_TO_COMPRESSION [environ_value ]
125
170
@@ -151,7 +196,7 @@ def _load_credentials(
151
196
certificate_file : Optional [str ],
152
197
client_key_file : Optional [str ],
153
198
client_certificate_file : Optional [str ],
154
- ) -> Optional [ ChannelCredentials ] :
199
+ ) -> ChannelCredentials :
155
200
root_certificates = (
156
201
_read_file (certificate_file ) if certificate_file else None
157
202
)
@@ -214,7 +259,7 @@ def _get_credentials(
214
259
215
260
# pylint: disable=no-member
216
261
class OTLPExporterMixin (
217
- ABC , Generic [SDKDataT , ExportServiceRequestT , ExportResultT ]
262
+ ABC , Generic [SDKDataT , ExportServiceRequestT , ExportResultT , ExportStubT ]
218
263
):
219
264
"""OTLP span exporter
220
265
@@ -230,6 +275,8 @@ class OTLPExporterMixin(
230
275
231
276
def __init__ (
232
277
self ,
278
+ stub : ExportStubT ,
279
+ result : ExportResultT ,
233
280
endpoint : Optional [str ] = None ,
234
281
insecure : Optional [bool ] = None ,
235
282
credentials : Optional [ChannelCredentials ] = None ,
@@ -238,10 +285,11 @@ def __init__(
238
285
] = None ,
239
286
timeout : Optional [float ] = None ,
240
287
compression : Optional [Compression ] = None ,
241
- channel_options : Optional [TypingSequence [Tuple [str , str ]]] = None ,
288
+ channel_options : Optional [Tuple [Tuple [str , str ]]] = None ,
242
289
):
243
290
super ().__init__ ()
244
-
291
+ self ._result = result
292
+ self ._stub = stub
245
293
self ._endpoint = endpoint or environ .get (
246
294
OTEL_EXPORTER_OTLP_ENDPOINT , "http://localhost:4317"
247
295
)
@@ -250,15 +298,12 @@ def __init__(
250
298
251
299
if parsed_url .scheme == "https" :
252
300
insecure = False
301
+ insecure_exporter = environ .get (OTEL_EXPORTER_OTLP_INSECURE )
253
302
if insecure is None :
254
- insecure = environ .get (OTEL_EXPORTER_OTLP_INSECURE )
255
- if insecure is not None :
256
- insecure = insecure .lower () == "true"
303
+ if insecure_exporter is not None :
304
+ insecure = insecure_exporter .lower () == "true"
257
305
else :
258
- if parsed_url .scheme == "http" :
259
- insecure = True
260
- else :
261
- insecure = False
306
+ insecure = parsed_url .scheme == "http"
262
307
263
308
if parsed_url .netloc :
264
309
self ._endpoint = parsed_url .netloc
@@ -277,12 +322,12 @@ def __init__(
277
322
overridden_options = {
278
323
opt_name for (opt_name , _ ) in channel_options
279
324
}
280
- default_options = [
325
+ default_options = tuple (
281
326
(opt_name , opt_value )
282
327
for opt_name , opt_value in _OTLP_GRPC_CHANNEL_OPTIONS
283
328
if opt_name not in overridden_options
284
- ]
285
- self ._channel_options = tuple ( default_options ) + channel_options
329
+ )
330
+ self ._channel_options = default_options + channel_options
286
331
else :
287
332
self ._channel_options = tuple (_OTLP_GRPC_CHANNEL_OPTIONS )
288
333
@@ -317,24 +362,25 @@ def __init__(
317
362
compression = compression ,
318
363
options = self ._channel_options ,
319
364
)
320
- self ._client = self ._stub (self ._channel )
365
+ self ._client = self ._stub (self ._channel ) # type: ignore [reportCallIssue]
321
366
322
367
self ._shutdown_in_progress = threading .Event ()
323
368
self ._shutdown = False
324
369
325
370
@abstractmethod
326
371
def _translate_data (
327
- self , data : TypingSequence [SDKDataT ]
372
+ self ,
373
+ data : SDKDataT ,
328
374
) -> ExportServiceRequestT :
329
375
pass
330
376
331
377
def _export (
332
378
self ,
333
- data : Union [ TypingSequence [ ReadableSpan ], MetricsData ] ,
379
+ data : SDKDataT ,
334
380
) -> ExportResultT :
335
381
if self ._shutdown :
336
382
logger .warning ("Exporter already shutdown, ignoring batch" )
337
- return self ._result .FAILURE
383
+ return self ._result .FAILURE # type: ignore [reportReturnType]
338
384
339
385
# FIXME remove this check if the export type for traces
340
386
# gets updated to a class that represents the proto
@@ -347,10 +393,10 @@ def _export(
347
393
metadata = self ._headers ,
348
394
timeout = deadline_sec - time (),
349
395
)
350
- return self ._result .SUCCESS
396
+ return self ._result .SUCCESS # type: ignore [reportReturnType]
351
397
except RpcError as error :
352
- retry_info_bin = dict (error .trailing_metadata ()).get (
353
- "google.rpc.retryinfo-bin"
398
+ retry_info_bin = dict (error .trailing_metadata ()).get ( # type: ignore [reportAttributeAccessIssue]
399
+ "google.rpc.retryinfo-bin" # type: ignore [reportArgumentType]
354
400
)
355
401
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
356
402
backoff_seconds = 2 ** retry_num * random .uniform (0.8 , 1.2 )
@@ -362,7 +408,7 @@ def _export(
362
408
+ retry_info .retry_delay .nanos / 1.0e9
363
409
)
364
410
if (
365
- error .code () not in _RETRYABLE_ERROR_CODES
411
+ error .code () not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
366
412
or retry_num + 1 == _MAX_RETRYS
367
413
or backoff_seconds > (deadline_sec - time ())
368
414
or self ._shutdown
@@ -371,13 +417,13 @@ def _export(
371
417
"Failed to export %s to %s, error code: %s" ,
372
418
self ._exporting ,
373
419
self ._endpoint ,
374
- error .code (),
375
- exc_info = error .code () == StatusCode .UNKNOWN ,
420
+ error .code (), # type: ignore [reportAttributeAccessIssue]
421
+ exc_info = error .code () == StatusCode .UNKNOWN , # type: ignore [reportAttributeAccessIssue]
376
422
)
377
- return self ._result .FAILURE
423
+ return self ._result .FAILURE # type: ignore [reportReturnType]
378
424
logger .warning (
379
425
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs." ,
380
- error .code (),
426
+ error .code (), # type: ignore [reportAttributeAccessIssue]
381
427
self ._exporting ,
382
428
self ._endpoint ,
383
429
backoff_seconds ,
@@ -387,7 +433,7 @@ def _export(
387
433
logger .warning ("Shutdown in progress, aborting retry." )
388
434
break
389
435
# Not possible to reach here but the linter is complaining.
390
- return self ._result .FAILURE
436
+ return self ._result .FAILURE # type: ignore [reportReturnType]
391
437
392
438
def shutdown (self , timeout_millis : float = 30_000 , ** kwargs ) -> None :
393
439
if self ._shutdown :
0 commit comments