Skip to content

Commit 8b397a7

Browse files
refactor(exporter): simplify reconnection logic and address review comments
- Remove aggressive gRPC keepalive and retry settings to rely on defaults. - Fix compression precedence logic to correctly handle NoCompression (0). - Refactor channel initialization to be stateless (remove _channel_reconnection_enabled).- Update documentation to refer to 'OTLP-compatible receiver'
1 parent 2c848f4 commit 8b397a7

File tree

2 files changed

+23
-51
lines changed

2 files changed

+23
-51
lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 23 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,12 @@ class OTLPExporterMixin(
273273
"""OTLP gRPC exporter mixin.
274274
275275
This class provides the base functionality for OTLP exporters that send
276-
telemetry data (spans or metrics) to an OpenTelemetry Collector via gRPC.
276+
telemetry data (spans or metrics) to an OTLP-compatible receiver via gRPC.
277277
It includes a configurable reconnection mechanism to handle transient
278-
collector outages.
278+
receiver outages.
279279
280280
Args:
281-
endpoint: OpenTelemetry Collector receiver endpoint
281+
endpoint: OTLP-compatible receiver endpoint
282282
insecure: Connection type
283283
credentials: ChannelCredentials object for server authentication
284284
headers: Headers to send when exporting
@@ -353,15 +353,26 @@ def __init__(
353353
self._collector_kwargs = None
354354

355355
self._compression = (
356-
compression
357-
or environ_to_compression(
358-
environ.get(OTEL_EXPORTER_OTLP_COMPRESSION)
359-
)
360-
or Compression.NoCompression
361-
)
356+
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
357+
if compression is None
358+
else compression
359+
) or Compression.NoCompression
360+
362361
self._channel = None
363362
self._client = None
364-
self._channel_reconnection_enabled = False
363+
364+
self._shutdown_in_progress = threading.Event()
365+
self._shutdown = False
366+
367+
if not self._insecure:
368+
self._credentials = _get_credentials(
369+
self._credentials,
370+
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
371+
OTEL_EXPORTER_OTLP_CERTIFICATE,
372+
OTEL_EXPORTER_OTLP_CLIENT_KEY,
373+
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
374+
)
375+
365376
self._initialize_channel_and_stub()
366377

367378
def _initialize_channel_and_stub(self):
@@ -371,56 +382,21 @@ def _initialize_channel_and_stub(self):
371382
This method is used during initialization and by the reconnection
372383
mechanism to reinitialize the channel on transient errors.
373384
"""
374-
# Add channel options for better reconnection behavior
375-
# Only add these if we're dealing with reconnection scenarios
376-
channel_options = []
377-
if self._channel_reconnection_enabled:
378-
channel_options = [
379-
("grpc.keepalive_time_ms", 30000),
380-
("grpc.keepalive_timeout_ms", 15000),
381-
("grpc.keepalive_permit_without_calls", 1),
382-
("grpc.initial_reconnect_backoff_ms", 5000),
383-
("grpc.min_reconnect_backoff_ms", 5000),
384-
("grpc.max_reconnect_backoff_ms", 30000),
385-
]
386-
387-
# Merge reconnection options with existing channel options
388-
current_options = list(self._channel_options)
389-
# Filter out options that we are about to override
390-
reconnection_keys = {key for key, _ in channel_options}
391-
current_options = [
392-
(key, value)
393-
for key, value in current_options
394-
if key not in reconnection_keys
395-
]
396-
final_options = tuple(current_options + channel_options)
397-
398385
if self._insecure:
399386
self._channel = insecure_channel(
400387
self._endpoint,
401388
compression=self._compression,
402-
options=final_options,
389+
options=self._channel_options,
403390
)
404391
else:
405-
self._credentials = _get_credentials(
406-
self._credentials,
407-
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
408-
OTEL_EXPORTER_OTLP_CERTIFICATE,
409-
OTEL_EXPORTER_OTLP_CLIENT_KEY,
410-
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
411-
)
412392
self._channel = secure_channel(
413393
self._endpoint,
414394
self._credentials,
415395
compression=self._compression,
416-
options=final_options,
396+
options=self._channel_options,
417397
)
418398
self._client = self._stub(self._channel) # type: ignore [reportCallIssue]
419399

420-
if not hasattr(self, "_shutdown_in_progress"):
421-
self._shutdown_in_progress = threading.Event()
422-
self._shutdown = False
423-
424400
@abstractmethod
425401
def _translate_data(
426402
self,
@@ -478,7 +454,6 @@ def _export(
478454
str(e),
479455
)
480456
# Enable channel reconnection for subsequent calls
481-
self._channel_reconnection_enabled = True
482457
self._initialize_channel_and_stub()
483458

484459
if (

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,3 @@ def test_unavailable_reconnects(self):
569569
# must be from the reconnection logic.
570570
self.assertTrue(mock_insecure_channel.called)
571571
# Verify that reconnection enabled flag is set
572-
self.assertTrue(
573-
getattr(self.exporter, "_channel_reconnection_enabled", False)
574-
)

0 commit comments

Comments
 (0)