Commit 6e1149f

Update test to use BatchLogRecordProcessor instead of BatchProcessor
1 parent d8a2bea commit 6e1149f

File tree

1 file changed: +69 -61 lines changed

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 69 additions & 61 deletions
@@ -20,14 +20,15 @@
 from concurrent.futures import ThreadPoolExecutor
 from unittest.mock import Mock
 
+import pytest
+
 from opentelemetry.sdk._logs import (
     LogData,
     LogRecord,
 )
 from opentelemetry.sdk._logs.export import (
-    InMemoryLogExporter,
+    BatchLogRecordProcessor,
 )
-from opentelemetry.sdk._shared_internal import BatchProcessor
 from opentelemetry.sdk.util.instrumentation import InstrumentationScope
 
 EMPTY_LOG = LogData(
@@ -36,145 +37,152 @@
 )
 
 
-class TestBatchProcessor(unittest.TestCase):
-    def test_logs_exported_once_batch_size_reached(self):
+# BatchLogRecordProcessor initializes / uses BatchProcessor.
+@pytest.mark.parametrize(
+    "batch_processor_class,telemetry", [(BatchLogRecordProcessor, EMPTY_LOG)]
+)
+class TestBatchProcessor:
+    def test_telemetry_exported_once_batch_size_reached(
+        self, batch_processor_class, telemetry
+    ):
         exporter = Mock()
-        log_record_processor = BatchProcessor(
+        batch_processor = batch_processor_class(
             exporter=exporter,
             max_queue_size=15,
             max_export_batch_size=15,
             # Will not reach this during the test, this sleep should be interrupted when batch size is reached.
             schedule_delay_millis=30000,
-            exporting="Log",
             export_timeout_millis=500,
         )
         before_export = time.time_ns()
         for _ in range(15):
-            log_record_processor.emit(EMPTY_LOG)
+            batch_processor.emit(telemetry)
         # Wait a bit for the worker thread to wake up and call export.
         time.sleep(0.1)
         exporter.export.assert_called_once()
         after_export = time.time_ns()
         # Shows the worker's 30 second sleep was interrupted within a second.
-        self.assertLess(after_export - before_export, 1e9)
+        assert after_export - before_export < 1e9
 
     # pylint: disable=no-self-use
-    def test_logs_exported_once_schedule_delay_reached(self):
+    def test_telemetry_exported_once_schedule_delay_reached(
+        self, batch_processor_class, telemetry
+    ):
         exporter = Mock()
-        log_record_processor = BatchProcessor(
+        batch_processor = batch_processor_class(
             exporter=exporter,
             max_queue_size=15,
             max_export_batch_size=15,
             schedule_delay_millis=100,
-            exporting="Log",
             export_timeout_millis=500,
         )
-        log_record_processor.emit(EMPTY_LOG)
+        batch_processor.emit(telemetry)
         time.sleep(0.2)
-        exporter.export.assert_called_once_with([EMPTY_LOG])
+        exporter.export.assert_called_once_with([telemetry])
 
-    def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
+    def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
+        self, batch_processor_class, telemetry, caplog
+    ):
         exporter = Mock()
-        log_record_processor = BatchProcessor(
+        batch_processor = batch_processor_class(
             exporter=exporter,
             # Neither of these thresholds should be hit before test ends.
             max_queue_size=15,
             max_export_batch_size=15,
             schedule_delay_millis=30000,
-            exporting="Log",
             export_timeout_millis=500,
         )
         # This log should be flushed because it was written before shutdown.
-        log_record_processor.emit(EMPTY_LOG)
-        log_record_processor.shutdown()
-        exporter.export.assert_called_once_with([EMPTY_LOG])
-        self.assertTrue(exporter._stopped)
-
-        with self.assertLogs(level="INFO") as log:
-            # This log should not be flushed.
-            log_record_processor.emit(EMPTY_LOG)
-        self.assertEqual(len(log.output), 1)
-        self.assertEqual(len(log.records), 1)
-        self.assertIn("Shutdown called, ignoring Log.", log.output[0])
+        batch_processor.emit(telemetry)
+        batch_processor.shutdown()
+        exporter.export.assert_called_once_with([telemetry])
+        assert batch_processor._shutdown is True
+
+        # This should not be flushed.
+        batch_processor.emit(telemetry)
+        assert len(caplog.records) == 1
+        assert "Shutdown called, ignoring" in caplog.text
         exporter.export.assert_called_once()
 
# pylint: disable=no-self-use
102-
def test_force_flush_flushes_logs(self):
108+
def test_force_flush_flushes_telemetry(
109+
self, batch_processor_class, telemetry
110+
):
103111
exporter = Mock()
104-
log_record_processor = BatchProcessor(
112+
batch_processor = batch_processor_class(
105113
exporter=exporter,
106114
# Neither of these thresholds should be hit before test ends.
107115
max_queue_size=15,
108116
max_export_batch_size=15,
109117
schedule_delay_millis=30000,
110-
exporting="Log",
111118
export_timeout_millis=500,
112119
)
113120
for _ in range(10):
114-
log_record_processor.emit(EMPTY_LOG)
115-
log_record_processor.force_flush()
116-
exporter.export.assert_called_once_with([EMPTY_LOG for _ in range(10)])
121+
batch_processor.emit(telemetry)
122+
batch_processor.force_flush()
123+
exporter.export.assert_called_once_with([telemetry for _ in range(10)])
117124

118-
def test_with_multiple_threads(self):
119-
exporter = InMemoryLogExporter()
120-
log_record_processor = BatchProcessor(
125+
def test_with_multiple_threads(self, batch_processor_class, telemetry):
126+
exporter = Mock()
127+
batch_processor = batch_processor_class(
121128
exporter=exporter,
122129
max_queue_size=3000,
123130
max_export_batch_size=1000,
124131
schedule_delay_millis=30000,
125-
exporting="Log",
126132
export_timeout_millis=500,
127133
)
128134

129-
def bulk_log_and_flush(num_logs):
130-
for _ in range(num_logs):
131-
log_record_processor.emit(EMPTY_LOG)
132-
log_record_processor.force_flush()
135+
def bulk_emit_and_flush(num_emit):
136+
for _ in range(num_emit):
137+
batch_processor.emit(telemetry)
138+
batch_processor.force_flush()
133139

134140
with ThreadPoolExecutor(max_workers=69) as executor:
135141
for idx in range(69):
136-
executor.submit(bulk_log_and_flush, idx + 1)
142+
executor.submit(bulk_emit_and_flush, idx + 1)
137143

138144
executor.shutdown()
139145

140-
finished_logs = exporter.get_finished_logs()
141-
self.assertEqual(len(finished_logs), 2415)
146+
# 69 force flush calls, should result in 69 export calls.
147+
assert exporter.export.call_count == 69
142148

143149
@unittest.skipUnless(
144150
hasattr(os, "fork"),
145151
"needs *nix",
146152
)
147-
def test_batch_log_record_processor_fork(self):
148-
exporter = InMemoryLogExporter()
149-
log_record_processor = BatchProcessor(
153+
def test_batch_telemetry_record_processor_fork(
154+
self, batch_processor_class, telemetry
155+
):
156+
exporter = Mock()
157+
batch_processor = batch_processor_class(
150158
exporter,
151-
max_queue_size=100,
152-
max_export_batch_size=64,
159+
max_queue_size=200,
160+
max_export_batch_size=10,
153161
schedule_delay_millis=30000,
154-
exporting="Log",
155162
export_timeout_millis=500,
156163
)
157-
# These logs should be flushed only from the parent process.
164+
# This telemetry should be flushed only from the parent process.
158165
# _at_fork_reinit should be called in the child process, to
159-
# clear these logs in the child process.
160-
for _ in range(10):
161-
log_record_processor.emit(EMPTY_LOG)
166+
# clear the logs/spans in the child process.
167+
for _ in range(9):
168+
batch_processor.emit(telemetry)
162169

163170
multiprocessing.set_start_method("fork")
164171

165172
def child(conn):
166173
for _ in range(100):
167-
log_record_processor.emit(EMPTY_LOG)
168-
log_record_processor.force_flush()
174+
batch_processor.emit(telemetry)
175+
batch_processor.force_flush()
169176

170-
logs = exporter.get_finished_logs()
171-
conn.send(len(logs) == 100)
177+
# Expect force flush to export 10 batches of max export batch size (10)
178+
conn.send(exporter.export.call_count == 10)
172179
conn.close()
173180

174181
parent_conn, child_conn = multiprocessing.Pipe()
175182
process = multiprocessing.Process(target=child, args=(child_conn,))
176183
process.start()
177-
self.assertTrue(parent_conn.recv())
184+
assert parent_conn.recv() is True
178185
process.join()
179-
log_record_processor.force_flush()
180-
self.assertTrue(len(exporter.get_finished_logs()) == 10)
186+
batch_processor.force_flush()
187+
# Single export for the telemetry we emitted at the start of the test.
188+
assert exporter.export.call_count == 1
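
Note on the pattern: the diff converts the unittest-style class into a pytest class parametrized at the class level, so the same test bodies can later be reused for any processor built on the shared BatchProcessor by adding another (class, telemetry) tuple to the parametrize list. The sketch below illustrates how class-level pytest.mark.parametrize fans the same arguments out to every test method; FakeProcessor and the test names here are hypothetical stand-ins, not part of this commit.

# Minimal sketch of the class-level parametrize pattern (illustrative only).
import pytest


class FakeProcessor:
    """Stand-in for a processor backed by the shared BatchProcessor."""

    def __init__(self):
        self.emitted = []

    def emit(self, item):
        self.emitted.append(item)


@pytest.mark.parametrize("processor_class,telemetry", [(FakeProcessor, "dummy-log")])
class TestSketch:
    # Every test method receives processor_class and telemetry for each
    # parameter set, so adding a tuple to the list reruns the whole class.
    def test_emit_records_telemetry(self, processor_class, telemetry):
        processor = processor_class()
        processor.emit(telemetry)
        assert processor.emitted == [telemetry]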

Comments (0)