Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-sdk/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ zipp==3.19.2
-e tests/opentelemetry-test-utils
-e opentelemetry-api
-e opentelemetry-semantic-conventions
-e opentelemetry-sdk
-e opentelemetry-sdk
41 changes: 39 additions & 2 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import os
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from sys import version_info
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -331,9 +332,12 @@ def test_simple_log_record_processor_different_msg_types_with_formatter(
self.assertEqual(expected, emitted)


# Many more test cases for the BatchLogRecordProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
# Important: make sure to call .shutdown() on the BatchLogRecordProcessor
# before the end of the test, otherwise the worker thread will continue
# to run after the end of the test.
class TestBatchLogRecordProcessor(unittest.TestCase):
# Many more test cases for the BatchLogRecordProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
def test_emit_call_log_record(self):
exporter = InMemoryLogExporter()
log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter))
Expand All @@ -346,6 +350,34 @@ def test_emit_call_log_record(self):

logger.error("error")
self.assertEqual(log_record_processor.emit.call_count, 1)
log_record_processor.shutdown()

def test_with_multiple_threads(self): # pylint: disable=no-self-use
exporter = InMemoryLogExporter()
batch_processor = BatchLogRecordProcessor(
exporter,
max_queue_size=3000,
max_export_batch_size=50,
schedule_delay_millis=30000,
export_timeout_millis=500,
)

def bulk_emit(num_emit):
for _ in range(num_emit):
batch_processor.emit(EMPTY_LOG)

total_expected_logs = 0
with ThreadPoolExecutor(max_workers=69) as executor:
for num_logs_to_emit in range(1, 70):
executor.submit(bulk_emit, num_logs_to_emit)
total_expected_logs += num_logs_to_emit

executor.shutdown()

batch_processor.shutdown()
# Wait a bit for logs to flush.
time.sleep(2)
assert len(exporter.get_finished_logs()) == total_expected_logs

@mark.skipif(
version_info < (3, 10),
Expand Down Expand Up @@ -404,6 +436,7 @@ def test_args(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 15000
)
log_record_processor.shutdown()

@patch.dict(
"os.environ",
Expand Down Expand Up @@ -432,6 +465,7 @@ def test_env_vars(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 15000
)
log_record_processor.shutdown()

def test_args_defaults(self):
exporter = InMemoryLogExporter()
Expand All @@ -451,6 +485,7 @@ def test_args_defaults(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 30000
)
log_record_processor.shutdown()

@patch.dict(
"os.environ",
Expand Down Expand Up @@ -481,6 +516,7 @@ def test_args_env_var_value_error(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 30000
)
log_record_processor.shutdown()

def test_args_none_defaults(self):
exporter = InMemoryLogExporter()
Expand All @@ -506,6 +542,7 @@ def test_args_none_defaults(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 30000
)
log_record_processor.shutdown()

def test_validation_negative_max_queue_size(self):
exporter = InMemoryLogExporter()
Expand Down
36 changes: 6 additions & 30 deletions opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import time
import unittest
import weakref
from concurrent.futures import ThreadPoolExecutor
from platform import system
from sys import version_info
from unittest.mock import Mock

import pytest
from pytest import mark

from opentelemetry.sdk._logs import (
LogData,
Expand Down Expand Up @@ -53,6 +50,8 @@


# BatchLogRecodProcessor/BatchSpanProcessor initialize and use BatchProcessor.
# Important: make sure to call .shutdown() before the end of the test,
# otherwise the worker thread will continue to run after the end of the test.
@pytest.mark.parametrize(
"batch_processor_class,telemetry",
[(BatchLogRecordProcessor, EMPTY_LOG), (BatchSpanProcessor, BASIC_SPAN)],
Expand Down Expand Up @@ -80,6 +79,7 @@ def test_telemetry_exported_once_batch_size_reached(
after_export = time.time_ns()
# Shows the worker's 30 second sleep was interrupted within a second.
assert after_export - before_export < 1e9
batch_processor.shutdown()

# pylint: disable=no-self-use
def test_telemetry_exported_once_schedule_delay_reached(
Expand All @@ -96,6 +96,7 @@ def test_telemetry_exported_once_schedule_delay_reached(
batch_processor._batch_processor.emit(telemetry)
time.sleep(0.2)
exporter.export.assert_called_once_with([telemetry])
batch_processor.shutdown()

def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
self, batch_processor_class, telemetry
Expand Down Expand Up @@ -136,33 +137,7 @@ def test_force_flush_flushes_telemetry(
batch_processor._batch_processor.emit(telemetry)
batch_processor.force_flush()
exporter.export.assert_called_once_with([telemetry for _ in range(10)])

@mark.skipif(
system() == "Windows" or version_info < (3, 9),
reason="This test randomly fails on windows and python 3.8.",
)
def test_with_multiple_threads(self, batch_processor_class, telemetry):
exporter = Mock()
batch_processor = batch_processor_class(
exporter,
max_queue_size=3000,
max_export_batch_size=1000,
schedule_delay_millis=30000,
export_timeout_millis=500,
)

def bulk_emit_and_flush(num_emit):
for _ in range(num_emit):
batch_processor._batch_processor.emit(telemetry)
batch_processor.force_flush()

with ThreadPoolExecutor(max_workers=69) as executor:
for idx in range(69):
executor.submit(bulk_emit_and_flush, idx + 1)

executor.shutdown()
# 69 calls to force flush.
assert exporter.export.call_count == 69
batch_processor.shutdown()

@unittest.skipUnless(
hasattr(os, "fork"),
Expand Down Expand Up @@ -202,6 +177,7 @@ def child(conn):
batch_processor.force_flush()
# Single export for the telemetry we emitted at the start of the test.
assert exporter.export.call_count == 1
batch_processor.shutdown()

def test_record_processor_is_garbage_collected(
self, batch_processor_class, telemetry
Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ def test_simple_span_processor_not_sampled(self):
self.assertListEqual([], spans_names_list)


# Many more test cases for the BatchSpanProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
# Important: make sure to call .shutdown() on the BatchSpanProcessor
# before the end of the test, otherwise the worker thread will continue
# to run after the end of the test.
class TestBatchSpanProcessor(unittest.TestCase):
# Many more test cases for the BatchSpanProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
@mock.patch.dict(
"os.environ",
{
Expand All @@ -173,6 +176,7 @@ def test_args_env_var(self):
self.assertEqual(
batch_span_processor._batch_processor._export_timeout_millis, 4
)
batch_span_processor.shutdown()

def test_args_env_var_defaults(self):
batch_span_processor = export.BatchSpanProcessor(
Expand All @@ -191,6 +195,7 @@ def test_args_env_var_defaults(self):
self.assertEqual(
batch_span_processor._batch_processor._export_timeout_millis, 30000
)
batch_span_processor.shutdown()

@mock.patch.dict(
"os.environ",
Expand Down Expand Up @@ -220,6 +225,7 @@ def test_args_env_var_value_error(self):
self.assertEqual(
batch_span_processor._batch_processor._export_timeout_millis, 30000
)
batch_span_processor.shutdown()

def test_on_start_accepts_parent_context(self):
# pylint: disable=no-self-use
Expand Down
Loading