Skip to content

Commit f42ecd3

Browse files
committed
Refactor tests, HTTP exporters a bit
1 parent d1e04e1 commit f42ecd3

File tree

12 files changed

+221
-202
lines changed

12 files changed

+221
-202
lines changed

exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ classifiers = [
2929
]
3030
dependencies = [
3131
"opentelemetry-proto == 1.34.0.dev",
32+
"requests ~= 2.7",
3233
]
3334

3435
[project.urls]

exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
TypeVar,
2828
)
2929

30+
import requests
31+
3032
from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue
3133
from opentelemetry.proto.common.v1.common_pb2 import (
3234
ArrayValue as PB2ArrayValue,
@@ -108,6 +110,14 @@ def _encode_key_value(
108110
)
109111

110112

113+
def _is_retryable(resp: requests.Response) -> bool:
114+
if resp.status_code == 408:
115+
return True
116+
if resp.status_code >= 500 and resp.status_code <= 599:
117+
return True
118+
return False
119+
120+
111121
def _encode_array(
112122
array: Sequence[Any], allow_null: bool = False
113123
) -> Sequence[PB2AnyValue]:

exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ asgiref==3.7.2
22
Deprecated==1.2.14
33
importlib-metadata==6.11.0
44
iniconfig==2.0.0
5+
requests == 2.7.0
56
packaging==24.0
67
pluggy==1.5.0
78
protobuf==5.26.1

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,18 @@
7272
from opentelemetry.sdk.trace import ReadableSpan
7373
from opentelemetry.util.re import parse_env_headers
7474

75-
_JSON_CONFIG = json.dumps(
75+
# 5 is the maximum number of attempts allowed by the gRPC retry policy.
76+
# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt,
77+
# plus or minus a 20% jitter. Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting
78+
# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the
79+
# timeout, and all fail. A header `grpc-retry-pushback-ms` when set by the server will override
80+
# and take precedence over this backoff. See https://grpc.io/docs/guides/retry/ for more details.
81+
_GRPC_RETRY_POLICY = json.dumps(
7682
{
7783
"methodConfig": [
7884
{
7985
"name": [dict()],
8086
"retryPolicy": {
81-
# 5 is the maximum allowable attempts allowed by grpc retry policy.
82-
# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt.
83-
# Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting
84-
# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the
85-
# timeout, and all fail. See https://grpc.io/docs/guides/retry/ for more details.
8687
"maxAttempts": 5,
8788
"initialBackoff": "1s",
8889
"maxBackoff": "9s",
@@ -213,8 +214,6 @@ class OTLPExporterMixin(
213214
compression: gRPC compression method to use
214215
"""
215216

216-
_MAX_RETRY_TIMEOUT = 64
217-
218217
def __init__(
219218
self,
220219
endpoint: Optional[str] = None,
@@ -276,7 +275,7 @@ def __init__(
276275
self._endpoint,
277276
compression=compression,
278277
options=[
279-
("grpc.service_config", _JSON_CONFIG),
278+
("grpc.service_config", _GRPC_RETRY_POLICY),
280279
],
281280
)
282281
else:
@@ -291,7 +290,7 @@ def __init__(
291290
credentials,
292291
compression=compression,
293292
options=[
294-
("grpc.service_config", _JSON_CONFIG),
293+
("grpc.service_config", _GRPC_RETRY_POLICY),
295294
],
296295
)
297296
self._client = self._stub(self._channel)

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py

Lines changed: 73 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,7 @@
2020
from unittest import TestCase
2121
from unittest.mock import ANY, Mock, patch
2222

23-
from google.rpc.code_pb2 import ( # pylint: disable=no-name-in-module
24-
ALREADY_EXISTS,
25-
OK,
26-
UNAVAILABLE,
27-
)
28-
from google.rpc.status_pb2 import Status # pylint: disable=no-name-in-module
29-
from grpc import Compression, server
30-
from grpc_status import rpc_status
23+
from grpc import Compression, StatusCode, server
3124

3225
from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
3326
encode_spans,
@@ -90,25 +83,36 @@ def shutdown(self, timeout_millis=30_000):
9083
class TraceServiceServicerWithExportParams(TraceServiceServicer):
9184
def __init__(
9285
self,
93-
export_result: int,
86+
export_result: StatusCode,
9487
optional_export_sleep: Optional[float] = None,
88+
optional_first_time_retry_millis: Optional[int] = None,
9589
):
9690
self.export_result = export_result
9791
self.optional_export_sleep = optional_export_sleep
92+
self.optional_first_time_retry_millis = (
93+
optional_first_time_retry_millis
94+
)
95+
self.first_attempt = True
96+
self.num_requests = 0
97+
self.now = time.time()
9898

9999
# pylint: disable=invalid-name,unused-argument
100100
def Export(self, request, context):
101-
logger.warning("Export Request Received")
101+
self.num_requests += 1
102102
if self.optional_export_sleep:
103103
time.sleep(self.optional_export_sleep)
104-
if self.export_result != OK:
105-
context.abort_with_status(
106-
rpc_status.to_status(
107-
Status(
108-
code=self.export_result,
104+
if self.export_result != StatusCode.OK:
105+
if self.optional_first_time_retry_millis and self.first_attempt:
106+
self.first_attempt = False
107+
context.set_trailing_metadata(
108+
(
109+
(
110+
"grpc-retry-pushback-ms",
111+
str(self.optional_first_time_retry_millis),
112+
),
109113
)
110114
)
111-
)
115+
context.abort(self.export_result, "")
112116

113117
return ExportTraceServiceResponse()
114118

@@ -287,7 +291,7 @@ def test_otlp_exporter_otlp_compression_envvar(
287291

288292
def test_shutdown(self):
289293
add_TraceServiceServicer_to_server(
290-
TraceServiceServicerWithExportParams(OK),
294+
TraceServiceServicerWithExportParams(StatusCode.OK),
291295
self.server,
292296
)
293297
self.assertEqual(
@@ -305,7 +309,9 @@ def test_shutdown(self):
305309

306310
def test_shutdown_wait_last_export(self):
307311
add_TraceServiceServicer_to_server(
308-
TraceServiceServicerWithExportParams(OK, optional_export_sleep=1),
312+
TraceServiceServicerWithExportParams(
313+
StatusCode.OK, optional_export_sleep=1
314+
),
309315
self.server,
310316
)
311317

@@ -324,7 +330,9 @@ def test_shutdown_wait_last_export(self):
324330

325331
def test_shutdown_doesnot_wait_last_export(self):
326332
add_TraceServiceServicer_to_server(
327-
TraceServiceServicerWithExportParams(OK, optional_export_sleep=3),
333+
TraceServiceServicerWithExportParams(
334+
StatusCode.OK, optional_export_sleep=3
335+
),
328336
self.server,
329337
)
330338

@@ -346,7 +354,7 @@ def test_export_over_closed_grpc_channel(self):
346354
# pylint: disable=protected-access
347355

348356
add_TraceServiceServicer_to_server(
349-
TraceServiceServicerWithExportParams(OK),
357+
TraceServiceServicerWithExportParams(StatusCode.OK),
350358
self.server,
351359
)
352360
self.exporter.export([self.span])
@@ -358,61 +366,56 @@ def test_export_over_closed_grpc_channel(self):
358366
str(err.exception), "Cannot invoke RPC on closed channel!"
359367
)
360368

369+
def test_retry_with_server_pushback(self):
370+
mock_trace_service = TraceServiceServicerWithExportParams(
371+
StatusCode.UNAVAILABLE, optional_first_time_retry_millis=200
372+
)
373+
add_TraceServiceServicer_to_server(
374+
mock_trace_service,
375+
self.server,
376+
)
377+
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=10)
378+
before = time.time()
379+
self.assertEqual(
380+
exporter.export([self.span]),
381+
SpanExportResult.FAILURE,
382+
)
383+
after = time.time()
384+
# We set the `grpc-retry-pushback-ms` header to 200 millis on the first server response.
385+
# So we expect the first request at time 0, second at time 0.2,
386+
# third at 1.2 (start of backoff policy), fourth at time 3.2, last at time 7.2.
387+
self.assertEqual(mock_trace_service.num_requests, 5)
388+
# The backoffs have a jitter of +/- 20%, so the upper bound must be higher than 7.2.
389+
self.assertTrue(after - before < 8.8)
390+
361391
def test_retry_timeout(self):
392+
mock_trace_service = TraceServiceServicerWithExportParams(
393+
StatusCode.UNAVAILABLE
394+
)
362395
add_TraceServiceServicer_to_server(
363-
TraceServiceServicerWithExportParams(UNAVAILABLE),
396+
mock_trace_service,
364397
self.server,
365398
)
366399
# Set timeout to 1.5 seconds.
367400
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=1.5)
368-
with self.assertLogs(level=WARNING) as warning:
369-
self.assertEqual(
370-
exporter.export([self.span]),
371-
SpanExportResult.FAILURE,
372-
)
373-
# Our GRPC retry policy starts with a 1 second backoff then doubles.
374-
# So we expect just two calls: one at time 0, one at time 1.
375-
# The final log is from when export fails.
376-
self.assertEqual(len(warning.records), 3)
377-
for idx, log in enumerate(warning.records):
378-
if idx != 2:
379-
self.assertEqual(
380-
"Export Request Received",
381-
log.message,
382-
)
383-
else:
384-
self.assertEqual(
385-
"Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED",
386-
log.message,
387-
)
388-
with self.assertLogs(level=WARNING) as warning:
389-
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=5)
390-
# pylint: disable=protected-access
391-
self.assertEqual(exporter._timeout, 5)
392-
self.assertEqual(
393-
exporter.export([self.span]),
394-
SpanExportResult.FAILURE,
395-
)
396-
# We expect 3 calls: time 0, time 1, time 3, but not time 7.
397-
# The final log is from when export fails.
398-
self.assertEqual(len(warning.records), 4)
399-
for idx, log in enumerate(warning.records):
400-
if idx != 3:
401-
self.assertEqual(
402-
"Export Request Received",
403-
log.message,
404-
)
405-
else:
406-
self.assertEqual(
407-
"Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED",
408-
log.message,
409-
)
401+
before = time.time()
402+
self.assertEqual(
403+
exporter.export([self.span]),
404+
SpanExportResult.FAILURE,
405+
)
406+
after = time.time()
407+
# Our retry starts with a 1 second backoff then doubles.
408+
# So we expect just two calls: one at time 0, one at time 1.
409+
self.assertEqual(mock_trace_service.num_requests, 2)
410+
# gRPC retry config waits for the timeout (1.5) before cancelling the request.
411+
self.assertTrue(after - before < 1.6)
410412

411413
def test_timeout_set_correctly(self):
414+
mock_trace_service = TraceServiceServicerWithExportParams(
415+
StatusCode.OK, optional_export_sleep=0.5
416+
)
412417
add_TraceServiceServicer_to_server(
413-
TraceServiceServicerWithExportParams(
414-
OK, optional_export_sleep=0.5
415-
),
418+
mock_trace_service,
416419
self.server,
417420
)
418421
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.4)
@@ -427,6 +430,7 @@ def test_timeout_set_correctly(self):
427430
"Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED",
428431
warning.records[-1].message,
429432
)
433+
self.assertEqual(mock_trace_service.num_requests, 1)
430434
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.8)
431435
self.assertEqual(
432436
exporter.export([self.span]),
@@ -444,7 +448,9 @@ def test_otlp_headers_from_env(self):
444448
def test_permanent_failure(self):
445449
with self.assertLogs(level=WARNING) as warning:
446450
add_TraceServiceServicer_to_server(
447-
TraceServiceServicerWithExportParams(ALREADY_EXISTS),
451+
TraceServiceServicerWithExportParams(
452+
StatusCode.ALREADY_EXISTS
453+
),
448454
self.server,
449455
)
450456
self.assertEqual(

0 commit comments

Comments
 (0)