diff --git a/opentelemetry-sdk/test-requirements.txt b/opentelemetry-sdk/test-requirements.txt index 6c260c1a3ed..859a2196e1a 100644 --- a/opentelemetry-sdk/test-requirements.txt +++ b/opentelemetry-sdk/test-requirements.txt @@ -14,4 +14,4 @@ zipp==3.19.2 -e tests/opentelemetry-test-utils -e opentelemetry-api -e opentelemetry-semantic-conventions --e opentelemetry-sdk +-e opentelemetry-sdk \ No newline at end of file diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 30a8c724b26..02741acfd74 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -17,6 +17,7 @@ import os import time import unittest +from concurrent.futures import ThreadPoolExecutor from sys import version_info from unittest.mock import Mock, patch @@ -331,9 +332,12 @@ def test_simple_log_record_processor_different_msg_types_with_formatter( self.assertEqual(expected, emitted) +# Many more test cases for the BatchLogRecordProcessor exist under +# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py. +# Important: make sure to call .shutdown() on the BatchLogRecordProcessor +# before the end of the test, otherwise the worker thread will continue +# to run after the end of the test. class TestBatchLogRecordProcessor(unittest.TestCase): - # Many more test cases for the BatchLogRecordProcessor exist under - # opentelemetry-sdk/tests/shared_internal/test_batch_processor.py. def test_emit_call_log_record(self): exporter = InMemoryLogExporter() log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter)) @@ -346,6 +350,34 @@ def test_emit_call_log_record(self): logger.error("error") self.assertEqual(log_record_processor.emit.call_count, 1) + log_record_processor.shutdown() + + def test_with_multiple_threads(self): # pylint: disable=no-self-use + exporter = InMemoryLogExporter() + batch_processor = BatchLogRecordProcessor( + exporter, + max_queue_size=3000, + max_export_batch_size=50, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + + def bulk_emit(num_emit): + for _ in range(num_emit): + batch_processor.emit(EMPTY_LOG) + + total_expected_logs = 0 + with ThreadPoolExecutor(max_workers=69) as executor: + for num_logs_to_emit in range(1, 70): + executor.submit(bulk_emit, num_logs_to_emit) + total_expected_logs += num_logs_to_emit + + executor.shutdown() + + batch_processor.shutdown() + # Wait a bit for logs to flush. + time.sleep(2) + assert len(exporter.get_finished_logs()) == total_expected_logs @mark.skipif( version_info < (3, 10), @@ -404,6 +436,7 @@ def test_args(self): self.assertEqual( log_record_processor._batch_processor._export_timeout_millis, 15000 ) + log_record_processor.shutdown() @patch.dict( "os.environ", @@ -432,6 +465,7 @@ def test_env_vars(self): self.assertEqual( log_record_processor._batch_processor._export_timeout_millis, 15000 ) + log_record_processor.shutdown() def test_args_defaults(self): exporter = InMemoryLogExporter() @@ -451,6 +485,7 @@ def test_args_defaults(self): self.assertEqual( log_record_processor._batch_processor._export_timeout_millis, 30000 ) + log_record_processor.shutdown() @patch.dict( "os.environ", @@ -481,6 +516,7 @@ def test_args_env_var_value_error(self): self.assertEqual( log_record_processor._batch_processor._export_timeout_millis, 30000 ) + log_record_processor.shutdown() def test_args_none_defaults(self): exporter = InMemoryLogExporter() @@ -506,6 +542,7 @@ def test_args_none_defaults(self): self.assertEqual( log_record_processor._batch_processor._export_timeout_millis, 30000 ) + log_record_processor.shutdown() def test_validation_negative_max_queue_size(self): exporter = InMemoryLogExporter() diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index 37d1e11a27e..4888d81779d 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -19,13 +19,10 @@ import time import unittest import weakref -from concurrent.futures import ThreadPoolExecutor from platform import system -from sys import version_info from unittest.mock import Mock import pytest -from pytest import mark from opentelemetry.sdk._logs import ( LogData, @@ -53,6 +50,8 @@ # BatchLogRecodProcessor/BatchSpanProcessor initialize and use BatchProcessor. +# Important: make sure to call .shutdown() before the end of the test, +# otherwise the worker thread will continue to run after the end of the test. @pytest.mark.parametrize( "batch_processor_class,telemetry", [(BatchLogRecordProcessor, EMPTY_LOG), (BatchSpanProcessor, BASIC_SPAN)], @@ -80,6 +79,7 @@ def test_telemetry_exported_once_batch_size_reached( after_export = time.time_ns() # Shows the worker's 30 second sleep was interrupted within a second. assert after_export - before_export < 1e9 + batch_processor.shutdown() # pylint: disable=no-self-use def test_telemetry_exported_once_schedule_delay_reached( @@ -96,6 +96,7 @@ def test_telemetry_exported_once_schedule_delay_reached( batch_processor._batch_processor.emit(telemetry) time.sleep(0.2) exporter.export.assert_called_once_with([telemetry]) + batch_processor.shutdown() def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown( self, batch_processor_class, telemetry @@ -136,33 +137,7 @@ def test_force_flush_flushes_telemetry( batch_processor._batch_processor.emit(telemetry) batch_processor.force_flush() exporter.export.assert_called_once_with([telemetry for _ in range(10)]) - - @mark.skipif( - system() == "Windows" or version_info < (3, 9), - reason="This test randomly fails on windows and python 3.8.", - ) - def test_with_multiple_threads(self, batch_processor_class, telemetry): - exporter = Mock() - batch_processor = batch_processor_class( - exporter, - max_queue_size=3000, - max_export_batch_size=1000, - schedule_delay_millis=30000, - export_timeout_millis=500, - ) - - def bulk_emit_and_flush(num_emit): - for _ in range(num_emit): - batch_processor._batch_processor.emit(telemetry) - batch_processor.force_flush() - - with ThreadPoolExecutor(max_workers=69) as executor: - for idx in range(69): - executor.submit(bulk_emit_and_flush, idx + 1) - - executor.shutdown() - # 69 calls to force flush. - assert exporter.export.call_count == 69 + batch_processor.shutdown() @unittest.skipUnless( hasattr(os, "fork"), @@ -202,6 +177,7 @@ def child(conn): batch_processor.force_flush() # Single export for the telemetry we emitted at the start of the test. assert exporter.export.call_count == 1 + batch_processor.shutdown() def test_record_processor_is_garbage_collected( self, batch_processor_class, telemetry diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 77e28f9f989..e94c3e67680 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -144,9 +144,12 @@ def test_simple_span_processor_not_sampled(self): self.assertListEqual([], spans_names_list) +# Many more test cases for the BatchSpanProcessor exist under +# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py. +# Important: make sure to call .shutdown() on the BatchSpanProcessor +# before the end of the test, otherwise the worker thread will continue +# to run after the end of the test. class TestBatchSpanProcessor(unittest.TestCase): - # Many more test cases for the BatchSpanProcessor exist under - # opentelemetry-sdk/tests/shared_internal/test_batch_processor.py. @mock.patch.dict( "os.environ", { @@ -173,6 +176,7 @@ def test_args_env_var(self): self.assertEqual( batch_span_processor._batch_processor._export_timeout_millis, 4 ) + batch_span_processor.shutdown() def test_args_env_var_defaults(self): batch_span_processor = export.BatchSpanProcessor( @@ -191,6 +195,7 @@ def test_args_env_var_defaults(self): self.assertEqual( batch_span_processor._batch_processor._export_timeout_millis, 30000 ) + batch_span_processor.shutdown() @mock.patch.dict( "os.environ", @@ -220,6 +225,7 @@ def test_args_env_var_value_error(self): self.assertEqual( batch_span_processor._batch_processor._export_timeout_millis, 30000 ) + batch_span_processor.shutdown() def test_on_start_accepts_parent_context(self): # pylint: disable=no-self-use