Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""OTLP Exporter"""
"""OTLP Exporter

This module provides a mixin class for OTLP exporters that send telemetry data
to an OTLP-compatible receiver via gRPC. It includes a configurable reconnection
logic to handle transient collector outages.


"""

import random
import threading
Expand Down Expand Up @@ -251,17 +258,24 @@ def _get_credentials(
if certificate_file:
client_key_file = environ.get(client_key_file_env_key)
client_certificate_file = environ.get(client_certificate_file_env_key)
return _load_credentials(
credentials = _load_credentials(
certificate_file, client_key_file, client_certificate_file
)
if credentials is not None:
return credentials
return ssl_channel_credentials()


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT]
):
"""OTLP span exporter
"""OTLP gRPC exporter mixin.

This class provides the base functionality for OTLP exporters that send
telemetry data (spans or metrics) to an OpenTelemetry Collector via gRPC.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we say the same thing as above here (OTLP-compatible receiver)

It includes a configurable reconnection mechanism to handle transient
collector outages.

Args:
endpoint: OpenTelemetry Collector receiver endpoint
Expand Down Expand Up @@ -308,6 +322,8 @@ def __init__(
if parsed_url.netloc:
self._endpoint = parsed_url.netloc

self._insecure = insecure
self._credentials = credentials
self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS)
if isinstance(self._headers, str):
temp_headers = parse_env_headers(self._headers, liberal=True)
Expand Down Expand Up @@ -336,21 +352,58 @@ def __init__(
)
self._collector_kwargs = None

compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
if compression is None
else compression
) or Compression.NoCompression
self._compression = (
compression
or environ_to_compression(
environ.get(OTEL_EXPORTER_OTLP_COMPRESSION)
)
or Compression.NoCompression
)
self._channel = None
self._client = None
self._channel_reconnection_enabled = False
self._initialize_channel_and_stub()

def _initialize_channel_and_stub(self):
"""
Create a new gRPC channel and stub.

if insecure:
This method is used during initialization and by the reconnection
mechanism to reinitialize the channel on transient errors.
"""
# Add channel options for better reconnection behavior
# Only add these if we're dealing with reconnection scenarios
channel_options = []
if self._channel_reconnection_enabled:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on second thought I think it makes most sense to pass this in as a param to this function

channel_options = [
("grpc.keepalive_time_ms", 30000),
("grpc.keepalive_timeout_ms", 15000),
("grpc.keepalive_permit_without_calls", 1),
("grpc.initial_reconnect_backoff_ms", 5000),
("grpc.min_reconnect_backoff_ms", 5000),
("grpc.max_reconnect_backoff_ms", 30000),
]

# Merge reconnection options with existing channel options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you clarify the merge precedence here, which one will win

current_options = list(self._channel_options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to cast this to a list ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the list casting to inject aggressive keepalive settings (30s ping) for the new channel. My reasoning was to prevent silent connection drops after a recovery.

However, this is a policy decision. If you prefer we stick to the standard gRPC defaults (2 hours) to minimize traffic/complexity, I am happy to remove this entire block (and the list casting). The core fix (re-initialization) works without it.

What is your preference? Should we keep the aggressive settings or revert to defaults?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't know, I haven't come across this issue and I don't know too much about how these params work..

# Filter out options that we are about to override
reconnection_keys = {key for key, _ in channel_options}
current_options = [
(key, value)
for key, value in current_options
if key not in reconnection_keys
]
final_options = tuple(current_options + channel_options)

if self._insecure:
self._channel = insecure_channel(
self._endpoint,
compression=compression,
options=self._channel_options,
compression=self._compression,
options=final_options,
)
else:
self._credentials = _get_credentials(
credentials,
self._credentials,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this here ? just leave it in the initializer ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve significantly simplified the approach based on your feedback.

I removed the custom keepalive and retry settings entirely, so we’re just relying on gRPC defaults now. This should resolve the concerns about those specific values and merge precedence. I also refactored the channel initialization to be stateless and moved the shutdown and credentials logic back to init as you suggested.

Also updated the docs to use "OTLP-compatible receiver".

_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_KEY,
Expand All @@ -359,13 +412,14 @@ def __init__(
self._channel = secure_channel(
self._endpoint,
self._credentials,
compression=compression,
options=self._channel_options,
compression=self._compression,
options=final_options,
)
self._client = self._stub(self._channel) # type: ignore [reportCallIssue]

self._shutdown_in_progress = threading.Event()
self._shutdown = False
if not hasattr(self, "_shutdown_in_progress"):
self._shutdown_in_progress = threading.Event()
self._shutdown = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this here ? just leave it in the initializer ?


@abstractmethod
def _translate_data(
Expand Down Expand Up @@ -407,6 +461,26 @@ def _export(
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

# For UNAVAILABLE errors, reinitialize the channel to force reconnection
if error.code() == StatusCode.UNAVAILABLE and retry_num == 0: # type: ignore
logger.debug(
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
self._exporting,
)
try:
self._channel.close()
except Exception as e:
logger.debug(
"Error closing channel for %s exporter to %s: %s",
self._exporting,
self._endpoint,
str(e),
)
# Enable channel reconnection for subsequent calls
self._channel_reconnection_enabled = True
self._initialize_channel_and_stub()

if (
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
or retry_num + 1 == _MAX_RETRYS
Expand Down Expand Up @@ -436,6 +510,12 @@ def _export(
return self._result.FAILURE # type: ignore [reportReturnType]

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""
Shut down the exporter.

Args:
timeout_millis: Timeout in milliseconds for shutting down the exporter.
"""
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from unittest import TestCase
from unittest.mock import Mock, patch

