
Commit 3076c0f

Make many changes
1 parent 4bbecf8 commit 3076c0f

7 files changed: +184 -91 lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 43 additions & 43 deletions
@@ -273,7 +273,7 @@ def __init__(
         )
         self._client = self._stub(self._channel)
 
-        self._export_lock = threading.Lock()
+        self._shutdown_is_occuring = threading.Event()
         self._shutdown = False
 
     @abstractmethod
@@ -294,60 +294,60 @@ def _export(
         # gets updated to a class that represents the proto
         # TracesData and use the code below instead.
         retry_info = RetryInfo()
-        with self._export_lock:
-            deadline_sec = time() + self._timeout
-            for retry_num in range(1, _MAX_RETRYS + 1):
-                backoff_seconds = 2 ** (retry_num - 1) * random.uniform(
-                    0.8, 1.2
+        deadline_sec = time() + self._timeout
+        for retry_num in range(1, _MAX_RETRYS + 1):
+            backoff_seconds = 2 ** (retry_num - 1) * random.uniform(
+                0.8, 1.2
+            )
+            try:
+                self._client.Export(
+                    request=self._translate_data(data),
+                    metadata=self._headers,
+                    timeout=self._timeout,
                 )
-                try:
-                    self._client.Export(
-                        request=self._translate_data(data),
-                        metadata=self._headers,
-                        timeout=self._timeout,
-                    )
-                    return self._result.SUCCESS
-                except RpcError as error:
-                    retry_info_bin = dict(error.trailing_metadata()).get(
-                        "google.rpc.retryinfo-bin"
+                return self._result.SUCCESS
+            except RpcError as error:
+                retry_info_bin = dict(error.trailing_metadata()).get(
+                    "google.rpc.retryinfo-bin"
+                )
+                if retry_info_bin is not None:
+                    retry_info.ParseFromString(retry_info_bin)
+                    backoff_seconds = (
+                        retry_info.retry_delay.seconds
+                        + retry_info.retry_delay.nanos / 1.0e9
                     )
-                    if retry_info_bin is not None:
-                        retry_info.ParseFromString(retry_info_bin)
-                        backoff_seconds = (
-                            retry_info.retry_delay.seconds
-                            + retry_info.retry_delay.nanos / 1.0e9
-                        )
-                    if (
-                        error.code() not in _RETRYABLE_ERROR_CODES
-                        or retry_num == _MAX_RETRYS
-                        or backoff_seconds > (deadline_sec - time())
-                    ):
-                        logger.error(
-                            "Failed to export %s to %s, error code: %s",
-                            self._exporting,
-                            self._endpoint,
-                            error.code(),
-                            exc_info=error.code() == StatusCode.UNKNOWN,
-                        )
-                        return self._result.FAILURE
-                    logger.warning(
-                        "Transient error %s encountered while exporting logs batch, retrying in %.2fs.",
+                if (
+                    error.code() not in _RETRYABLE_ERROR_CODES
+                    or retry_num == _MAX_RETRYS
+                    or backoff_seconds > (deadline_sec - time())
+                    or self._shutdown
+                ):
+                    logger.error(
+                        "Failed to export %s to %s, error code: %s",
+                        self._exporting,
+                        self._endpoint,
                         error.code(),
-                        backoff_seconds,
+                        exc_info=error.code() == StatusCode.UNKNOWN,
                     )
-                    sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+                    return self._result.FAILURE
+                logger.warning(
+                    "Transient error %s encountered while exporting logs batch, retrying in %.2fs.",
+                    error.code(),
+                    backoff_seconds,
+                )
+                shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
+                if shutdown:
+                    logger.warning("Shutdown in progress, aborting retry.")
+                    break
         return self._result.FAILURE
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
         if self._shutdown:
             logger.warning("Exporter already shutdown, ignoring call")
             return
-        # wait for the last export if any
-        self._export_lock.acquire(timeout=timeout_millis / 1e3)
+        self._shutdown_is_occuring.set()
         self._shutdown = True
         self._channel.close()
-        self._export_lock.release()
 
     @property
     @abstractmethod
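
The heart of this change is replacing the export lock with a threading.Event so that shutdown() can interrupt a backoff wait instead of blocking behind an in-flight retry. A minimal standalone sketch of that pattern, assuming nothing about the real exporter class (InterruptibleRetrier, _send, and _MAX_RETRIES below are illustrative names, not the library's):

import random
import threading

_MAX_RETRIES = 6  # illustrative cap, not the library's constant


class InterruptibleRetrier:
    """Sketch of the Event-based retry pattern used in the diff above."""

    def __init__(self, send):
        self._send = send  # callable that raises on failure
        self._shutdown_event = threading.Event()

    def export(self, payload):
        for retry_num in range(1, _MAX_RETRIES + 1):
            backoff_seconds = 2 ** (retry_num - 1) * random.uniform(0.8, 1.2)
            try:
                self._send(payload)
                return True
            except Exception:
                if retry_num == _MAX_RETRIES:
                    return False
                # Sleep for backoff_seconds, unless shutdown() fires first.
                if self._shutdown_event.wait(backoff_seconds):
                    print("Shutdown in progress, aborting retry.")
                    return False
        return False

    def shutdown(self):
        self._shutdown_event.set()  # wakes any thread stuck in wait()

Event.wait(timeout) blocks like sleep(timeout) but returns True as soon as set() is called from another thread, which is how the diff above lets shutdown() cut a retry short.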

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py

Lines changed: 23 additions & 33 deletions
@@ -311,48 +311,38 @@ def test_shutdown(self):
             "Exporter already shutdown, ignoring batch",
         )
 
-    def test_shutdown_wait_last_export(self):
+    def test_shutdown_interrupts_export_sleep(self):
+        # Returns unavailable and asks for a 20 second sleep before retry.
         add_TraceServiceServicer_to_server(
             TraceServiceServicerWithExportParams(
-                StatusCode.OK, optional_export_sleep=1
+                StatusCode.UNAVAILABLE
             ),
             self.server,
         )
 
         export_thread = ThreadWithReturnValue(
             target=self.exporter.export, args=([self.span],)
         )
-        export_thread.start()
-        # Wait a bit for exporter to get lock and make export call.
-        time.sleep(0.25)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._export_lock.locked())
-        self.exporter.shutdown(timeout_millis=3000)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._shutdown)
-        self.assertEqual(export_thread.join(), SpanExportResult.SUCCESS)
-
-    def test_shutdown_doesnot_wait_last_export(self):
-        add_TraceServiceServicer_to_server(
-            TraceServiceServicerWithExportParams(
-                StatusCode.OK, optional_export_sleep=3
-            ),
-            self.server,
-        )
-
-        export_thread = ThreadWithReturnValue(
-            target=self.exporter.export, args=([self.span],)
-        )
-        export_thread.start()
-        # Wait for exporter to get lock and make export call.
-        time.sleep(0.25)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._export_lock.locked())
-        # Set to 1 seconds, so the 3 second server-side delay will not be reached.
-        self.exporter.shutdown(timeout_millis=1000)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._shutdown)
-        self.assertEqual(export_thread.join(), None)
+        with self.assertLogs(level=WARNING) as warning:
+            export_thread.start()
+            # Wait a bit for export to fail and the backoff sleep to start
+            time.sleep(.1)
+            # The code should now be in a sleep that's between .8 and 1.2 seconds.
+            begin_wait = time.time_ns()
+            # pylint: disable=protected-access
+            self.assertFalse(self.exporter._shutdown_is_occuring.is_set())
+            self.exporter.shutdown()
+            self.assertTrue(self.exporter._shutdown_is_occuring.is_set())
+            export_result = export_thread.join()
+            end_wait = time.time_ns()
+            self.assertEqual(export_result, SpanExportResult.FAILURE)
+            # Shutdown should have interrupted the sleep.
+            self.assertTrue((end_wait - begin_wait) / 1e9 < .1)
+            print(warning.records)
+            self.assertEqual(
+                warning.records[1].message,
+                "Shutdown in progress, aborting retry."
+            )
 
     def test_export_over_closed_grpc_channel(self):
         # pylint: disable=protected-access
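
The rewritten test runs export() on a background thread and asserts on its return value after join(). ThreadWithReturnValue comes from this repository's test utilities; a hypothetical minimal version of such a helper, shown only so the test reads on its own, might look like this:

import threading


class ThreadWithReturnValue(threading.Thread):
    # Hypothetical minimal helper (the repository ships its own): it captures
    # the target's return value so the test can assert on the export result
    # after join().
    def __init__(self, target=None, args=()):
        super().__init__()
        self._target_fn = target
        self._target_args = args
        self._return = None

    def run(self):
        if self._target_fn is not None:
            self._return = self._target_fn(*self._target_args)

    def join(self, timeout=None):
        super().join(timeout)
        return self._return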

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py

Lines changed: 9 additions & 3 deletions
@@ -14,6 +14,7 @@
 
 import gzip
 import logging
+import threading
 import random
 import zlib
 from io import BytesIO
@@ -77,6 +78,7 @@ def __init__(
         compression: Optional[Compression] = None,
         session: Optional[requests.Session] = None,
     ):
+        self._shutdown_is_occuring = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
             _append_logs_path(
@@ -164,14 +166,15 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
         serialized_data = encode_logs(batch).SerializeToString()
         deadline_sec = time() + self._timeout
         for retry_num in range(1, _MAX_RETRYS + 1):
-            backoff_seconds = 2 ** (retry_num - 1) * random.uniform(0.8, 1.2)
             resp = self._export(serialized_data, deadline_sec - time())
             if resp.ok:
                 return LogExportResult.SUCCESS
+            backoff_seconds = 2 ** (retry_num - 1) * random.uniform(0.8, 1.2)
             if (
                 not _is_retryable(resp)
                 or retry_num == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export logs batch code: %s, reason: %s",
@@ -184,8 +187,10 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return LogExportResult.FAILURE
 
     def force_flush(self, timeout_millis: float = 10_000) -> bool:
@@ -196,6 +201,7 @@ def shutdown(self):
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
+        self._shutdown_is_occuring.set()
         self._session.close()
         self._shutdown = True
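
For reference, the retry schedule produced by the loop above is exponential backoff with roughly ±20% jitter, capped by both _MAX_RETRYS and the remaining deadline. A small illustrative sketch of that schedule; the constant values below are assumptions for the example, not the module's actual configuration:

import random

_MAX_RETRYS = 6    # assumed value for illustration
TIMEOUT_SEC = 10   # assumed exporter timeout

# retry_num 1..6 waits ~1, 2, 4, 8, 16, 32 seconds (each with +/-20% jitter),
# and a retry is abandoned when the wait would overrun the remaining deadline.
remaining = TIMEOUT_SEC
for retry_num in range(1, _MAX_RETRYS + 1):
    backoff_seconds = 2 ** (retry_num - 1) * random.uniform(0.8, 1.2)
    if backoff_seconds > remaining:
        print(f"retry {retry_num}: backoff {backoff_seconds:.2f}s exceeds remaining {remaining:.2f}s, giving up")
        break
    print(f"retry {retry_num}: would wait {backoff_seconds:.2f}s")
    remaining -= backoff_seconds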

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 8 additions & 2 deletions
@@ -14,6 +14,7 @@
 
 import gzip
 import logging
+import threading
 import random
 import zlib
 from io import BytesIO
@@ -120,6 +121,7 @@ def __init__(
         | None = None,
         preferred_aggregation: dict[type, Aggregation] | None = None,
     ):
+        self._shutdown_is_occuring = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
             _append_metrics_path(
@@ -224,6 +226,7 @@ def export(
                 not _is_retryable(resp)
                 or retry_num == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export metrics batch code: %s, reason: %s",
@@ -236,14 +239,17 @@ def export(
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return MetricExportResult.FAILURE
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
+        self._shutdown_is_occuring.set()
         self._session.close()
         self._shutdown = True

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py

Lines changed: 9 additions & 3 deletions
@@ -16,6 +16,7 @@
 import logging
 import random
 import zlib
+import threading
 from io import BytesIO
 from os import environ
 from time import sleep, time
@@ -76,6 +77,7 @@ def __init__(
         compression: Optional[Compression] = None,
         session: Optional[requests.Session] = None,
     ):
+        self._shutdown_is_occuring = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
             _append_trace_path(
@@ -162,14 +164,15 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
         serialized_data = encode_spans(spans).SerializePartialToString()
         deadline_sec = time() + self._timeout
         for retry_num in range(1, _MAX_RETRYS + 1):
-            backoff_seconds = 2 ** (retry_num - 1) * random.uniform(0.8, 1.2)
             resp = self._export(serialized_data, deadline_sec - time())
             if resp.ok:
                 return SpanExportResult.SUCCESS
+            backoff_seconds = 2 ** (retry_num - 1) * random.uniform(0.8, 1.2)
             if (
                 not _is_retryable(resp)
                 or retry_num == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export span batch code: %s, reason: %s",
@@ -182,14 +185,17 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
        return SpanExportResult.FAILURE
 
     def shutdown(self):
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
+        self._shutdown_is_occuring.set()
         self._session.close()
         self._shutdown = True
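
All five files rely on the same threading.Event.wait() semantics: the call returns False when the timeout elapses, and True as soon as another thread calls set(). A tiny self-contained demonstration:

import threading
import time

event = threading.Event()

print(event.wait(0.2))  # False: nothing set the event, the full 0.2s elapsed

threading.Timer(0.1, event.set).start()  # stands in for shutdown() on another thread
start = time.monotonic()
print(event.wait(5.0))  # True: returns after ~0.1s, not 5s
print(f"woke after {time.monotonic() - start:.2f}s")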
