@@ -12,7 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""OTLP Exporter"""
"""OTLP Exporter

This module provides a mixin class for OTLP exporters that send telemetry data
to an OpenTelemetry Collector via gRPC. It includes a configurable reconnection

Contributor:
The collector is only one of many places the OTLP exporters can send data to; any server that implements the OTLP RPC interface can also be sent data.

Author:
Okay. I've generalized the doc string to say 'OTLP-compatible receiver' instead of 'OpenTelemetry Collector'.

logic to handle transient collector outages.

Environment Variables:
OTEL_EXPORTER_OTLP_RETRY_INTERVAL: Base retry interval in seconds (default: 2.0).
OTEL_EXPORTER_OTLP_MAX_RETRIES: Maximum number of retry attempts (default: 20).
OTEL_EXPORTER_OTLP_RETRY_TIMEOUT: Total retry timeout in seconds (default: 300).
OTEL_EXPORTER_OTLP_RETRY_MAX_DELAY: Maximum delay between retries in seconds (default: 60.0).

Contributor:
Are these collector env vars? I don't see OTEL_EXPORTER_OTLP_RETRY_INTERVAL defined in this repo.

Author:
Thanks for catching that. I think I pulled those from the general spec or another SDK by mistake. I've removed the section entirely since they aren't used here.

OTEL_EXPORTER_OTLP_RETRY_FACTOR: Exponential backoff factor (default: 1.5).
OTEL_EXPORTER_OTLP_RETRY_JITTER: Jitter factor for retry delay (default: 0.2).
"""

import random
import threading
@@ -251,17 +264,24 @@ def _get_credentials(
if certificate_file:
client_key_file = environ.get(client_key_file_env_key)
client_certificate_file = environ.get(client_certificate_file_env_key)
return _load_credentials(
credentials = _load_credentials(
certificate_file, client_key_file, client_certificate_file
)
if credentials is not None:
return credentials
return ssl_channel_credentials()


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT]
):
"""OTLP span exporter
"""OTLP gRPC exporter mixin.

This class provides the base functionality for OTLP exporters that send
telemetry data (spans or metrics) to an OpenTelemetry Collector via gRPC.

Contributor:
Nit: can we say the same thing as above here (OTLP-compatible receiver)?

It includes a configurable reconnection mechanism to handle transient
collector outages.

Args:
endpoint: OpenTelemetry Collector receiver endpoint
@@ -308,6 +328,8 @@ def __init__(
if parsed_url.netloc:
self._endpoint = parsed_url.netloc

self._insecure = insecure
self._credentials = credentials
self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS)
if isinstance(self._headers, str):
temp_headers = parse_env_headers(self._headers, liberal=True)
@@ -341,16 +363,49 @@ def __init__(
if compression is None
else compression
) or Compression.NoCompression
self._compression = compression

# Initialize the channel and stub using the proper method
self._initialize_channel_and_stub()

if insecure:
def _initialize_channel_and_stub(self):
"""
Create a new gRPC channel and stub.

This method is used during initialization and by the reconnection
mechanism to reinitialize the channel on transient errors.
"""
# Add channel options for better reconnection behavior
# Only add these if we're dealing with reconnection scenarios
channel_options = []
if hasattr(self, "_channel_reconnection_enabled"):

Contributor:
Why not default this to False, then just access it directly via self._channel_reconnection_enabled?

Author:
Yes, that's much cleaner. I've moved the initialization to __init__ so we can drop the hasattr check.
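
A minimal sketch of the suggested pattern (hypothetical class, not the real mixin): initialize the flag in __init__ and check it directly, with no hasattr.

class _ReconnectFlagSketch:
    """Illustration of the reviewer's suggestion only."""

    def __init__(self):
        # Default the flag up front instead of relying on hasattr() later.
        self._channel_reconnection_enabled = False

    def _initialize_channel_and_stub(self):
        channel_options = []
        if self._channel_reconnection_enabled:  # direct check
            channel_options = [("grpc.keepalive_time_ms", 30000)]
        return channel_options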

channel_options = [
("grpc.keepalive_time_ms", 30000),
("grpc.keepalive_timeout_ms", 15000),
("grpc.keepalive_permit_without_calls", 1),
("grpc.initial_reconnect_backoff_ms", 5000),
("grpc.min_reconnect_backoff_ms", 5000),
("grpc.max_reconnect_backoff_ms", 30000),
]

# Merge reconnection options with existing channel options

Contributor:
Can you clarify the merge precedence here, i.e. which one will win?

current_options = list(self._channel_options)

Contributor:
Why do we need to cast this to a list?

Author:
I added the list casting to inject aggressive keepalive settings (30s ping) for the new channel. My reasoning was to prevent silent connection drops after a recovery.

However, this is a policy decision. If you prefer we stick to the standard gRPC defaults (2 hours) to minimize traffic/complexity, I am happy to remove this entire block (and the list casting). The core fix (re-initialization) works without it.

What is your preference? Should we keep the aggressive settings or revert to defaults?

Contributor:
I really don't know; I haven't come across this issue and I don't know too much about how these params work.

# Filter out options that we are about to override
reconnection_keys = {opt[0] for opt in channel_options}

Contributor:
Maybe use tuple unpacking to make it clearer, e.g. for opt, value in channel_options (here and below).

current_options = [
opt for opt in current_options if opt[0] not in reconnection_keys
]
final_options = tuple(current_options + channel_options)
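
To make the precedence question above concrete: the reconnection options win, because any colliding keys are first filtered out of the pre-existing options. A small standalone example with hypothetical values, written with the tuple unpacking suggested above:

existing_options = [
    ("grpc.max_reconnect_backoff_ms", 10000),  # hypothetical user-supplied option
    ("grpc.enable_http_proxy", 0),
]
reconnection_options = [("grpc.max_reconnect_backoff_ms", 30000)]

reconnection_keys = {key for key, _ in reconnection_options}
kept = [(key, value) for key, value in existing_options if key not in reconnection_keys]
final_options = tuple(kept + reconnection_options)

print(final_options)
# (('grpc.enable_http_proxy', 0), ('grpc.max_reconnect_backoff_ms', 30000))
# The user-supplied 10000 is dropped in favor of the reconnection value.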

if self._insecure:
self._channel = insecure_channel(
self._endpoint,
compression=compression,
options=self._channel_options,
compression=self._compression,
options=final_options,
)
else:
self._credentials = _get_credentials(
credentials,
self._credentials,

Contributor:
Why is this here? Just leave it in the initializer?

Author:
I’ve significantly simplified the approach based on your feedback.

I removed the custom keepalive and retry settings entirely, so we’re just relying on gRPC defaults now. This should resolve the concerns about those specific values and merge precedence. I also refactored the channel initialization to be stateless and moved the shutdown and credentials logic back to __init__ as you suggested.

Also updated the docs to use "OTLP-compatible receiver".

_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_KEY,
@@ -359,13 +414,14 @@ def __init__(
self._channel = secure_channel(
self._endpoint,
self._credentials,
compression=compression,
options=self._channel_options,
compression=self._compression,
options=final_options,
)
self._client = self._stub(self._channel) # type: ignore [reportCallIssue]

self._shutdown_in_progress = threading.Event()
self._shutdown = False
if not hasattr(self, "_shutdown_in_progress"):
self._shutdown_in_progress = threading.Event()
self._shutdown = False

Contributor:
Why is this here? Just leave it in the initializer?
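
A rough sketch of the simplified shape the author describes in the reply further up (stateless channel creation, with credentials and shutdown state handled once in __init__). This is illustrative, not the merged code, and assumes the same module-level names used in the diff.

from grpc import insecure_channel, secure_channel

# In __init__ (sketch), set up once and unconditionally:
#     self._shutdown_in_progress = threading.Event()
#     self._shutdown = False
#     if not self._insecure:
#         self._credentials = _get_credentials(credentials, ...)
#     self._initialize_channel_and_stub()

def _initialize_channel_and_stub(self):
    # Stateless: no keepalive overrides, no hasattr() checks, just (re)build
    # the channel and stub from attributes already set in __init__.
    if self._insecure:
        self._channel = insecure_channel(
            self._endpoint,
            compression=self._compression,
            options=self._channel_options,
        )
    else:
        self._channel = secure_channel(
            self._endpoint,
            self._credentials,
            compression=self._compression,
            options=self._channel_options,
        )
    self._client = self._stub(self._channel)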


@abstractmethod
def _translate_data(
@@ -407,6 +463,26 @@ def _export(
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

# For UNAVAILABLE errors, reinitialize the channel to force reconnection
if error.code() == StatusCode.UNAVAILABLE: # type: ignore

Contributor:
Maybe add "and retry_num == 0"; I assume we don't need to do this again if we get another UNAVAILABLE error?

Author:
That makes sense. We really only need to force the re-init once to unstick the channel. If it fails again, standard backoff should handle it. Added the retry_num == 0 check.
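
For reference, the revised guard would presumably read roughly as follows (sketch; only the condition changes, and the reinitialization body stays as in the diff below):

# Force a channel re-init only on the first UNAVAILABLE response; subsequent
# failures fall through to the normal backoff handling.
if error.code() == StatusCode.UNAVAILABLE and retry_num == 0:
    ...  # close and reinitialize the channel as shown below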

logger.debug(
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
self._exporting,
)
try:
self._channel.close()
except Exception as e:
logger.debug(
"Error closing channel for %s exporter to %s: %s",
self._exporting,
self._endpoint,
str(e),
)
# Enable channel reconnection for subsequent calls
self._channel_reconnection_enabled = True
self._initialize_channel_and_stub()

if (
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
or retry_num + 1 == _MAX_RETRYS
@@ -436,6 +512,12 @@ def _export(
return self._result.FAILURE # type: ignore [reportReturnType]

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""
Shut down the exporter.

Args:
timeout_millis: Timeout in milliseconds for shutting down the exporter.
"""
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
@@ -22,6 +22,7 @@
from unittest import TestCase
from unittest.mock import Mock, patch

import grpc
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
Duration,
)
@@ -89,8 +90,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
def _exporting(self):
return "traces"

def shutdown(self, timeout_millis=30_000):
return OTLPExporterMixin.shutdown(self, timeout_millis)
def shutdown(self, timeout_millis: float = 30_000, **kwargs):
return OTLPExporterMixin.shutdown(self, timeout_millis, **kwargs)


class TraceServiceServicerWithExportParams(TraceServiceServicer):
@@ -511,6 +512,16 @@ def test_timeout_set_correctly(self):
self.assertEqual(mock_trace_service.num_requests, 2)
self.assertAlmostEqual(after - before, 1.4, 1)

def test_channel_options_set_correctly(self):
"""Test that gRPC channel options are set correctly for keepalive and reconnection"""
# This test verifies that the channel is created with the right options
# We patch grpc.insecure_channel to ensure it is called without errors
with patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel"
) as mock_channel:
OTLPSpanExporterForTesting(insecure=True)
self.assertTrue(mock_channel.called)
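
As written, the test above only verifies that insecure_channel was called. If the intent is to also inspect the options (as its docstring suggests), a possible tightening is sketched below; it is a hypothetical test body (self is the TestCase) and assumes the mocked call still receives an options keyword, as in the exporter code.

with patch(
    "opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel"
) as mock_channel:
    OTLPSpanExporterForTesting(insecure=True)
    _, kwargs = mock_channel.call_args
    option_keys = {key for key, _ in kwargs.get("options", ())}
    # A freshly constructed exporter should not carry the reconnection
    # keepalive overrides yet.
    self.assertNotIn("grpc.keepalive_time_ms", option_keys)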

def test_otlp_headers_from_env(self):
# pylint: disable=protected-access
# This ensures that there is no other header than standard user-agent.
@@ -534,3 +545,30 @@ def test_permanent_failure(self):
warning.records[-1].message,
"Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS",
)

def test_unavailable_reconnects(self):
"""Test that the exporter reconnects on UNAVAILABLE error"""
add_TraceServiceServicer_to_server(
TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE),
self.server,
)

# Spy on grpc.insecure_channel to verify it's called for reconnection
with patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel",
side_effect=grpc.insecure_channel,
) as mock_insecure_channel:
# Mock sleep to avoid waiting
with patch("time.sleep"):
# We expect FAILURE because the server keeps returning UNAVAILABLE
# but we want to verify reconnection attempts happened
self.exporter.export([self.span])

# Verify that we attempted to reinitialize the channel (called insecure_channel)
# Since the initial channel was created in setUp (unpatched), this call
# must be from the reconnection logic.
self.assertTrue(mock_insecure_channel.called)
# Verify that reconnection enabled flag is set
self.assertTrue(
getattr(self.exporter, "_channel_reconnection_enabled", False)
)