import grpc
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
Duration,
)
Expand Down Expand Up @@ -89,8 +90,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
def _exporting(self):
return "traces"

def shutdown(self, timeout_millis=30_000):
return OTLPExporterMixin.shutdown(self, timeout_millis)
def shutdown(self, timeout_millis: float = 30_000, **kwargs):
return OTLPExporterMixin.shutdown(self, timeout_millis, **kwargs)


class TraceServiceServicerWithExportParams(TraceServiceServicer):
Expand Down Expand Up @@ -511,6 +512,16 @@ def test_timeout_set_correctly(self):
self.assertEqual(mock_trace_service.num_requests, 2)
self.assertAlmostEqual(after - before, 1.4, 1)

def test_channel_options_set_correctly(self):
"""Test that gRPC channel options are set correctly for keepalive and reconnection"""
# This test verifies that the channel is created with the right options
# We patch grpc.insecure_channel to ensure it is called without errors
with patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel"
) as mock_channel:
OTLPSpanExporterForTesting(insecure=True)
self.assertTrue(mock_channel.called)

def test_otlp_headers_from_env(self):
# pylint: disable=protected-access
# This ensures that there is no other header than standard user-agent.
Expand All @@ -534,3 +545,30 @@ def test_permanent_failure(self):
warning.records[-1].message,
"Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS",
)

def test_unavailable_reconnects(self):
"""Test that the exporter reconnects on UNAVAILABLE error"""
add_TraceServiceServicer_to_server(
TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE),
self.server,
)

# Spy on grpc.insecure_channel to verify it's called for reconnection
with patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel",
side_effect=grpc.insecure_channel,
) as mock_insecure_channel:
# Mock sleep to avoid waiting
with patch("time.sleep"):
# We expect FAILURE because the server keeps returning UNAVAILABLE
# but we want to verify reconnection attempts happened
self.exporter.export([self.span])

# Verify that we attempted to reinitialize the channel (called insecure_channel)
# Since the initial channel was created in setUp (unpatched), this call
# must be from the reconnection logic.
self.assertTrue(mock_insecure_channel.called)
# Verify that reconnection enabled flag is set
self.assertTrue(
getattr(self.exporter, "_channel_reconnection_enabled", False)
)