Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-sdk/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ zipp==3.19.2
-e tests/opentelemetry-test-utils
-e opentelemetry-api
-e opentelemetry-semantic-conventions
-e opentelemetry-sdk
-e opentelemetry-sdk
41 changes: 39 additions & 2 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import os
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from sys import version_info
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -331,9 +332,12 @@ def test_simple_log_record_processor_different_msg_types_with_formatter(
self.assertEqual(expected, emitted)


# Many more test cases for the BatchLogRecordProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
# Important: make sure to call .shutdown() on the BatchLogRecordProcessor
# before the end of the test, otherwise the worker thread will continue
# to run after the end of the test.
class TestBatchLogRecordProcessor(unittest.TestCase):
# Many more test cases for the BatchLogRecordProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
def test_emit_call_log_record(self):
exporter = InMemoryLogExporter()
log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter))
Expand All @@ -346,6 +350,34 @@ def test_emit_call_log_record(self):

logger.error("error")
self.assertEqual(log_record_processor.emit.call_count, 1)
log_record_processor.shutdown()

def test_with_multiple_threads(self): # pylint: disable=no-self-use
exporter = InMemoryLogExporter()
batch_processor = BatchLogRecordProcessor(
exporter,
max_queue_size=3000,
max_export_batch_size=50,
schedule_delay_millis=30000,
export_timeout_millis=500,
)

def bulk_emit(num_emit):
for _ in range(num_emit):
batch_processor.emit(EMPTY_LOG)

total_expected_logs = 0
with ThreadPoolExecutor(max_workers=69) as executor:
for num_logs_to_emit in range(1, 70):
executor.submit(bulk_emit, num_logs_to_emit)
total_expected_logs += num_logs_to_emit

executor.shutdown()

batch_processor.shutdown()
# Wait a bit for logs to flush.
time.sleep(2)
assert len(exporter.get_finished_logs()) == total_expected_logs

@mark.skipif(
version_info < (3, 10),
Expand Down Expand Up @@ -404,6 +436,7 @@ def test_args(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 15000
)
log_record_processor.shutdown()

@patch.dict(
"os.environ",
Expand Down Expand Up @@ -432,6 +465,7 @@ def test_env_vars(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 15000
)
log_record_processor.shutdown()

def test_args_defaults(self):
exporter = InMemoryLogExporter()
Expand All @@ -451,6 +485,7 @@ def test_args_defaults(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 30000
)
log_record_processor.shutdown()

@patch.dict(
"os.environ",
Expand Down Expand Up @@ -481,6 +516,7 @@ def test_args_env_var_value_error(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 30000
)
log_record_processor.shutdown()

def test_args_none_defaults(self):
exporter = InMemoryLogExporter()
Expand All @@ -506,6 +542,7 @@ def test_args_none_defaults(self):
self.assertEqual(
log_record_processor._batch_processor._export_timeout_millis, 30000
)
log_record_processor.shutdown()

def test_validation_negative_max_queue_size(self):
exporter = InMemoryLogExporter()
Expand Down
36 changes: 6 additions & 30 deletions opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import time
import unittest
import weakref
from concurrent.futures import ThreadPoolExecutor
from platform import system
from sys import version_info
from unittest.mock import Mock

import pytest
from pytest import mark

from opentelemetry.sdk._logs import (
LogData,
Expand Down Expand Up @@ -53,6 +50,8 @@


# BatchLogRecodProcessor/BatchSpanProcessor initialize and use BatchProcessor.
# Important: make sure to call .shutdown() before the end of the test,
# otherwise the worker thread will continue to run after the end of the test.
@pytest.mark.parametrize(
"batch_processor_class,telemetry",
[(BatchLogRecordProcessor, EMPTY_LOG), (BatchSpanProcessor, BASIC_SPAN)],
Expand Down Expand Up @@ -80,6 +79,7 @@ def test_telemetry_exported_once_batch_size_reached(
after_export = time.time_ns()
# Shows the worker's 30 second sleep was interrupted within a second.
assert after_export - before_export < 1e9
batch_processor.shutdown()

# pylint: disable=no-self-use
def test_telemetry_exported_once_schedule_delay_reached(
Expand All @@ -96,6 +96,7 @@ def test_telemetry_exported_once_schedule_delay_reached(
batch_processor._batch_processor.emit(telemetry)
time.sleep(0.2)
exporter.export.assert_called_once_with([telemetry])
batch_processor.shutdown()

def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
self, batch_processor_class, telemetry
Expand Down Expand Up @@ -136,33 +137,7 @@ def test_force_flush_flushes_telemetry(
batch_processor._batch_processor.emit(telemetry)
batch_processor.force_flush()
exporter.export.assert_called_once_with([telemetry for _ in range(10)])

@mark.skipif(
system() == "Windows" or version_info < (3, 9),
reason="This test randomly fails on windows and python 3.8.",
)
def test_with_multiple_threads(self, batch_processor_class, telemetry):
exporter = Mock()
batch_processor = batch_processor_class(
exporter,
max_queue_size=3000,
max_export_batch_size=1000,
schedule_delay_millis=30000,
export_timeout_millis=500,
)

def bulk_emit_and_flush(num_emit):
for _ in range(num_emit):
batch_processor._batch_processor.emit(telemetry)
batch_processor.force_flush()

with ThreadPoolExecutor(max_workers=69) as executor:
for idx in range(69):
executor.submit(bulk_emit_and_flush, idx + 1)

executor.shutdown()
# 69 calls to force flush.
assert exporter.export.call_count == 69
batch_processor.shutdown()

@unittest.skipUnless(
hasattr(os, "fork"),
Expand Down Expand Up @@ -202,6 +177,7 @@ def child(conn):
batch_processor.force_flush()
# Single export for the telemetry we emitted at the start of the test.
assert exporter.export.call_count == 1
batch_processor.shutdown()

def test_record_processor_is_garbage_collected(
self, batch_processor_class, telemetry
Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ def test_simple_span_processor_not_sampled(self):
self.assertListEqual([], spans_names_list)


# Many more test cases for the BatchSpanProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
# Important: make sure to call .shutdown() on the BatchSpanProcessor
# before the end of the test, otherwise the worker thread will continue
# to run after the end of the test.
class TestBatchSpanProcessor(unittest.TestCase):
# Many more test cases for the BatchSpanProcessor exist under
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
@mock.patch.dict(
"os.environ",
{
Expand All @@ -173,6 +176,7 @@ def test_args_env_var(self):
self.assertEqual(
batch_span_processor._batch_processor._export_timeout_millis, 4
)
batch_span_processor.shutdown()

def test_args_env_var_defaults(self):
batch_span_processor = export.BatchSpanProcessor(
Expand All @@ -191,6 +195,7 @@ def test_args_env_var_defaults(self):
self.assertEqual(
batch_span_processor._batch_processor._export_timeout_millis, 30000
)
batch_span_processor.shutdown()

@mock.patch.dict(
"os.environ",
Expand Down Expand Up @@ -220,6 +225,7 @@ def test_args_env_var_value_error(self):
self.assertEqual(
batch_span_processor._batch_processor._export_timeout_millis, 30000
)
batch_span_processor.shutdown()

def test_on_start_accepts_parent_context(self):
# pylint: disable=no-self-use
Expand Down