Skip to content

Commit 29e4bab

Browse files
authored
Make batch processor fork aware and reinit when needed (#2242)
Since 3.7 python provides register_at_fork which can be used to make our batch processor fork-safe.
1 parent 41c5f99 commit 29e4bab

File tree

5 files changed

+144
-2
lines changed

5 files changed

+144
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1616
([#2153](https://github.com/open-telemetry/opentelemetry-python/pull/2153))
1717
- Add metrics API
1818
([#1887](https://github.com/open-telemetry/opentelemetry-python/pull/1887))
19+
- Make batch processor fork aware and reinit when needed
20+
([#2242](https://github.com/open-telemetry/opentelemetry-python/pull/2242))
1921

2022
## [1.6.2-0.25b2](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.2-0.25b2) - 2021-10-19
2123

opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import collections
1717
import enum
1818
import logging
19+
import os
1920
import sys
2021
import threading
2122
from os import linesep
@@ -154,6 +155,17 @@ def __init__(
154155
None
155156
] * self._max_export_batch_size # type: List[Optional[LogData]]
156157
self._worker_thread.start()
158+
# Only available in *nix since py37.
159+
if hasattr(os, "register_at_fork"):
160+
os.register_at_fork(
161+
after_in_child=self._at_fork_reinit
162+
) # pylint: disable=protected-access
163+
164+
def _at_fork_reinit(self):
165+
self._condition = threading.Condition(threading.Lock())
166+
self._queue.clear()
167+
self._worker_thread = threading.Thread(target=self.worker, daemon=True)
168+
self._worker_thread.start()
157169

158170
def worker(self):
159171
timeout = self._schedule_delay_millis / 1e3

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import collections
1616
import logging
17+
import os
1718
import sys
1819
import threading
1920
import typing
@@ -197,6 +198,11 @@ def __init__(
197198
None
198199
] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
199200
self.worker_thread.start()
201+
# Only available in *nix since py37.
202+
if hasattr(os, "register_at_fork"):
203+
os.register_at_fork(
204+
after_in_child=self._at_fork_reinit
205+
) # pylint: disable=protected-access
200206

201207
def on_start(
202208
self, span: Span, parent_context: typing.Optional[Context] = None
@@ -220,6 +226,17 @@ def on_end(self, span: ReadableSpan) -> None:
220226
with self.condition:
221227
self.condition.notify()
222228

229+
def _at_fork_reinit(self):
230+
self.condition = threading.Condition(threading.Lock())
231+
self.queue.clear()
232+
233+
# worker_thread is local to a process, only the thread that issued fork continues
234+
# to exist. A new worker thread must be started in child process.
235+
self.worker_thread = threading.Thread(
236+
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
237+
)
238+
self.worker_thread.start()
239+
223240
def worker(self):
224241
timeout = self.schedule_delay_millis / 1e3
225242
flush_request = None # type: typing.Optional[_FlushRequest]

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
# pylint: disable=protected-access
1616
import logging
17+
import multiprocessing
1718
import os
19+
import sys
1820
import time
1921
import unittest
2022
from concurrent.futures import ThreadPoolExecutor
@@ -38,6 +40,7 @@
3840
from opentelemetry.sdk._logs.severity import SeverityNumber
3941
from opentelemetry.sdk.resources import Resource as SDKResource
4042
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
43+
from opentelemetry.test.concurrency_test import ConcurrencyTestBase
4144
from opentelemetry.trace import TraceFlags
4245
from opentelemetry.trace.span import INVALID_SPAN_CONTEXT
4346

@@ -158,7 +161,7 @@ def test_simple_log_processor_shutdown(self):
158161
self.assertEqual(len(finished_logs), 0)
159162

160163

161-
class TestBatchLogProcessor(unittest.TestCase):
164+
class TestBatchLogProcessor(ConcurrencyTestBase):
162165
def test_emit_call_log_record(self):
163166
exporter = InMemoryLogExporter()
164167
log_processor = Mock(wraps=BatchLogProcessor(exporter))
@@ -269,6 +272,54 @@ def bulk_log_and_flush(num_logs):
269272
finished_logs = exporter.get_finished_logs()
270273
self.assertEqual(len(finished_logs), 2415)
271274

275+
@unittest.skipUnless(
276+
hasattr(os, "fork") and sys.version_info >= (3, 7),
277+
"needs *nix and minor version 7 or later",
278+
)
279+
def test_batch_log_processor_fork(self):
280+
# pylint: disable=invalid-name
281+
exporter = InMemoryLogExporter()
282+
log_processor = BatchLogProcessor(
283+
exporter,
284+
max_export_batch_size=64,
285+
schedule_delay_millis=10,
286+
)
287+
provider = LogEmitterProvider()
288+
provider.add_log_processor(log_processor)
289+
290+
emitter = provider.get_log_emitter(__name__)
291+
logger = logging.getLogger("test-fork")
292+
logger.addHandler(OTLPHandler(log_emitter=emitter))
293+
294+
logger.critical("yolo")
295+
time.sleep(0.5) # give some time for the exporter to upload
296+
297+
self.assertTrue(log_processor.force_flush())
298+
self.assertEqual(len(exporter.get_finished_logs()), 1)
299+
exporter.clear()
300+
301+
multiprocessing.set_start_method("fork")
302+
303+
def child(conn):
304+
def _target():
305+
logger.critical("Critical message child")
306+
307+
self.run_with_many_threads(_target, 100)
308+
309+
time.sleep(0.5)
310+
311+
logs = exporter.get_finished_logs()
312+
conn.send(len(logs) == 100)
313+
conn.close()
314+
315+
parent_conn, child_conn = multiprocessing.Pipe()
316+
p = multiprocessing.Process(target=child, args=(child_conn,))
317+
p.start()
318+
self.assertTrue(parent_conn.recv())
319+
p.join()
320+
321+
log_processor.shutdown()
322+
272323

273324
class TestConsoleExporter(unittest.TestCase):
274325
def test_export(self): # pylint: disable=no-self-use

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import multiprocessing
1516
import os
17+
import sys
1618
import threading
1719
import time
1820
import unittest
@@ -30,6 +32,10 @@
3032
OTEL_BSP_SCHEDULE_DELAY,
3133
)
3234
from opentelemetry.sdk.trace import export
35+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
36+
InMemorySpanExporter,
37+
)
38+
from opentelemetry.test.concurrency_test import ConcurrencyTestBase
3339

3440

3541
class MySpanExporter(export.SpanExporter):
@@ -157,7 +163,7 @@ def _create_start_and_end_span(name, span_processor):
157163
span.end()
158164

159165

160-
class TestBatchSpanProcessor(unittest.TestCase):
166+
class TestBatchSpanProcessor(ConcurrencyTestBase):
161167
@mock.patch.dict(
162168
"os.environ",
163169
{
@@ -356,6 +362,60 @@ def test_batch_span_processor_not_sampled(self):
356362
self.assertEqual(len(spans_names_list), 0)
357363
span_processor.shutdown()
358364

365+
def _check_fork_trace(self, exporter, expected):
366+
time.sleep(0.5) # give some time for the exporter to upload spans
367+
spans = exporter.get_finished_spans()
368+
for span in spans:
369+
self.assertIn(span.name, expected)
370+
371+
@unittest.skipUnless(
372+
hasattr(os, "fork") and sys.version_info >= (3, 7),
373+
"needs *nix and minor version 7 or later",
374+
)
375+
def test_batch_span_processor_fork(self):
376+
# pylint: disable=invalid-name
377+
tracer_provider = trace.TracerProvider()
378+
tracer = tracer_provider.get_tracer(__name__)
379+
380+
exporter = InMemorySpanExporter()
381+
span_processor = export.BatchSpanProcessor(
382+
exporter,
383+
max_queue_size=256,
384+
max_export_batch_size=64,
385+
schedule_delay_millis=10,
386+
)
387+
tracer_provider.add_span_processor(span_processor)
388+
with tracer.start_as_current_span("foo"):
389+
pass
390+
time.sleep(0.5) # give some time for the exporter to upload spans
391+
392+
self.assertTrue(span_processor.force_flush())
393+
self.assertEqual(len(exporter.get_finished_spans()), 1)
394+
exporter.clear()
395+
396+
def child(conn):
397+
def _target():
398+
with tracer.start_as_current_span("span") as s:
399+
s.set_attribute("i", "1")
400+
with tracer.start_as_current_span("temp"):
401+
pass
402+
403+
self.run_with_many_threads(_target, 100)
404+
405+
time.sleep(0.5)
406+
407+
spans = exporter.get_finished_spans()
408+
conn.send(len(spans) == 200)
409+
conn.close()
410+
411+
parent_conn, child_conn = multiprocessing.Pipe()
412+
p = multiprocessing.Process(target=child, args=(child_conn,))
413+
p.start()
414+
self.assertTrue(parent_conn.recv())
415+
p.join()
416+
417+
span_processor.shutdown()
418+
359419
def test_batch_span_processor_scheduled_delay(self):
360420
"""Test that spans are exported each schedule_delay_millis"""
361421
spans_names_list = []

0 commit comments

Comments
 (0)