Skip to content

Commit dcba91a

Browse files
committed
Remove gRPC retry config
1 parent 46e15f1 commit dcba91a

File tree

4 files changed

+92
-80
lines changed

4 files changed

+92
-80
lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 57 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@
1414

1515
"""OTLP Exporter"""
1616

17-
import json
17+
import random
1818
import threading
1919
from abc import ABC, abstractmethod
2020
from collections.abc import Sequence # noqa: F401
2121
from logging import getLogger
2222
from os import environ
23+
from time import sleep, time
2324
from typing import ( # noqa: F401
2425
Any,
2526
Callable,
@@ -34,6 +35,7 @@
3435
from typing import Sequence as TypingSequence
3536
from urllib.parse import urlparse
3637

38+
from google.rpc.error_details_pb2 import RetryInfo
3739
from typing_extensions import deprecated
3840

3941
from grpc import (
@@ -72,35 +74,18 @@
7274
from opentelemetry.sdk.trace import ReadableSpan
7375
from opentelemetry.util.re import parse_env_headers
7476

75-
# 5 is the maximum allowable attempts allowed by grpc retry policy.
76-
# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt,
77-
# plus or minus a 20% jitter. Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting
78-
# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the
79-
# timeout, and all fail. A header `grpc-retry-pushback-ms` when set by the server will override
80-
# and take precedence over this backoff. See https://grpc.io/docs/guides/retry/ for more details.
81-
_GRPC_RETRY_POLICY = json.dumps(
82-
{
83-
"methodConfig": [
84-
{
85-
"name": [dict()],
86-
"retryPolicy": {
87-
"maxAttempts": 5,
88-
"initialBackoff": "1s",
89-
"maxBackoff": "9s",
90-
"backoffMultiplier": 2,
91-
"retryableStatusCodes": [
92-
"UNAVAILABLE",
93-
"CANCELLED",
94-
"RESOURCE_EXHAUSTED",
95-
"ABORTED",
96-
"OUT_OF_RANGE",
97-
"DATA_LOSS",
98-
],
99-
},
100-
}
101-
]
102-
}
77+
_RETRYABLE_ERROR_CODES = frozenset(
78+
[
79+
StatusCode.CANCELLED,
80+
StatusCode.DEADLINE_EXCEEDED,
81+
StatusCode.RESOURCE_EXHAUSTED,
82+
StatusCode.ABORTED,
83+
StatusCode.OUT_OF_RANGE,
84+
StatusCode.UNAVAILABLE,
85+
StatusCode.DATA_LOSS,
86+
]
10387
)
88+
_MAX_RETRYS = 6
10489
logger = getLogger(__name__)
10590
SDKDataT = TypeVar("SDKDataT")
10691
ResourceDataT = TypeVar("ResourceDataT")
@@ -273,9 +258,6 @@ def __init__(
273258
self._channel = insecure_channel(
274259
self._endpoint,
275260
compression=compression,
276-
options=[
277-
("grpc.service_config", _GRPC_RETRY_POLICY),
278-
],
279261
)
280262
else:
281263
credentials = _get_credentials(
@@ -288,9 +270,6 @@ def __init__(
288270
self._endpoint,
289271
credentials,
290272
compression=compression,
291-
options=[
292-
("grpc.service_config", _GRPC_RETRY_POLICY),
293-
],
294273
)
295274
self._client = self._stub(self._channel)
296275

@@ -314,23 +293,50 @@ def _export(
314293
# FIXME remove this check if the export type for traces
315294
# gets updated to a class that represents the proto
316295
# TracesData and use the code below instead.
296+
retry_info = RetryInfo()
317297
with self._export_lock:
318-
try:
319-
self._client.Export(
320-
request=self._translate_data(data),
321-
metadata=self._headers,
322-
timeout=self._timeout,
323-
)
324-
return self._result.SUCCESS
325-
except RpcError as error:
326-
logger.error(
327-
"Failed to export %s to %s, error code: %s",
328-
self._exporting,
329-
self._endpoint,
330-
error.code(),
331-
exc_info=error.code() == StatusCode.UNKNOWN,
332-
)
333-
return self._result.FAILURE
298+
deadline_sec = time() + self._timeout
299+
backoff_seconds = 1 * random.uniform(0.8, 1.2)
300+
for retry_num in range(1, _MAX_RETRYS + 1):
301+
try:
302+
self._client.Export(
303+
request=self._translate_data(data),
304+
metadata=self._headers,
305+
timeout=self._timeout,
306+
)
307+
return self._result.SUCCESS
308+
except RpcError as error:
309+
retry_info_bin = dict(error.trailing_metadata()).get(
310+
"google.rpc.retryinfo-bin"
311+
)
312+
if retry_info_bin is not None:
313+
retry_info.ParseFromString(retry_info_bin)
314+
backoff_seconds = (
315+
retry_info.retry_delay.seconds
316+
+ retry_info.retry_delay.nanos / 1.0e9
317+
)
318+
if (
319+
error.code() not in _RETRYABLE_ERROR_CODES
320+
or retry_num == _MAX_RETRYS
321+
or backoff_seconds > (deadline_sec - time())
322+
):
323+
logger.error(
324+
"Failed to export %s to %s, error code: %s",
325+
self._exporting,
326+
self._endpoint,
327+
error.code(),
328+
exc_info=error.code() == StatusCode.UNKNOWN,
329+
)
330+
return self._result.FAILURE
331+
logger.warning(
332+
"Transient error %s encountered while exporting logs batch, retrying in %.2fs.",
333+
error.code(),
334+
backoff_seconds,
335+
)
336+
sleep(backoff_seconds)
337+
backoff_seconds *= 2 * random.uniform(0.8, 1.2)
338+
# Not possible to reach here but the linter is complaining.
339+
return self._result.FAILURE
334340

335341
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
336342
if self._shutdown:

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@
1818
from logging import WARNING, getLogger
1919
from typing import Any, Optional, Sequence
2020
from unittest import TestCase
21-
from unittest.mock import ANY, Mock, patch
21+
from unittest.mock import Mock, patch
2222

23+
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
24+
Duration,
25+
)
26+
from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module
27+
RetryInfo,
28+
)
2329
from grpc import Compression, StatusCode, server
2430

2531
from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
@@ -84,35 +90,39 @@ class TraceServiceServicerWithExportParams(TraceServiceServicer):
8490
def __init__(
8591
self,
8692
export_result: StatusCode,
87-
optional_export_sleep: Optional[float] = None,
8893
optional_first_time_retry_millis: Optional[int] = None,
94+
optional_export_sleep: Optional[float] = None,
8995
):
9096
self.export_result = export_result
9197
self.optional_export_sleep = optional_export_sleep
9298
self.optional_first_time_retry_millis = (
9399
optional_first_time_retry_millis
94100
)
95-
self.first_attempt = True
96101
self.num_requests = 0
97-
self.now = time.time()
98102

99103
# pylint: disable=invalid-name,unused-argument
100104
def Export(self, request, context):
101105
self.num_requests += 1
102106
if self.optional_export_sleep:
103107
time.sleep(self.optional_export_sleep)
104108
if self.export_result != StatusCode.OK:
105-
if self.optional_first_time_retry_millis and self.first_attempt:
106-
self.first_attempt = False
109+
if (
110+
self.optional_first_time_retry_millis
111+
and self.num_requests == 1
112+
):
107113
context.set_trailing_metadata(
108114
(
109115
(
110-
"grpc-retry-pushback-ms",
111-
str(self.optional_first_time_retry_millis),
116+
"google.rpc.retryinfo-bin",
117+
RetryInfo(
118+
retry_delay=Duration(
119+
nanos=self.optional_first_time_retry_millis
120+
)
121+
).SerializeToString(),
112122
),
113123
)
114124
)
115-
context.abort(self.export_result, "")
125+
context.set_code(self.export_result)
116126

117127
return ExportTraceServiceResponse()
118128

@@ -262,7 +272,6 @@ def test_otlp_exporter_otlp_compression_unspecified(
262272
mock_insecure_channel.assert_called_once_with(
263273
"localhost:4317",
264274
compression=Compression.NoCompression,
265-
options=ANY,
266275
)
267276

268277
# pylint: disable=no-self-use, disable=unused-argument
@@ -286,7 +295,7 @@ def test_otlp_exporter_otlp_compression_envvar(
286295
"""Just OTEL_EXPORTER_OTLP_COMPRESSION should work"""
287296
OTLPSpanExporterForTesting(insecure=True)
288297
mock_insecure_channel.assert_called_once_with(
289-
"localhost:4317", compression=Compression.Gzip, options=ANY
298+
"localhost:4317", compression=Compression.Gzip
290299
)
291300

292301
def test_shutdown(self):
@@ -366,7 +375,7 @@ def test_export_over_closed_grpc_channel(self):
366375
str(err.exception), "Cannot invoke RPC on closed channel!"
367376
)
368377

369-
def test_retry_with_server_pushback(self):
378+
def test_retry_info_is_respected(self):
370379
mock_trace_service = TraceServiceServicerWithExportParams(
371380
StatusCode.UNAVAILABLE, optional_first_time_retry_millis=200
372381
)
@@ -381,34 +390,34 @@ def test_retry_with_server_pushback(self):
381390
SpanExportResult.FAILURE,
382391
)
383392
after = time.time()
384-
# We set the `grpc-retry-pushback-ms` header to 200 millis on the first server response.
393+
# We set the `google.rpc.retryinfo-bin` trailing metadata (a serialized RetryInfo) to 200 millis on the first server response only,
394+
# and then we do exponential backoff using that first value.
385395
# So we expect the first request at time 0, second at time 0.2,
386-
# third at 1.2 (start of backoff policy), fourth at time 3.2, last at time 7.2.
387-
self.assertEqual(mock_trace_service.num_requests, 5)
388-
# The backoffs have a jitter +- 20%, so we have to put a higher bound than 7.2.
389-
self.assertTrue(after - before < 8.8)
396+
# third at .6, fourth at 1.4, fifth at 3, and final one at 6.2.
397+
self.assertEqual(mock_trace_service.num_requests, 6)
398+
# The backoffs have a jitter +- 20%, so we have to put a higher bound than 6.2.
399+
self.assertTrue(after - before < 7.5)
390400

391-
def test_retry_timeout(self):
401+
def test_retry_not_made_if_would_exceed_timeout(self):
392402
mock_trace_service = TraceServiceServicerWithExportParams(
393403
StatusCode.UNAVAILABLE
394404
)
395405
add_TraceServiceServicer_to_server(
396406
mock_trace_service,
397407
self.server,
398408
)
399-
# Set timeout to 1.5 seconds.
400-
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=1.5)
409+
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=4)
401410
before = time.time()
402411
self.assertEqual(
403412
exporter.export([self.span]),
404413
SpanExportResult.FAILURE,
405414
)
406415
after = time.time()
407416
# Our retry starts with a 1 second backoff then doubles.
408-
# So we expect just two calls: one at time 0, one at time 1.
409-
self.assertEqual(mock_trace_service.num_requests, 2)
410-
# gRPC retry config waits for the timeout (1.5) before cancelling the request.
411-
self.assertTrue(after - before < 1.6)
417+
# First call at time 0, second at time 1, third at time 3, fourth would exceed timeout.
418+
self.assertEqual(mock_trace_service.num_requests, 3)
419+
# There's a +/-20% jitter on each backoff.
420+
self.assertTrue(after - before < 3.7)
412421

413422
def test_timeout_set_correctly(self):
414423
mock_trace_service = TraceServiceServicerWithExportParams(

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from os.path import dirname
1919
from typing import List
2020
from unittest import TestCase
21-
from unittest.mock import ANY, patch
21+
from unittest.mock import patch
2222

2323
from grpc import ChannelCredentials, Compression
2424

@@ -299,7 +299,6 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel):
299299
mock_insecure_channel.assert_called_once_with(
300300
"localhost:4317",
301301
compression=Compression.NoCompression,
302-
options=ANY,
303302
)
304303

305304
def test_split_metrics_data_many_data_points(self):

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import os
1818
from unittest import TestCase
19-
from unittest.mock import ANY, Mock, PropertyMock, patch
19+
from unittest.mock import Mock, PropertyMock, patch
2020

2121
from grpc import ChannelCredentials, Compression
2222

@@ -335,7 +335,6 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel):
335335
mock_insecure_channel.assert_called_once_with(
336336
"localhost:4317",
337337
compression=Compression.NoCompression,
338-
options=ANY,
339338
)
340339

341340
# pylint: disable=no-self-use
@@ -354,7 +353,6 @@ def test_otlp_exporter_otlp_compression_precendence(
354353
mock_insecure_channel.assert_called_once_with(
355354
"localhost:4317",
356355
compression=Compression.Gzip,
357-
options=ANY,
358356
)
359357

360358
def test_translate_spans(self):

0 commit comments

Comments
 (0)