Skip to content

Commit 02220bf

Browse files
committed
Initial commit to improve shutdown behavior.
1 parent c9ad4bc commit 02220bf

File tree

5 files changed

+137
-39
lines changed

5 files changed

+137
-39
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
import collections
1818
import enum
19+
import inspect
1920
import logging
2021
import os
2122
import threading
23+
import time
2224
import weakref
2325
from abc import abstractmethod
2426
from typing import (
@@ -130,17 +132,27 @@ def worker(self):
130132
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
131133
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than the threshold.
132134
sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
135+
print(
136+
"In worker loop:{}, {}, {}".format(
137+
sleep_interrupted,
138+
self._schedule_delay,
139+
self._schedule_delay_millis,
140+
)
141+
)
133142
if self._shutdown:
143+
print("Shutdown is set...")
134144
break
135145
self._export(
136146
BatchExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
137147
if sleep_interrupted
138148
else BatchExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
139149
)
140150
self._worker_awaken.clear()
151+
print("last export bach...")
141152
self._export(BatchExportStrategy.EXPORT_ALL)
142153

143154
def _export(self, batch_strategy: BatchExportStrategy) -> None:
155+
print("export started...:{}".format(batch_strategy))
144156
with self._export_lock:
145157
iteration = 0
146158
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
@@ -149,6 +161,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
149161
iteration += 1
150162
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
151163
try:
164+
print("SIZE: {}".format(len(self._queue)))
152165
self._exporter.export(
153166
[
154167
# Oldest records are at the back, so pop from there.
@@ -161,6 +174,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
161174
)
162175
]
163176
)
177+
print("export succeded??")
164178
except Exception: # pylint: disable=broad-exception-caught
165179
self._logger.exception(
166180
"Exception while exporting %s.", self._exporting
@@ -180,16 +194,32 @@ def emit(self, data: Telemetry) -> None:
180194
if len(self._queue) >= self._max_export_batch_size:
181195
self._worker_awaken.set()
182196

183-
def shutdown(self):
197+
# LoggerProvider calls shutdown without arguments currently, so the default is used.
198+
def shutdown(self, timeout_millis=30000):
184199
if self._shutdown:
185200
return
186201
# Prevents emit and force_flush from further calling export.
187202
self._shutdown = True
188-
# Interrupts sleep in the worker, if it's sleeping.
203+
# Interrupts sleep in the worker if it's sleeping.
189204
self._worker_awaken.set()
190-
# Main worker loop should exit after one final export call with flush all strategy.
205+
# Wait a tiny bit for the worker thread to wake and call export for a final time.
206+
time.sleep(0.1)
207+
# We will force shutdown once roughly timeout_millis has elapsed (30 seconds by default).
208+
for _ in range(10):
209+
# If export is not being called, we can shutdown.
210+
if not self._export_lock.locked():
211+
break
212+
time.sleep(timeout_millis / 1000 / 10)
213+
# We want to shut down immediately because we have already waited up to timeout_millis (30 seconds by default). Some exporters' shutdown methods support a timeout param.
214+
if (
215+
"timeout_millis"
216+
in inspect.getfullargspec(self._exporter.shutdown).args
217+
):
218+
self._exporter.shutdown(timeout_millis=0) # type: ignore
219+
else:
220+
self._exporter.shutdown()
221+
# Worker thread should be finished at this point and return instantly.
191222
self._worker_thread.join()
192-
self._exporter.shutdown()
193223

194224
# TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568.
195225
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:

opentelemetry-sdk/test-requirements.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ packaging==24.0
66
pluggy==1.5.0
77
psutil==5.9.6; sys_platform != 'win32'
88
py-cpuinfo==9.0.0
9+
requests==2.32.3
910
pytest==7.4.4
1011
tomli==2.0.1
1112
typing_extensions==4.10.0
@@ -15,3 +16,6 @@ zipp==3.19.2
1516
-e opentelemetry-api
1617
-e opentelemetry-semantic-conventions
1718
-e opentelemetry-sdk
19+
-e exporter/opentelemetry-exporter-otlp-proto-http
20+
-e exporter/opentelemetry-exporter-otlp-proto-common
21+
-e opentelemetry-proto

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import os
1818
import time
1919
import unittest
20+
from concurrent.futures import ThreadPoolExecutor
2021
from sys import version_info
2122
from unittest.mock import Mock, patch
2223

@@ -331,9 +332,12 @@ def test_simple_log_record_processor_different_msg_types_with_formatter(
331332
self.assertEqual(expected, emitted)
332333

333334

335+
# Many more test cases for the BatchLogRecordProcessor exist under
336+
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
337+
# Important: make sure to call .shutdown() on the BatchLogRecordProcessor
338+
# before the end of the test, otherwise the worker thread will continue
339+
# to run after the end of the test.
334340
class TestBatchLogRecordProcessor(unittest.TestCase):
335-
# Many more test cases for the BatchLogRecordProcessor exist under
336-
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
337341
def test_emit_call_log_record(self):
338342
exporter = InMemoryLogExporter()
339343
log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter))
@@ -346,6 +350,34 @@ def test_emit_call_log_record(self):
346350

