diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1bc593e5089..1b80ef9ec32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   the OTLP `LogHandler` remains attached to the root logger. Fix a bug that can cause a deadlock to occur
   over `logging._lock` in some cases ([#4636](https://github.com/open-telemetry/opentelemetry-python/pull/4636)).
+- Update OTLP gRPC/HTTP exporters: calling shutdown will now interrupt exporters that are sleeping
+  before a retry attempt and cause them to return failure immediately.
+  Update BatchSpan/LogRecordProcessors: shutdown will now complete after 30 seconds of trying to finish
+  exporting any buffered telemetry, instead of continuing to export until all telemetry is exported
+  ([#4638](https://github.com/open-telemetry/opentelemetry-python/pull/4638)).
+
 ## Version 1.35.0/0.56b0 (2025-07-11)
 
 - Update OTLP proto to v1.7 [#4645](https://github.com/open-telemetry/opentelemetry-python/pull/4645).
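A note on the mechanism before the per-file diffs: `time.sleep()` cannot be woken early, while `threading.Event.wait(timeout)` returns as soon as another thread sets the event, and its return value distinguishes "woken" (`True`) from "timed out" (`False`). That is the entire trick behind the interruptible retry backoff below. A minimal, self-contained sketch of the idea (names here are illustrative, not the exporters' actual API):

```python
import threading
from typing import Callable

shutdown_event = threading.Event()  # shutdown() sets this; the retry loop waits on it


def retry_with_interruptible_backoff(attempt_once: Callable[[], bool]) -> bool:
    """Retry attempt_once() with exponential backoff, aborting early on shutdown."""
    for retry_num in range(6):
        if attempt_once():
            return True
        # Event.wait() sleeps up to 2**retry_num seconds, but wakes immediately
        # when shutdown_event.set() is called, and returns True in that case.
        interrupted = shutdown_event.wait(2**retry_num)
        if interrupted:
            return False  # shutdown in progress: fail fast instead of retrying
    return False
```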
diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py
index c41f7219ae9..6791062d5dc 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py
@@ -20,7 +20,7 @@
 from collections.abc import Sequence  # noqa: F401
 from logging import getLogger
 from os import environ
-from time import sleep, time
+from time import time
 from typing import (  # noqa: F401
     Any,
     Callable,
@@ -289,7 +289,7 @@ def __init__(
         )
         self._client = self._stub(self._channel)
 
-        self._export_lock = threading.Lock()
+        self._shutdown_in_progress = threading.Event()
         self._shutdown = False
 
     @abstractmethod
@@ -309,50 +309,53 @@ def _export(
         # FIXME remove this check if the export type for traces
         # gets updated to a class that represents the proto
        # TracesData and use the code below instead.
-        with self._export_lock:
-            deadline_sec = time() + self._timeout
-            for retry_num in range(_MAX_RETRYS):
-                try:
-                    self._client.Export(
-                        request=self._translate_data(data),
-                        metadata=self._headers,
-                        timeout=deadline_sec - time(),
+        deadline_sec = time() + self._timeout
+        for retry_num in range(_MAX_RETRYS):
+            try:
+                self._client.Export(
+                    request=self._translate_data(data),
+                    metadata=self._headers,
+                    timeout=deadline_sec - time(),
+                )
+                return self._result.SUCCESS
+            except RpcError as error:
+                retry_info_bin = dict(error.trailing_metadata()).get(
+                    "google.rpc.retryinfo-bin"
+                )
+                # Multiplying by a random number between 0.8 and 1.2 introduces a +/-20% jitter to each backoff.
+                backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
+                if retry_info_bin is not None:
+                    retry_info = RetryInfo()
+                    retry_info.ParseFromString(retry_info_bin)
+                    backoff_seconds = (
+                        retry_info.retry_delay.seconds
+                        + retry_info.retry_delay.nanos / 1.0e9
                     )
-                    return self._result.SUCCESS
-                except RpcError as error:
-                    retry_info_bin = dict(error.trailing_metadata()).get(
-                        "google.rpc.retryinfo-bin"
-                    )
-                    # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
-                    backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
-                    if retry_info_bin is not None:
-                        retry_info = RetryInfo()
-                        retry_info.ParseFromString(retry_info_bin)
-                        backoff_seconds = (
-                            retry_info.retry_delay.seconds
-                            + retry_info.retry_delay.nanos / 1.0e9
-                        )
-                    if (
-                        error.code() not in _RETRYABLE_ERROR_CODES
-                        or retry_num + 1 == _MAX_RETRYS
-                        or backoff_seconds > (deadline_sec - time())
-                    ):
-                        logger.error(
-                            "Failed to export %s to %s, error code: %s",
-                            self._exporting,
-                            self._endpoint,
-                            error.code(),
-                            exc_info=error.code() == StatusCode.UNKNOWN,
-                        )
-                        return self._result.FAILURE
-                    logger.warning(
-                        "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
-                        error.code(),
+                if (
+                    error.code() not in _RETRYABLE_ERROR_CODES
+                    or retry_num + 1 == _MAX_RETRYS
+                    or backoff_seconds > (deadline_sec - time())
+                    or self._shutdown
+                ):
+                    logger.error(
+                        "Failed to export %s to %s, error code: %s",
                         self._exporting,
                         self._endpoint,
-                        backoff_seconds,
+                        error.code(),
+                        exc_info=error.code() == StatusCode.UNKNOWN,
                     )
-                    sleep(backoff_seconds)
+                    return self._result.FAILURE
+                logger.warning(
+                    "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
+                    error.code(),
+                    self._exporting,
+                    self._endpoint,
+                    backoff_seconds,
+                )
+                shutdown = self._shutdown_in_progress.wait(backoff_seconds)
+                if shutdown:
+                    logger.warning("Shutdown in progress, aborting retry.")
+                    break
         # Not possible to reach here but the linter is complaining.
         return self._result.FAILURE
 
@@ -360,11 +363,9 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
         if self._shutdown:
             logger.warning("Exporter already shutdown, ignoring call")
             return
-        # wait for the last export if any
-        self._export_lock.acquire(timeout=timeout_millis / 1e3)
         self._shutdown = True
+        self._shutdown_in_progress.set()
         self._channel.close()
-        self._export_lock.release()
 
     @property
     @abstractmethod
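The retry arithmetic above, restated: each delay is 2^n seconds scaled by a uniform factor in [0.8, 1.2] (the +/-20% jitter), a server-supplied `RetryInfo` hint overrides the computed value, and no delay may exceed the time left before the overall deadline. A standalone restatement of that calculation, with `_MAX_RETRYS = 6` assumed here for illustration:

```python
import random
from typing import Optional

_MAX_RETRYS = 6  # assumed value for illustration; the exporter defines its own constant


def next_backoff_seconds(
    retry_num: int,
    remaining_deadline_sec: float,
    server_hint_sec: Optional[float] = None,
) -> Optional[float]:
    """Return the next backoff delay, or None when the caller should fail instead."""
    # Exponential growth with +/-20% jitter: ~1s, ~2s, ~4s, ...
    backoff = 2**retry_num * random.uniform(0.8, 1.2)
    # A google.rpc.RetryInfo delay from the server takes precedence.
    if server_hint_sec is not None:
        backoff = server_hint_sec
    # Never retry past the last attempt, and never sleep past the export deadline.
    if retry_num + 1 == _MAX_RETRYS or backoff > remaining_deadline_sec:
        return None
    return backoff
```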
diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py
index 595f1bac5bc..aef52fbc4a7 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py
@@ -325,31 +325,14 @@
             "Exporter already shutdown, ignoring batch",
         )
 
-    def test_shutdown_wait_last_export(self):
-        add_TraceServiceServicer_to_server(
-            TraceServiceServicerWithExportParams(
-                StatusCode.OK, optional_export_sleep=1
-            ),
-            self.server,
-        )
-
-        export_thread = ThreadWithReturnValue(
-            target=self.exporter.export, args=([self.span],)
-        )
-        export_thread.start()
-        # Wait a bit for exporter to get lock and make export call.
-        time.sleep(0.25)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._export_lock.locked())
-        self.exporter.shutdown(timeout_millis=3000)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._shutdown)
-        self.assertEqual(export_thread.join(), SpanExportResult.SUCCESS)
-
-    def test_shutdown_doesnot_wait_last_export(self):
+    @unittest.skipIf(
+        system() == "Windows",
+        "For gRPC + Windows there's some added delay in the RPCs which breaks the assertion over the amount of time passed.",
+    )
+    def test_shutdown_interrupts_export_retry_backoff(self):
         add_TraceServiceServicer_to_server(
             TraceServiceServicerWithExportParams(
-                StatusCode.OK, optional_export_sleep=3
+                StatusCode.UNAVAILABLE,
             ),
             self.server,
         )
@@ -357,16 +340,25 @@
 
         export_thread = ThreadWithReturnValue(
             target=self.exporter.export, args=([self.span],)
         )
-        export_thread.start()
-        # Wait for exporter to get lock and make export call.
-        time.sleep(0.25)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._export_lock.locked())
-        # Set to 1 seconds, so the 3 second server-side delay will not be reached.
-        self.exporter.shutdown(timeout_millis=1000)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._shutdown)
-        self.assertEqual(export_thread.join(), None)
+        with self.assertLogs(level=WARNING) as warning:
+            begin_wait = time.time()
+            export_thread.start()
+            # Wait a bit for export to fail and the backoff sleep to start.
+            time.sleep(0.05)
+            # The code should now be in a 1 second backoff.
+            # pylint: disable=protected-access
+            self.assertFalse(self.exporter._shutdown_in_progress.is_set())
+            self.exporter.shutdown()
+            self.assertTrue(self.exporter._shutdown_in_progress.is_set())
+            export_result = export_thread.join()
+            end_wait = time.time()
+            self.assertEqual(export_result, SpanExportResult.FAILURE)
+            # Shutdown should have interrupted the sleep.
+            self.assertTrue(end_wait - begin_wait < 0.2)
+            self.assertEqual(
+                warning.records[1].message,
+                "Shutdown in progress, aborting retry.",
+            )
 
     def test_export_over_closed_grpc_channel(self):
         # pylint: disable=protected-access
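`ThreadWithReturnValue` in the test above is a helper defined elsewhere in this test suite; the diff never shows it. For readers following along, a plausible sketch of what such a helper looks like — offered as an assumption, not the suite's actual code:

```python
import threading
from typing import Any, Callable, Optional, Tuple


class ThreadWithReturnValue(threading.Thread):
    """A thread whose join() hands back the target's return value (hypothetical sketch)."""

    def __init__(self, target: Callable[..., Any], args: Tuple[Any, ...] = ()):
        super().__init__()
        self._target_fn = target
        self._target_args = args
        self._return_value: Any = None

    def run(self) -> None:
        # Capture the result so join() can return it to the caller.
        self._return_value = self._target_fn(*self._target_args)

    def join(self, timeout: Optional[float] = None) -> Any:
        super().join(timeout)
        return self._return_value
```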
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py
index c64f269b9ed..4c8b4de6004 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py
@@ -15,10 +15,11 @@
 import gzip
 import logging
 import random
+import threading
 import zlib
 from io import BytesIO
 from os import environ
-from time import sleep, time
+from time import time
 from typing import Dict, Optional, Sequence
 
 import requests
@@ -77,6 +78,7 @@ def __init__(
         compression: Optional[Compression] = None,
         session: Optional[requests.Session] = None,
     ):
+        self._shutdown_in_progress = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
             _append_logs_path(
@@ -173,6 +175,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
                 not _is_retryable(resp)
                 or retry_num + 1 == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export logs batch code: %s, reason: %s",
@@ -185,8 +188,10 @@
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_in_progress.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return LogExportResult.FAILURE
 
     def force_flush(self, timeout_millis: float = 10_000) -> bool:
@@ -197,8 +202,9 @@ def shutdown(self):
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
-        self._session.close()
         self._shutdown = True
+        self._shutdown_in_progress.set()
+        self._session.close()
 
 
 def _compression_from_env() -> Compression:
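The ordering inside `shutdown()` above is deliberate, and it repeats in the metric and trace exporters below: mark `_shutdown`, wake any sleeper, and only then close the session, so a woken retry loop observes the flag and breaks out before anything can touch a closed `Session`. The shared shape, condensed into an illustrative skeleton (not the exporters' real class):

```python
import threading

import requests


class InterruptibleExporter:
    """Skeleton of the shutdown contract shared by the three HTTP exporters."""

    def __init__(self) -> None:
        self._session = requests.Session()
        self._shutdown = False
        self._shutdown_in_progress = threading.Event()

    def _wait_before_retry(self, backoff_seconds: float) -> bool:
        # Returns True when shutdown() woke us early; the caller should stop retrying.
        return self._shutdown_in_progress.wait(backoff_seconds)

    def shutdown(self) -> None:
        if self._shutdown:
            return
        self._shutdown = True             # 1. new retry decisions now fail fast
        self._shutdown_in_progress.set()  # 2. wake any sleeping retry loop
        self._session.close()             # 3. release the connection pool last
```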
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py
index 7ee0aa79132..486344753a0 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py
@@ -15,10 +15,11 @@
 import gzip
 import logging
 import random
+import threading
 import zlib
 from io import BytesIO
 from os import environ
-from time import sleep, time
+from time import time
 from typing import (  # noqa: F401
     Any,
     Callable,
@@ -120,6 +121,7 @@
         | None = None,
         preferred_aggregation: dict[type, Aggregation] | None = None,
     ):
+        self._shutdown_in_progress = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
             _append_metrics_path(
@@ -223,6 +225,7 @@ def export(
                 not _is_retryable(resp)
                 or retry_num + 1 == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export metrics batch code: %s, reason: %s",
@@ -235,16 +238,19 @@ def export(
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_in_progress.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return MetricExportResult.FAILURE
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
-        self._session.close()
         self._shutdown = True
+        self._shutdown_in_progress.set()
+        self._session.close()
 
     @property
     def _exporting(self) -> str:
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py
index 9f9baf31150..8b97a651d4f 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py
@@ -15,10 +15,11 @@
 import gzip
 import logging
 import random
+import threading
 import zlib
 from io import BytesIO
 from os import environ
-from time import sleep, time
+from time import time
 from typing import Dict, Optional, Sequence
 
 import requests
@@ -76,6 +77,7 @@ def __init__(
         compression: Optional[Compression] = None,
         session: Optional[requests.Session] = None,
     ):
+        self._shutdown_in_progress = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
             _append_trace_path(
@@ -171,6 +173,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
                 not _is_retryable(resp)
                 or retry_num + 1 == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export span batch code: %s, reason: %s",
@@ -183,16 +186,19 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_in_progress.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return SpanExportResult.FAILURE
 
     def shutdown(self):
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
-        self._session.close()
         self._shutdown = True
+        self._shutdown_in_progress.set()
+        self._session.close()
 
     def force_flush(self, timeout_millis: int = 30000) -> bool:
         """Nothing is buffered in this exporter, so this method does nothing."""
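The three test files that follow are variations on one recipe: stub `Session.post` to return a retryable 503, run `export()` on a thread, call `shutdown()` about 50 ms in (mid-backoff), and assert the whole round trip stayed far below the roughly one-second backoff. The recipe distilled into one hedged helper (the exporter choice and timings mirror the tests; they are not a required API):

```python
import threading
import time
from unittest.mock import patch

from requests import Response, Session

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter


def shutdown_latency_during_backoff() -> float:
    """Time how quickly shutdown() unblocks an export stuck in retry backoff."""
    resp = Response()
    resp.status_code = 503  # retryable, so the exporter enters its backoff
    resp.reason = "UNAVAILABLE"
    with patch.object(Session, "post", return_value=resp):
        exporter = OTLPSpanExporter(timeout=1.5)
        thread = threading.Thread(target=exporter.export, args=([],))
        begin = time.time()
        thread.start()
        time.sleep(0.05)     # let the first attempt fail and the backoff start
        exporter.shutdown()  # wakes the sleeping retry loop immediately
        thread.join()
        return time.time() - begin  # expected well under the ~1 s backoff
```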
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py
index 1b5e9cc5f92..815761397ea 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import threading
 import time
 from logging import WARNING
 from os import environ
@@ -541,3 +542,34 @@ def export_side_effect(*args, **kwargs):
         mock_post.side_effect = export_side_effect
         exporter = OTLPMetricExporter(timeout=0.4)
         exporter.export(self.metrics["sum_int"])
+
+    @patch.object(Session, "post")
+    def test_shutdown_interrupts_retry_backoff(self, mock_post):
+        exporter = OTLPMetricExporter(timeout=1.5)
+
+        resp = Response()
+        resp.status_code = 503
+        resp.reason = "UNAVAILABLE"
+        mock_post.return_value = resp
+        thread = threading.Thread(
+            target=exporter.export, args=(self.metrics["sum_int"],)
+        )
+        with self.assertLogs(level=WARNING) as warning:
+            before = time.time()
+            thread.start()
+            # Wait for the first attempt to fail, then enter a 1 second backoff.
+            time.sleep(0.05)
+            # Should cause export to wake up and return.
+            exporter.shutdown()
+            thread.join()
+            after = time.time()
+            self.assertIn(
+                "Transient error UNAVAILABLE encountered while exporting metrics batch, retrying in",
+                warning.records[0].message,
+            )
+            self.assertIn(
+                "Shutdown in progress, aborting retry.",
+                warning.records[1].message,
+            )
+
+        assert after - before < 0.2
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
index 024015d0fa5..19183029edc 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
@@ -14,6 +14,7 @@
 
 # pylint: disable=protected-access
 
+import threading
 import time
 import unittest
 from logging import WARNING
@@ -439,3 +440,34 @@ def export_side_effect(*args, **kwargs):
         mock_post.side_effect = export_side_effect
         exporter = OTLPLogExporter(timeout=0.4)
         exporter.export(self._get_sdk_log_data())
+
+    @patch.object(Session, "post")
+    def test_shutdown_interrupts_retry_backoff(self, mock_post):
+        exporter = OTLPLogExporter(timeout=1.5)
+
+        resp = Response()
+        resp.status_code = 503
+        resp.reason = "UNAVAILABLE"
+        mock_post.return_value = resp
+        thread = threading.Thread(
+            target=exporter.export, args=(self._get_sdk_log_data(),)
+        )
+        with self.assertLogs(level=WARNING) as warning:
+            before = time.time()
+            thread.start()
+            # Wait for the first attempt to fail, then enter a 1 second backoff.
+            time.sleep(0.05)
+            # Should cause export to wake up and return.
+            exporter.shutdown()
+            thread.join()
+            after = time.time()
+            self.assertIn(
+                "Transient error UNAVAILABLE encountered while exporting logs batch, retrying in",
+                warning.records[0].message,
+            )
+            self.assertIn(
+                "Shutdown in progress, aborting retry.",
+                warning.records[1].message,
+            )
+
+        assert after - before < 0.2
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
index 16d40e3f3fd..224227a7f59 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import threading
 import time
 import unittest
 from logging import WARNING
@@ -288,3 +289,32 @@ def export_side_effect(*args, **kwargs):
         mock_post.side_effect = export_side_effect
         exporter = OTLPSpanExporter(timeout=0.4)
         exporter.export([BASIC_SPAN])
+
+    @patch.object(Session, "post")
+    def test_shutdown_interrupts_retry_backoff(self, mock_post):
+        exporter = OTLPSpanExporter(timeout=1.5)
+
+        resp = Response()
+        resp.status_code = 503
+        resp.reason = "UNAVAILABLE"
+        mock_post.return_value = resp
+        thread = threading.Thread(target=exporter.export, args=([BASIC_SPAN],))
+        with self.assertLogs(level=WARNING) as warning:
+            before = time.time()
+            thread.start()
+            # Wait for the first attempt to fail, then enter a 1 second backoff.
+            time.sleep(0.05)
+            # Should cause export to wake up and return.
+            exporter.shutdown()
+            thread.join()
+            after = time.time()
+            self.assertIn(
+                "Transient error UNAVAILABLE encountered while exporting span batch, retrying in",
+                warning.records[0].message,
+            )
+            self.assertIn(
+                "Shutdown in progress, aborting retry.",
+                warning.records[1].message,
+            )
+
+        assert after - before < 0.2
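Before the SDK diff, one detail worth a worked example: the exporters in this very patch disagree on their `shutdown()` signatures — the gRPC and HTTP metric exporters accept `timeout_millis`, while the HTTP trace and log exporters do not — so the batch processor inspects the signature before forwarding its remaining budget. A self-contained sketch of that dispatch:

```python
import inspect
import time


class ExporterWithTimeout:
    def shutdown(self, timeout_millis: float = 30_000) -> None:
        print(f"shutting down with a {timeout_millis:.0f} ms budget")


class ExporterWithoutTimeout:
    def shutdown(self) -> None:
        print("shutting down without a timeout")


def shutdown_exporter(exporter, deadline_sec: float) -> None:
    """Forward the remaining budget only to shutdown() signatures that accept it."""
    if "timeout_millis" in inspect.getfullargspec(exporter.shutdown).args:
        remaining_millis = (deadline_sec - time.time()) * 1000
        exporter.shutdown(timeout_millis=max(0, remaining_millis))
    else:
        exporter.shutdown()


shutdown_exporter(ExporterWithTimeout(), deadline_sec=time.time() + 3.0)
shutdown_exporter(ExporterWithoutTimeout(), deadline_sec=time.time() + 3.0)
```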
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py
index 97a00980cba..aec04e80ea0 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py
@@ -16,9 +16,11 @@
 
 import collections
 import enum
+import inspect
 import logging
 import os
 import threading
+import time
 import weakref
 from abc import abstractmethod
 from typing import (
@@ -90,6 +92,7 @@ def __init__(
 
         self._exporting = exporting
         self._shutdown = False
+        self._shutdown_timeout_exceeded = False
         self._export_lock = threading.Lock()
         self._worker_awaken = threading.Event()
         self._worker_thread.start()
@@ -101,7 +104,7 @@ def __init__(
     def _should_export_batch(
         self, batch_strategy: BatchExportStrategy, num_iterations: int
     ) -> bool:
-        if not self._queue:
+        if not self._queue or self._shutdown_timeout_exceeded:
             return False
         # Always continue to export while queue length exceeds max batch size.
         if len(self._queue) >= self._max_export_batch_size:
@@ -180,16 +183,32 @@ def emit(self, data: Telemetry) -> None:
         if len(self._queue) >= self._max_export_batch_size:
             self._worker_awaken.set()
 
-    def shutdown(self):
+    def shutdown(self, timeout_millis: int = 30000):
         if self._shutdown:
             return
-        # Prevents emit and force_flush from further calling export.
+        shutdown_should_end = time.time() + (timeout_millis / 1000)
+        # Causes emit to reject telemetry and makes force_flush a no-op.
         self._shutdown = True
-        # Interrupts sleep in the worker, if it's sleeping.
+        # Interrupts sleep in the worker if it's sleeping.
         self._worker_awaken.set()
-        # Main worker loop should exit after one final export call with flush all strategy.
-        self._worker_thread.join()
-        self._exporter.shutdown()
+        self._worker_thread.join(timeout_millis / 1000)
+        # Stops the worker thread from calling export again if the queue is still not empty.
+        self._shutdown_timeout_exceeded = True
+        # We want to shut down immediately only if we already waited the full `timeout_millis`.
+        # Otherwise we pass the remaining timeout to the exporter.
+        # Some exporters' shutdown supports a timeout param.
+        if (
+            "timeout_millis"
+            in inspect.getfullargspec(self._exporter.shutdown).args
+        ):
+            remaining_millis = (shutdown_should_end - time.time()) * 1000
+            self._exporter.shutdown(timeout_millis=max(0, remaining_millis))  # type: ignore
+        else:
+            self._exporter.shutdown()
+        # The worker thread **should** be finished at this point, because we called shutdown on the
+        # exporter and set `_shutdown_timeout_exceeded` to prevent further export calls. It's possible
+        # that a single export call is ongoing and the thread isn't finished. In this case we return
+        # instead of waiting on the thread to finish.
 
     # TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568.
     def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
index 4888d81779d..541d27c880a 100644
--- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
+++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
@@ -16,10 +16,12 @@
 import gc
 import multiprocessing
 import os
+import threading
 import time
 import unittest
 import weakref
 from platform import system
+from typing import Any
 from unittest.mock import Mock
 
 import pytest
@@ -49,6 +51,28 @@
     multiprocessing.set_start_method("fork")
 
 
+class MockExporterForTesting:
+    def __init__(self, export_sleep: int):
+        self.num_export_calls = 0
+        self.export_sleep = export_sleep
+        self._shutdown = False
+        self.export_sleep_event = threading.Event()
+
+    def export(self, _: list[Any]):
+        self.num_export_calls += 1
+        if self._shutdown:
+            raise ValueError("Cannot export, already shutdown")
+
+        sleep_interrupted = self.export_sleep_event.wait(self.export_sleep)
+        if sleep_interrupted:
+            raise ValueError("Did not get to finish!")
+
+    def shutdown(self):
+        # Force export to finish sleeping.
+        self._shutdown = True
+        self.export_sleep_event.set()
+
+
 # BatchLogRecodProcessor/BatchSpanProcessor initialize and use BatchProcessor.
 # Important: make sure to call .shutdown() before the end of the test,
 # otherwise the worker thread will continue to run after the end of the test.
@@ -193,3 +217,32 @@ def test_record_processor_is_garbage_collected(
 
         # Then the reference to the processor should no longer exist
         assert weak_ref() is None
+
+    def test_shutdown_allows_1_export_to_finish(
+        self, batch_processor_class, telemetry, caplog
+    ):
+        # This exporter raises an exception if its export sleep cannot finish.
+        exporter = MockExporterForTesting(export_sleep=2)
+        processor = batch_processor_class(
+            exporter,
+            max_queue_size=200,
+            max_export_batch_size=1,
+            schedule_delay_millis=30000,
+        )
+        # Max export batch size is 1, so 3 emit calls require 3 separate export calls (each blocking for 2 seconds) to clear the queue.
+        processor._batch_processor.emit(telemetry)
+        processor._batch_processor.emit(telemetry)
+        processor._batch_processor.emit(telemetry)
+        before = time.time()
+        processor._batch_processor.shutdown(timeout_millis=3000)
+        # Shutdown does not kill the thread.
+        assert processor._batch_processor._worker_thread.is_alive() is True
+
+        after = time.time()
+        assert after - before < 3.3
+        # The thread will naturally finish after a little bit.
+        time.sleep(0.1)
+        assert processor._batch_processor._worker_thread.is_alive() is False
+        # Expect the second call to be interrupted by shutdown, and the third call to never be made.
+        assert "Exception while exporting" in caplog.text
+        assert 2 == exporter.num_export_calls
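The net effect of the processor change, as a freestanding sketch: with a large backlog and a slow exporter, `shutdown()` is now bounded by its 30-second budget (plus at most one in-flight export) instead of draining the queue — here that means roughly 30 s rather than roughly 200 s. Timings are illustrative and assume the post-#4638 SDK:

```python
import time
from typing import Sequence

from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    SpanExporter,
    SpanExportResult,
)


class SlowExporter(SpanExporter):
    """Toy exporter: every export call blocks for two seconds."""

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        time.sleep(2)
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        pass


provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(SlowExporter(), max_export_batch_size=1)
)
tracer = provider.get_tracer(__name__)
for _ in range(100):  # ~100 batches x 2 s/batch would take ~200 s to drain
    with tracer.start_as_current_span("span"):
        pass

start = time.time()
provider.shutdown()  # bounded by the processor's 30 s budget, not the backlog
print(f"shutdown returned after {time.time() - start:.1f}s")
```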