Skip to content

Commit eb59db3

Browse files
committed
Add tests, changelog entry
1 parent 62d3699 commit eb59db3

File tree

11 files changed

+170
-78
lines changed

11 files changed

+170
-78
lines changed

CHANGELOG.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10-
- Update OTLP gRPC/HTTP exporters: the export timeout is now inclusive of all retries and backoffs.
11-
A +/-20% jitter was added to all backoffs. A pointless 32 second sleep that occurred after all retries
12-
had completed/failed was removed.
13-
([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)).
1410

11+
- Update OTLP gRPC/HTTP exporters: calling shutdown will now interrupt exporters that are sleeping
12+
before a retry attempt, and cause them to return failure immediately.
13+
Update BatchSpan/LogRecordProcessors: shutdown will now complete after 30 seconds of trying to finish
14+
exporting any buffered telemetry, instead of continuing to export until all telemetry was exported.
15+
([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)).
1516
- Update OTLP gRPC/HTTP exporters: the export timeout is now inclusive of all retries and backoffs.
1617
A +/-20% jitter was added to all backoffs. A pointless 32 second sleep that occurred after all retries
1718
had completed/failed was removed.

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from collections.abc import Sequence # noqa: F401
2121
from logging import getLogger
2222
from os import environ
23-
from time import sleep, time
23+
from time import time
2424
from typing import ( # noqa: F401
2525
Any,
2626
Callable,
@@ -293,54 +293,53 @@ def _export(
293293
# FIXME remove this check if the export type for traces
294294
# gets updated to a class that represents the proto
295295
# TracesData and use the code below instead.
296-
with self._export_lock:
297-
deadline_sec = time() + self._timeout
298-
for retry_num in range(_MAX_RETRYS):
299-
try:
300-
self._client.Export(
301-
request=self._translate_data(data),
302-
metadata=self._headers,
303-
timeout=deadline_sec - time(),
296+
deadline_sec = time() + self._timeout
297+
for retry_num in range(_MAX_RETRYS):
298+
try:
299+
self._client.Export(
300+
request=self._translate_data(data),
301+
metadata=self._headers,
302+
timeout=deadline_sec - time(),
303+
)
304+
return self._result.SUCCESS
305+
except RpcError as error:
306+
retry_info_bin = dict(error.trailing_metadata()).get(
307+
"google.rpc.retryinfo-bin"
308+
)
309+
# Multiplying by a random number between 0.8 and 1.2 introduces a +/-20% jitter to each backoff.
310+
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
311+
if retry_info_bin is not None:
312+
retry_info = RetryInfo()
313+
retry_info.ParseFromString(retry_info_bin)
314+
backoff_seconds = (
315+
retry_info.retry_delay.seconds
316+
+ retry_info.retry_delay.nanos / 1.0e9
304317
)
305-
return self._result.SUCCESS
306-
except RpcError as error:
307-
retry_info_bin = dict(error.trailing_metadata()).get(
308-
"google.rpc.retryinfo-bin"
309-
)
310-
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
311-
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
312-
if retry_info_bin is not None:
313-
retry_info = RetryInfo()
314-
retry_info.ParseFromString(retry_info_bin)
315-
backoff_seconds = (
316-
retry_info.retry_delay.seconds
317-
+ retry_info.retry_delay.nanos / 1.0e9
318-
)
319-
if (
320-
error.code() not in _RETRYABLE_ERROR_CODES
321-
or retry_num + 1 == _MAX_RETRYS
322-
or backoff_seconds > (deadline_sec - time())
323-
or self._shutdown
324-
):
325-
logger.error(
326-
"Failed to export %s to %s, error code: %s",
327-
self._exporting,
328-
self._endpoint,
329-
error.code(),
330-
exc_info=error.code() == StatusCode.UNKNOWN,
331-
)
332-
return self._result.FAILURE
333-
logger.warning(
334-
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
335-
error.code(),
318+
if (
319+
error.code() not in _RETRYABLE_ERROR_CODES
320+
or retry_num + 1 == _MAX_RETRYS
321+
or backoff_seconds > (deadline_sec - time())
322+
or self._shutdown
323+
):
324+
logger.error(
325+
"Failed to export %s to %s, error code: %s",
336326
self._exporting,
337327
self._endpoint,
338-
backoff_seconds,
328+
error.code(),
329+
exc_info=error.code() == StatusCode.UNKNOWN,
339330
)
340-
shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
341-
if shutdown:
342-
logger.warning("Shutdown in progress, aborting retry.")
343-
break
331+
return self._result.FAILURE
332+
logger.warning(
333+
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
334+
error.code(),
335+
self._exporting,
336+
self._endpoint,
337+
backoff_seconds,
338+
)
339+
shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
340+
if shutdown:
341+
logger.warning("Shutdown in progress, aborting retry.")
342+
break
344343
# Reached when shutdown interrupts a retry backoff (the `break` above); the export is reported as failed.
345344
return self._result.FAILURE
346345

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -312,11 +312,10 @@ def test_shutdown(self):
312312
"Exporter already shutdown, ignoring batch",
313313
)
314314

315-
def test_shutdown_interrupts_export_sleep(self):
316-
# Returns unavailable and asks for a 20 second sleep before retry.
315+
def test_shutdown_interrupts_export_retry_backoff(self):
317316
add_TraceServiceServicer_to_server(
318317
TraceServiceServicerWithExportParams(
319-
StatusCode.UNAVAILABLE
318+
StatusCode.UNAVAILABLE,
320319
),
321320
self.server,
322321
)
@@ -325,24 +324,23 @@ def test_shutdown_interrupts_export_sleep(self):
325324
target=self.exporter.export, args=([self.span],)
326325
)
327326
with self.assertLogs(level=WARNING) as warning:
327+
begin_wait = time.time()
328328
export_thread.start()
329329
# Wait a bit for export to fail and the backoff sleep to start
330-
time.sleep(.1)
331-
# The code should now be in a sleep that's between .8 and 1.2 seconds.
332-
begin_wait = time.time_ns()
330+
time.sleep(0.05)
331+
# The code should now be in a 1 second backoff.
333332
# pylint: disable=protected-access
334333
self.assertFalse(self.exporter._shutdown_is_occuring.is_set())
335334
self.exporter.shutdown()
336335
self.assertTrue(self.exporter._shutdown_is_occuring.is_set())
337336
export_result = export_thread.join()
338-
end_wait = time.time_ns()
337+
end_wait = time.time()
339338
self.assertEqual(export_result, SpanExportResult.FAILURE)
340339
# Shutdown should have interrupted the sleep.
341-
self.assertTrue((end_wait - begin_wait) / 1e9 < .1)
342-
print(warning.records)
340+
self.assertTrue(end_wait - begin_wait < 0.2)
343341
self.assertEqual(
344342
warning.records[1].message,
345-
"Shutdown in progress, aborting retry."
343+
"Shutdown in progress, aborting retry.",
346344
)
347345

348346
def test_export_over_closed_grpc_channel(self):

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
import gzip
1616
import logging
1717
import random
18+
import threading
1819
import zlib
1920
from io import BytesIO
2021
from os import environ
21-
from time import sleep, time
22+
from time import time
2223
from typing import Dict, Optional, Sequence
2324

2425
import requests
@@ -174,7 +175,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
174175
not _is_retryable(resp)
175176
or retry_num + 1 == _MAX_RETRYS
176177
or backoff_seconds > (deadline_sec - time())
177-
or self._shutdown
178+
or self._shutdown
178179
):
179180
_logger.error(
180181
"Failed to export logs batch code: %s, reason: %s",
@@ -205,6 +206,7 @@ def shutdown(self):
205206
self._shutdown_is_occuring.set()
206207
self._session.close()
207208

209+
208210
def _compression_from_env() -> Compression:
209211
compression = (
210212
environ.get(

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
import gzip
1616
import logging
1717
import random
18+
import threading
1819
import zlib
1920
from io import BytesIO
2021
from os import environ
21-
from time import sleep, time
22+
from time import time
2223
from typing import ( # noqa: F401
2324
Any,
2425
Callable,

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
import gzip
1616
import logging
1717
import random
18-
import zlib
1918
import threading
19+
import zlib
2020
from io import BytesIO
2121
from os import environ
22-
from time import sleep, time
22+
from time import time
2323
from typing import Dict, Optional, Sequence
2424

2525
import requests
@@ -35,9 +35,6 @@
3535
from opentelemetry.exporter.otlp.proto.http._common import (
3636
_is_retryable,
3737
)
38-
from opentelemetry.exporter.otlp.proto.http._common import (
39-
_is_retryable,
40-
)
4138
from opentelemetry.sdk.environment_variables import (
4239
OTEL_EXPORTER_OTLP_CERTIFICATE,
4340
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,

exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import threading
1516
import time
1617
from logging import WARNING
1718
from os import environ
@@ -541,3 +542,34 @@ def export_side_effect(*args, **kwargs):
541542
mock_post.side_effect = export_side_effect
542543
exporter = OTLPMetricExporter(timeout=0.4)
543544
exporter.export(self.metrics["sum_int"])
545+
546+
@patch.object(Session, "post")
547+
def test_shutdown_interrupts_retry_backoff(self, mock_post):
548+
exporter = OTLPMetricExporter(timeout=1.5)
549+
550+
resp = Response()
551+
resp.status_code = 503
552+
resp.reason = "UNAVAILABLE"
553+
mock_post.return_value = resp
554+
thread = threading.Thread(
555+
target=exporter.export, args=(self.metrics["sum_int"],)
556+
)
557+
with self.assertLogs(level=WARNING) as warning:
558+
before = time.time()
559+
thread.start()
560+
# Wait for the first attempt to fail, then enter a 1 second backoff.
561+
time.sleep(0.05)
562+
# Should cause export to wake up and return.
563+
exporter.shutdown()
564+
thread.join()
565+
after = time.time()
566+
self.assertIn(
567+
"Transient error UNAVAILABLE encountered while exporting metrics batch, retrying in",
568+
warning.records[0].message,
569+
)
570+
self.assertIn(
571+
"Shutdown in progress, aborting retry.",
572+
warning.records[1].message,
573+
)
574+
575+
assert after - before < 0.2

exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
# pylint: disable=protected-access
1616

17+
import threading
1718
import time
1819
import unittest
1920
from logging import WARNING
@@ -388,3 +389,34 @@ def export_side_effect(*args, **kwargs):
388389
mock_post.side_effect = export_side_effect
389390
exporter = OTLPLogExporter(timeout=0.4)
390391
exporter.export(self._get_sdk_log_data())
392+
393+
@patch.object(Session, "post")
394+
def test_shutdown_interrupts_retry_backoff(self, mock_post):
395+
exporter = OTLPLogExporter(timeout=1.5)
396+
397+
resp = Response()
398+
resp.status_code = 503
399+
resp.reason = "UNAVAILABLE"
400+
mock_post.return_value = resp
401+
thread = threading.Thread(
402+
target=exporter.export, args=(self._get_sdk_log_data(),)
403+
)
404+
with self.assertLogs(level=WARNING) as warning:
405+
before = time.time()
406+
thread.start()
407+
# Wait for the first attempt to fail, then enter a 1 second backoff.
408+
time.sleep(0.05)
409+
# Should cause export to wake up and return.
410+
exporter.shutdown()
411+
thread.join()
412+
after = time.time()
413+
self.assertIn(
414+
"Transient error UNAVAILABLE encountered while exporting logs batch, retrying in",
415+
warning.records[0].message,
416+
)
417+
self.assertIn(
418+
"Shutdown in progress, aborting retry.",
419+
warning.records[1].message,
420+
)
421+
422+
assert after - before < 0.2

exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import threading
1516
import time
1617
import unittest
1718
from logging import WARNING
@@ -288,3 +289,32 @@ def export_side_effect(*args, **kwargs):
288289
mock_post.side_effect = export_side_effect
289290
exporter = OTLPSpanExporter(timeout=0.4)
290291
exporter.export([BASIC_SPAN])
292+
293+
@patch.object(Session, "post")
294+
def test_shutdown_interrupts_retry_backoff(self, mock_post):
295+
exporter = OTLPSpanExporter(timeout=1.5)
296+
297+
resp = Response()
298+
resp.status_code = 503
299+
resp.reason = "UNAVAILABLE"
300+
mock_post.return_value = resp
301+
thread = threading.Thread(target=exporter.export, args=([BASIC_SPAN],))
302+
with self.assertLogs(level=WARNING) as warning:
303+
before = time.time()
304+
thread.start()
305+
# Wait for the first attempt to fail, then enter a 1 second backoff.
306+
time.sleep(0.05)
307+
# Should cause export to wake up and return.
308+
exporter.shutdown()
309+
thread.join()
310+
after = time.time()
311+
self.assertIn(
312+
"Transient error UNAVAILABLE encountered while exporting span batch, retrying in",
313+
warning.records[0].message,
314+
)
315+
self.assertIn(
316+
"Shutdown in progress, aborting retry.",
317+
warning.records[1].message,
318+
)
319+
320+
assert after - before < 0.2

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def emit(self, data: Telemetry) -> None:
182182
self._queue.appendleft(data)
183183
if len(self._queue) >= self._max_export_batch_size:
184184
self._worker_awaken.set()
185-
185+
186186
def shutdown(self, timeout_millis: int = 30000):
187187
if self._shutdown:
188188
return
@@ -220,4 +220,4 @@ def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
220220
return False
221221
# Blocking call to export.
222222
self._export(BatchExportStrategy.EXPORT_ALL)
223-
return True
223+
return True

0 commit comments

Comments
 (0)