347351
logger.error("error")
348352
self.assertEqual(log_record_processor.emit.call_count, 1)
353+
log_record_processor.shutdown()
354+
355+
def test_with_multiple_threads(self):
356+
exporter = InMemoryLogExporter()
357+
batch_processor = BatchLogRecordProcessor(
358+
exporter,
359+
max_queue_size=3000,
360+
max_export_batch_size=50,
361+
schedule_delay_millis=30000,
362+
export_timeout_millis=500,
363+
)
364+
365+
def bulk_emit(num_emit):
366+
for _ in range(num_emit):
367+
batch_processor.emit(EMPTY_LOG)
368+
369+
total_expected_logs = 0
370+
with ThreadPoolExecutor(max_workers=69) as executor:
371+
for num_logs_to_emit in range(1, 70):
372+
executor.submit(bulk_emit, num_logs_to_emit)
373+
total_expected_logs += num_logs_to_emit
374+
375+
executor.shutdown()
376+
377+
batch_processor.shutdown()
378+
# Wait a bit for logs to flush.
379+
time.sleep(2)
380+
assert len(exporter.get_finished_logs()) == total_expected_logs
349381

350382
@mark.skipif(
351383
version_info < (3, 10),
@@ -404,6 +436,7 @@ def test_args(self):
404436
self.assertEqual(
405437
log_record_processor._batch_processor._export_timeout_millis, 15000
406438
)
439+
log_record_processor.shutdown()
407440

408441
@patch.dict(
409442
"os.environ",
@@ -432,6 +465,7 @@ def test_env_vars(self):
432465
self.assertEqual(
433466
log_record_processor._batch_processor._export_timeout_millis, 15000
434467
)
468+
log_record_processor.shutdown()
435469

436470
def test_args_defaults(self):
437471
exporter = InMemoryLogExporter()
@@ -451,6 +485,7 @@ def test_args_defaults(self):
451485
self.assertEqual(
452486
log_record_processor._batch_processor._export_timeout_millis, 30000
453487
)
488+
log_record_processor.shutdown()
454489

455490
@patch.dict(
456491
"os.environ",
@@ -481,6 +516,7 @@ def test_args_env_var_value_error(self):
481516
self.assertEqual(
482517
log_record_processor._batch_processor._export_timeout_millis, 30000
483518
)
519+
log_record_processor.shutdown()
484520

485521
def test_args_none_defaults(self):
486522
exporter = InMemoryLogExporter()
@@ -506,6 +542,7 @@ def test_args_none_defaults(self):
506542
self.assertEqual(
507543
log_record_processor._batch_processor._export_timeout_millis, 30000
508544
)
545+
log_record_processor.shutdown()
509546

510547
def test_validation_negative_max_queue_size(self):
511548
exporter = InMemoryLogExporter()

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919
import time
2020
import unittest
2121
import weakref
22-
from concurrent.futures import ThreadPoolExecutor
2322
from platform import system
24-
from sys import version_info
25-
from unittest.mock import Mock
23+
from unittest.mock import Mock, patch
2624

2725
import pytest
28-
from pytest import mark
26+
from requests import Session
27+
from requests.models import Response
2928

29+
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
30+
OTLPLogExporter,
31+
)
32+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
33+
OTLPSpanExporter,
34+
)
3035
from opentelemetry.sdk._logs import (
3136
LogData,
3237
LogRecord,
@@ -46,13 +51,22 @@
4651
BASIC_SPAN = ReadableSpan(
4752
"MySpan",
4853
instrumentation_scope=InstrumentationScope("example", "example"),
54+
context=Mock(
55+
**{
56+
"trace_state": {"a": "b", "c": "d"},
57+
"span_id": 10217189687419569865,
58+
"trace_id": 67545097771067222548457157018666467027,
59+
}
60+
),
4961
)
5062

5163
if system() != "Windows":
5264
multiprocessing.set_start_method("fork")
5365

5466

5567
# BatchLogRecordProcessor/BatchSpanProcessor initialize and use BatchProcessor.
68+
# Important: make sure to call .shutdown() before the end of the test,
69+
# otherwise the worker thread will continue to run after the end of the test.
5670
@pytest.mark.parametrize(
5771
"batch_processor_class,telemetry",
5872
[(BatchLogRecordProcessor, EMPTY_LOG), (BatchSpanProcessor, BASIC_SPAN)],
@@ -80,6 +94,7 @@ def test_telemetry_exported_once_batch_size_reached(
8094
after_export = time.time_ns()
8195
# Shows the worker's 30 second sleep was interrupted within a second.
8296
assert after_export - before_export < 1e9
97+
batch_processor.shutdown()
8398

8499
# pylint: disable=no-self-use
85100
def test_telemetry_exported_once_schedule_delay_reached(
@@ -96,6 +111,7 @@ def test_telemetry_exported_once_schedule_delay_reached(
96111
batch_processor._batch_processor.emit(telemetry)
97112
time.sleep(0.2)
98113
exporter.export.assert_called_once_with([telemetry])
114+
batch_processor.shutdown()
99115

100116
def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
101117
self, batch_processor_class, telemetry
@@ -136,33 +152,7 @@ def test_force_flush_flushes_telemetry(
136152
batch_processor._batch_processor.emit(telemetry)
137153
batch_processor.force_flush()
138154
exporter.export.assert_called_once_with([telemetry for _ in range(10)])
139-
140-
@mark.skipif(
141-
system() == "Windows" or version_info < (3, 9),
142-
reason="This test randomly fails on windows and python 3.8.",
143-
)
144-
def test_with_multiple_threads(self, batch_processor_class, telemetry):
145-
exporter = Mock()
146-
batch_processor = batch_processor_class(
147-
exporter,
148-
max_queue_size=3000,
149-
max_export_batch_size=1000,
150-
schedule_delay_millis=30000,
151-
export_timeout_millis=500,
152-
)
153-
154-
def bulk_emit_and_flush(num_emit):
155-
for _ in range(num_emit):
156-
batch_processor._batch_processor.emit(telemetry)
157-
batch_processor.force_flush()
158-
159-
with ThreadPoolExecutor(max_workers=69) as executor:
160-
for idx in range(69):
161-
executor.submit(bulk_emit_and_flush, idx + 1)
162-
163-
executor.shutdown()
164-
# 69 calls to force flush.
165-
assert exporter.export.call_count == 69
155+
batch_processor.shutdown()
166156

167157
@unittest.skipUnless(
168158
hasattr(os, "fork"),
@@ -202,6 +192,7 @@ def child(conn):
202192
batch_processor.force_flush()
203193
# Single export for the telemetry we emitted at the start of the test.
204194
assert exporter.export.call_count == 1
195+
batch_processor.shutdown()
205196

206197
def test_record_processor_is_garbage_collected(
207198
self, batch_processor_class, telemetry
@@ -217,3 +208,33 @@ def test_record_processor_is_garbage_collected(
217208

218209
# Then the reference to the processor should no longer exist
219210
assert weak_ref() is None
211+
212+
def test_shutdown_waits_30sec_before_cancelling_export(
213+
self, batch_processor_class, telemetry, caplog
214+
):
215+
resp = Response()
216+
resp.status_code = 200
217+
218+
def export_side_effect(*args, **kwargs):
219+
time.sleep(5)
220+
return resp
221+
222+
if type(BASIC_SPAN) is type(telemetry):
223+
exporter = OTLPSpanExporter()
224+
else:
225+
exporter = OTLPLogExporter()
226+
227+
with patch.object(Session, "post") as mock_post:
228+
mock_post.side_effect = export_side_effect
229+
processor = batch_processor_class(
230+
exporter,
231+
max_queue_size=200,
232+
max_export_batch_size=10,
233+
schedule_delay_millis=30000,
234+
)
235+
print("emitting..")
236+
processor._batch_processor.emit(telemetry)
237+
print("shutting down..")
238+
processor.shutdown(timeout_millis=4000)
239+
print("finished shutting down..")
240+
print(caplog.record_tuples)

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,12 @@ def test_simple_span_processor_not_sampled(self):
144144
self.assertListEqual([], spans_names_list)
145145

146146

147+
# Many more test cases for the BatchSpanProcessor exist under
148+
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
149+
# Important: make sure to call .shutdown() on the BatchSpanProcessor
150+
# before the end of the test, otherwise the worker thread will continue
151+
# to run after the end of the test.
147152
class TestBatchSpanProcessor(unittest.TestCase):
148-
# Many more test cases for the BatchSpanProcessor exist under
149-
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
150153
@mock.patch.dict(
151154
"os.environ",
152155
{
@@ -173,6 +176,7 @@ def test_args_env_var(self):
173176
self.assertEqual(
174177
batch_span_processor._batch_processor._export_timeout_millis, 4
175178
)
179+
batch_span_processor.shutdown()
176180

177181
def test_args_env_var_defaults(self):
178182
batch_span_processor = export.BatchSpanProcessor(
@@ -191,6 +195,7 @@ def test_args_env_var_defaults(self):
191195
self.assertEqual(
192196
batch_span_processor._batch_processor._export_timeout_millis, 30000
193197
)
198+
batch_span_processor.shutdown()
194199

195200
@mock.patch.dict(
196201
"os.environ",
@@ -220,6 +225,7 @@ def test_args_env_var_value_error(self):
220225
self.assertEqual(
221226
batch_span_processor._batch_processor._export_timeout_millis, 30000
222227
)
228+
batch_span_processor.shutdown()
223229

224230
def test_on_start_accepts_parent_context(self):
225231
# pylint: disable=no-self-use

0 commit comments

Comments
 (0)