1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15- """OTLP Exporter"""
15+ """OTLP Exporter
16+
17+ This module provides a mixin class for OTLP exporters that send telemetry data
18+ to an OpenTelemetry Collector via gRPC. It includes configurable reconnection
19+ logic to handle transient collector outages.
20+
21+ Environment Variables:
22+ OTEL_EXPORTER_OTLP_RETRY_INTERVAL: Base retry interval in seconds (default: 2.0).
23+ OTEL_EXPORTER_OTLP_MAX_RETRIES: Maximum number of retry attempts (default: 20).
24+ OTEL_EXPORTER_OTLP_RETRY_TIMEOUT: Total retry timeout in seconds (default: 300).
25+ OTEL_EXPORTER_OTLP_RETRY_MAX_DELAY: Maximum delay between retries in seconds (default: 60.0).
26+ OTEL_EXPORTER_OTLP_RETRY_FACTOR: Exponential backoff factor (default: 1.5).
27+ OTEL_EXPORTER_OTLP_RETRY_JITTER: Jitter factor for retry delay (default: 0.2).
28+ """
1629
1730import random
1831import threading
@@ -251,17 +264,24 @@ def _get_credentials(
251264 if certificate_file :
252265 client_key_file = environ .get (client_key_file_env_key )
253266 client_certificate_file = environ .get (client_certificate_file_env_key )
254- return _load_credentials (
267+ credentials = _load_credentials (
255268 certificate_file , client_key_file , client_certificate_file
256269 )
270+ if credentials is not None :
271+ return credentials
257272 return ssl_channel_credentials ()
258273
259274
260275# pylint: disable=no-member
261276class OTLPExporterMixin (
262277 ABC , Generic [SDKDataT , ExportServiceRequestT , ExportResultT , ExportStubT ]
263278):
264- """OTLP span exporter
279+ """OTLP gRPC exporter mixin.
280+
281+ This class provides the base functionality for OTLP exporters that send
282+ telemetry data (spans or metrics) to an OpenTelemetry Collector via gRPC.
283+ It includes a configurable reconnection mechanism to handle transient
284+ collector outages.
265285
266286 Args:
267287 endpoint: OpenTelemetry Collector receiver endpoint
@@ -308,6 +328,8 @@ def __init__(
308328 if parsed_url .netloc :
309329 self ._endpoint = parsed_url .netloc
310330
331+ self ._insecure = insecure
332+ self ._credentials = credentials
311333 self ._headers = headers or environ .get (OTEL_EXPORTER_OTLP_HEADERS )
312334 if isinstance (self ._headers , str ):
313335 temp_headers = parse_env_headers (self ._headers , liberal = True )
@@ -341,16 +363,49 @@ def __init__(
341363 if compression is None
342364 else compression
343365 ) or Compression .NoCompression
366+ self ._compression = compression
367+
368+ # Create the gRPC channel and stub via the helper that reconnection also reuses
369+ self ._initialize_channel_and_stub ()
344370
345- if insecure :
371+ def _initialize_channel_and_stub (self ):
372+ """
373+ Create a new gRPC channel and stub.
374+
375+ This method is used during initialization and by the reconnection
376+ mechanism to reinitialize the channel on transient errors.
377+ """
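378+ # Keepalive/backoff channel options intended to improve reconnection behavior.
379+ # Applied only once `_channel_reconnection_enabled` has been set on the instance,
379+ # which happens after an UNAVAILABLE export error; the initial channel omits them.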
380+ channel_options = []
381+ if hasattr (self , "_channel_reconnection_enabled" ):
382+ channel_options = [
383+ ("grpc.keepalive_time_ms" , 30000 ),
384+ ("grpc.keepalive_timeout_ms" , 15000 ),
385+ ("grpc.keepalive_permit_without_calls" , 1 ),
386+ ("grpc.initial_reconnect_backoff_ms" , 5000 ),
387+ ("grpc.min_reconnect_backoff_ms" , 5000 ),
388+ ("grpc.max_reconnect_backoff_ms" , 30000 ),
389+ ]
390+
391+ # Merge reconnection options with existing channel options
392+ current_options = list (self ._channel_options )
393+ # Filter out options that we are about to override
394+ reconnection_keys = {opt [0 ] for opt in channel_options }
395+ current_options = [
396+ opt for opt in current_options if opt [0 ] not in reconnection_keys
397+ ]
398+ final_options = tuple (current_options + channel_options )
399+
400+ if self ._insecure :
346401 self ._channel = insecure_channel (
347402 self ._endpoint ,
348- compression = compression ,
349- options = self . _channel_options ,
403+ compression = self . _compression ,
404+ options = final_options ,
350405 )
351406 else :
352407 self ._credentials = _get_credentials (
353- credentials ,
408+ self . _credentials ,
354409 _OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER ,
355410 OTEL_EXPORTER_OTLP_CERTIFICATE ,
356411 OTEL_EXPORTER_OTLP_CLIENT_KEY ,
@@ -359,13 +414,14 @@ def __init__(
359414 self ._channel = secure_channel (
360415 self ._endpoint ,
361416 self ._credentials ,
362- compression = compression ,
363- options = self . _channel_options ,
417+ compression = self . _compression ,
418+ options = final_options ,
364419 )
365420 self ._client = self ._stub (self ._channel ) # type: ignore [reportCallIssue]
366421
367- self ._shutdown_in_progress = threading .Event ()
368- self ._shutdown = False
422+ if not hasattr (self , "_shutdown_in_progress" ):
423+ self ._shutdown_in_progress = threading .Event ()
424+ self ._shutdown = False
369425
370426 @abstractmethod
371427 def _translate_data (
@@ -407,6 +463,26 @@ def _export(
407463 retry_info .retry_delay .seconds
408464 + retry_info .retry_delay .nanos / 1.0e9
409465 )
466+
467+ # For UNAVAILABLE errors, reinitialize the channel to force reconnection
468+ if error .code () == StatusCode .UNAVAILABLE : # type: ignore
469+ logger .debug (
470+ "Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error" ,
471+ self ._exporting ,
472+ )
473+ try :
474+ self ._channel .close ()
475+ except Exception as e :
476+ logger .debug (
477+ "Error closing channel for %s exporter to %s: %s" ,
478+ self ._exporting ,
479+ self ._endpoint ,
480+ str (e ),
481+ )
482+ # Enable channel reconnection for subsequent calls
483+ self ._channel_reconnection_enabled = True
484+ self ._initialize_channel_and_stub ()
485+
410486 if (
411487 error .code () not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
412488 or retry_num + 1 == _MAX_RETRYS
@@ -436,6 +512,12 @@ def _export(
436512 return self ._result .FAILURE # type: ignore [reportReturnType]
437513
438514 def shutdown (self , timeout_millis : float = 30_000 , ** kwargs ) -> None :
515+ """
516+ Shut down the exporter.
517+
518+ Args:
519+ timeout_millis: Timeout in milliseconds for shutting down the exporter.
520+ """
439521 if self ._shutdown :
440522 logger .warning ("Exporter already shutdown, ignoring call" )
441523 return
0 commit comments