Fix: Reinitialize gRPC channel on UNAVAILABLE error (Fixes #4517) #4825
base: main
Conversation
I understand this issue is related to the upstream gRPC bug (grpc/grpc#38290). I've analyzed that issue in depth, and the root cause appears to be a regression in the gRPC 'backup poller' (introduced in grpcio>=1.68.0) which fails to recover connections when the primary EventEngine is disabled (common in Python for fork safety). While upstream fixes are being explored (e.g., grpc/grpc#38480), the issue has persisted for months, leaving exporters stuck in an UNAVAILABLE state indefinitely after collector restarts. This PR implements a robust mitigation: detecting the persistent UNAVAILABLE state and forcing a channel re-initialization. This effectively resets the underlying poller state, allowing the exporter to recover immediately without requiring a full application restart. This approach provides stability for users while the complex upstream fix is finalized.
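In rough terms, the mitigation tears down the stuck channel and builds a fresh one. A minimal sketch of the idea follows; the helper and attribute names (_reinitialize_channel, _endpoint, _stub_class, _channel_options) are illustrative and not necessarily the PR's exact code, and the real exporter would use secure_channel with credentials where configured:

```python
import logging

import grpc

logger = logging.getLogger(__name__)


def _reinitialize_channel(exporter):
    """Sketch: replace the exporter's gRPC channel so the poller state is rebuilt."""
    old_channel = exporter._channel
    # Recreate the channel with the same target and options the exporter already holds.
    exporter._channel = grpc.insecure_channel(
        exporter._endpoint, options=exporter._channel_options
    )
    exporter._client = exporter._stub_class(exporter._channel)
    if old_channel is not None:
        old_channel.close()
    logger.warning(
        "Persistent UNAVAILABLE detected; reinitialized gRPC channel to %s",
        exporter._endpoint,
    )
```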
| """OTLP Exporter | ||
| This module provides a mixin class for OTLP exporters that send telemetry data | ||
| to an OpenTelemetry Collector via gRPC. It includes a configurable reconnection |
The collector is only one of many places the OTLP exporters can send data to; any server that implements the OTLP RPC interface can also receive it.
Okay, I've generalized the docstring to say 'OTLP-compatible receiver' instead of 'OpenTelemetry Collector'.
OTEL_EXPORTER_OTLP_RETRY_INTERVAL: Base retry interval in seconds (default: 2.0).
OTEL_EXPORTER_OTLP_MAX_RETRIES: Maximum number of retry attempts (default: 20).
OTEL_EXPORTER_OTLP_RETRY_TIMEOUT: Total retry timeout in seconds (default: 300).
OTEL_EXPORTER_OTLP_RETRY_MAX_DELAY: Maximum delay between retries in seconds (default: 60.0).
Are these collector env vars? I don't see OTEL_EXPORTER_OTLP_RETRY_INTERVAL defined in this repo.
Thanks for catching that. I think I pulled those from the general spec or another SDK by mistake. I've removed the section entirely since they aren't used here.
)
# For UNAVAILABLE errors, reinitialize the channel to force reconnection
if error.code() == StatusCode.UNAVAILABLE:  # type: ignore
maybe add "and retry_num == 0"? I assume we don't need to do this again if we get another UNAVAILABLE error.
That makes sense. We really only need to force the re-init once to unstick the channel. If it fails again, standard backoff should handle it. Added the retry_num == 0 check.
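For clarity, a minimal sketch of the guarded call inside the export retry loop; retry_num, _export, _reinitialize_channel, and _backoff are illustrative names rather than the PR's exact code:

```python
from grpc import RpcError, StatusCode


def _export_with_retry(self, data, max_retries=20):
    for retry_num in range(max_retries):
        try:
            return self._export(data)
        except RpcError as error:
            if error.code() == StatusCode.UNAVAILABLE and retry_num == 0:
                # Only force the re-initialization on the first failure;
                # later UNAVAILABLE errors fall back to normal backoff.
                self._reinitialize_channel()
            self._backoff(retry_num)
    return None
```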
# Add channel options for better reconnection behavior
# Only add these if we're dealing with reconnection scenarios
channel_options = []
if hasattr(self, "_channel_reconnection_enabled"):
why not default this to False, then just access it directly via self._channel_reconnection_enabled?
Yes, that's much cleaner. I've moved the initialization to __init__ so we can drop the hasattr check.
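A tiny sketch of that change, with the constructor argument name assumed for illustration:

```python
class OTLPExporterMixin:  # simplified stand-in for the real mixin
    def __init__(self, channel_reconnection_enabled: bool = False):
        # Initialize the flag up front so later code can read
        # self._channel_reconnection_enabled directly, with no hasattr() guard.
        self._channel_reconnection_enabled = channel_reconnection_enabled
```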
]
# Merge reconnection options with existing channel options
current_options = list(self._channel_options)
why do we need to cast this to a list?
I added the list casting to inject aggressive keepalive settings (30s ping) for the new channel. My reasoning was to prevent silent connection drops after a recovery.
However, this is a policy decision. If you prefer we stick to the standard gRPC defaults (2 hours) to minimize traffic/complexity, I am happy to remove this entire block (and the list casting). The core fix (re-initialization) works without it.
What is your preference? Should we keep the aggressive settings or revert to defaults?
I really don't know, I haven't come across this issue and I don't know too much about how these params work..
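For reference, this is the kind of channel tuning being debated. The 30s keepalive ping and the 30s max reconnect backoff come from the earlier revision of this PR; the other two entries and their values are assumptions shown only to illustrate the standard gRPC channel-arg names involved:

```python
# Channel arguments for more aggressive keepalive/reconnection behavior.
channel_options = [
    ("grpc.keepalive_time_ms", 30000),           # send a keepalive ping every 30s
    ("grpc.keepalive_timeout_ms", 10000),        # assumed: wait 10s for the ping ack
    ("grpc.keepalive_permit_without_calls", 1),  # assumed: ping even with no active RPC
    ("grpc.max_reconnect_backoff_ms", 30000),    # cap reconnect backoff at 30s
]
```

By contrast, stock gRPC only pings when keepalive is explicitly configured, which is why the discussion frames this as a policy choice rather than part of the core fix.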
# Merge reconnection options with existing channel options
current_options = list(self._channel_options)
# Filter out options that we are about to override
reconnection_keys = {opt[0] for opt in channel_options}
maybe use tuple unpacking to make it clearer, e.g. "for opt, value in channel_options" (here and below).
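A minimal sketch of the merge using the suggested tuple unpacking; the helper name _merge_channel_options is illustrative, and the precedence shown (reconnection options overriding existing ones) is inferred from the filtering in the quoted diff:

```python
def _merge_channel_options(self, channel_options):
    # Existing options whose key is about to be overridden are dropped,
    # so the reconnection options win.
    current_options = list(self._channel_options)
    reconnection_keys = {key for key, _value in channel_options}
    merged = [
        (key, value)
        for key, value in current_options
        if key not in reconnection_keys
    ]
    return merged + list(channel_options)
```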
| """OTLP gRPC exporter mixin. | ||
| This class provides the base functionality for OTLP exporters that send | ||
| telemetry data (spans or metrics) to an OpenTelemetry Collector via gRPC. |
nit: can we say the same thing as above here (OTLP-compatible receiver)
| ("grpc.max_reconnect_backoff_ms", 30000), | ||
| ] | ||
|
|
||
| # Merge reconnection options with existing channel options |
can you clarify the merge precedence here, i.e. which one wins?
# Add channel options for better reconnection behavior
# Only add these if we're dealing with reconnection scenarios
channel_options = []
if self._channel_reconnection_enabled:
on second thought I think it makes most sense to pass this in as a param to this function
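A sketch of what that suggestion could look like; the function name and the single option shown are illustrative, not the PR's final code:

```python
def _create_channel_options(self, reconnection_enabled: bool = False):
    # The caller decides whether reconnection tuning applies, instead of
    # this function reading hidden state off the instance.
    channel_options = []
    if reconnection_enabled:
        channel_options.append(("grpc.max_reconnect_backoff_ms", 30000))
    return channel_options
```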
self._shutdown = False
if not hasattr(self, "_shutdown_in_progress"):
    self._shutdown_in_progress = threading.Event()
self._shutdown = False
why is this here? just leave it in the initializer?
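For illustration, keeping that state in the initializer only would look roughly like this (simplified class stand-in, not the PR's exact code):

```python
import threading


class OTLPExporterMixin:  # simplified stand-in for the real mixin
    def __init__(self):
        # Shutdown state is created exactly once here, so the channel
        # re-initialization path never needs to re-create or reset it.
        self._shutdown = False
        self._shutdown_in_progress = threading.Event()
```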
self._credentials = _get_credentials(
    credentials,
    self._credentials,
why is this here? just leave it in the initializer?
I’ve significantly simplified the approach based on your feedback.
I removed the custom keepalive and retry settings entirely, so we're just relying on gRPC defaults now. This should resolve the concerns about those specific values and merge precedence. I also refactored the channel initialization to be stateless and moved the shutdown and credentials logic back to __init__ as you suggested.
Also updated the docs to use "OTLP-compatible receiver".
…mments
- Remove aggressive gRPC keepalive and retry settings to rely on defaults.
- Fix compression precedence logic to correctly handle NoCompression (0).
- Refactor channel initialization to be stateless (remove _channel_reconnection_enabled).
- Update documentation to refer to 'OTLP-compatible receiver'.
Description
This PR fixes issue #4517, where the OTLP gRPC exporter fails to reconnect to the collector after a restart (returning UNAVAILABLE).
Changes:
- Detect StatusCode.UNAVAILABLE in the export loop and reinitialize the gRPC channel to force a reconnection.
Fixes #4517
Type of change
How Has This Been Tested?
I added a new regression test case test_unavailable_reconnects in exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py. The test simulates the server responding with StatusCode.UNAVAILABLE and verifies that the exporter reinitializes the channel and recovers.
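For reference, a self-contained sketch of the regression-test idea; the real test in test_otlp_exporter_mixin.py may be structured differently, and the helper _export_with_reinit plus the mocked _reinitialize_channel attribute are illustrative:

```python
from unittest import mock

import grpc


def _export_with_reinit(exporter, data, max_retries=3):
    # Minimal stand-in for the exporter's retry loop.
    for retry_num in range(max_retries):
        try:
            return exporter._export(data)
        except grpc.RpcError as error:
            if error.code() == grpc.StatusCode.UNAVAILABLE and retry_num == 0:
                exporter._reinitialize_channel()
    return None


def test_unavailable_reconnects_sketch():
    rpc_error = grpc.RpcError()
    rpc_error.code = mock.Mock(return_value=grpc.StatusCode.UNAVAILABLE)

    exporter = mock.Mock()
    exporter._export.side_effect = [rpc_error, "ok"]  # fail once, then succeed

    assert _export_with_reinit(exporter, data=object()) == "ok"
    exporter._reinitialize_channel.assert_called_once()
```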
Does This PR Require a Contrib Repo Change?
Checklist: