diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index 60481f65ff..d2ea9f2435 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -21,8 +21,10 @@ import threading from base64 import b64encode from concurrent.futures import Future, ThreadPoolExecutor +from contextlib import ExitStack from dataclasses import asdict, dataclass from functools import partial +from time import time from typing import Any, Callable, Final, Literal, TextIO, cast from uuid import uuid4 @@ -103,12 +105,12 @@ def __init__( def _submit_all(self, upload_data: UploadData) -> None: def done(future: Future[None]) -> None: - self._semaphore.release() - try: future.result() except Exception: # pylint: disable=broad-except _logger.exception("fsspec uploader failed") + finally: + self._semaphore.release() for path, json_encodeable in upload_data.items(): # could not acquire, drop data @@ -128,7 +130,7 @@ def done(future: Future[None]) -> None: _logger.info( "attempting to upload file after FsspecUploadHook.shutdown() was already called" ) - break + self._semaphore.release() def _calculate_ref_path(self) -> CompletionRefs: # TODO: experimental with using the trace_id and span_id, or fetching @@ -209,9 +211,21 @@ def to_dict( **references, } - def shutdown(self) -> None: - # TODO: support timeout - self._executor.shutdown() + def shutdown(self, *, timeout_sec: float = 10.0) -> None: + deadline = time() + timeout_sec + + # Wait for all tasks to finish to flush the queue + with ExitStack() as stack: + for _ in range(self._max_size): + remaining = deadline - time() + if not self._semaphore.acquire(timeout=remaining): # pylint: disable=consider-using-with + # Couldn't finish flushing all uploads before timeout + break + + stack.callback(self._semaphore.release) + + # Queue is flushed and blocked, start shutdown + self._executor.shutdown(wait=False) class Base64JsonEncoder(json.JSONEncoder): diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index 4e6f2470dc..2cf65e40ba 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -18,6 +18,7 @@ import logging import sys import threading +from contextlib import contextmanager from dataclasses import asdict from typing import Any from unittest import TestCase @@ -84,12 +85,24 @@ def test_fsspec_entry_point_no_fsspec(self): FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")] +class ThreadSafeMagicMock(MagicMock): + def __init__(self, *args, **kwargs) -> None: + self.__dict__["_lock"] = threading.Lock() + super().__init__(*args, **kwargs) + + def _increment_mock_call(self, /, *args, **kwargs): + with self.__dict__["_lock"]: + super()._increment_mock_call(*args, **kwargs) + + class TestFsspecUploadHook(TestCase): def setUp(self): self._fsspec_patcher = patch( "opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec" ) self.mock_fsspec = self._fsspec_patcher.start() + self.mock_fsspec.open = ThreadSafeMagicMock() + self.hook = FsspecUploadHook( base_path=BASE_PATH, max_size=MAXSIZE, @@ -99,6 +112,20 @@ def tearDown(self) -> None: self.hook.shutdown() self._fsspec_patcher.stop() + @contextmanager + def block_upload(self): + unblock_upload = threading.Event() + + def blocked_upload(*args: Any): + unblock_upload.wait() + return MagicMock() + + try: + self.mock_fsspec.open.side_effect = blocked_upload + yield + finally: + unblock_upload.set() + def test_shutdown_no_items(self): self.hook.shutdown() @@ -118,46 +145,45 @@ def test_upload_then_shutdown(self): ) def test_upload_blocked(self): - unblock_upload = threading.Event() + with self.block_upload(): + # fill the queue + for _ in range(MAXSIZE): + self.hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) - def blocked_upload(*args: Any): - unblock_upload.wait() - return MagicMock() + self.assertLessEqual( + self.mock_fsspec.open.call_count, + MAXSIZE, + f"uploader should only be called {MAXSIZE=} times", + ) - self.mock_fsspec.open.side_effect = blocked_upload + with self.assertLogs(level=logging.WARNING) as logs: + self.hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) - # fill the queue - for _ in range(MAXSIZE): - self.hook.upload( - inputs=FAKE_INPUTS, - outputs=FAKE_OUTPUTS, - system_instruction=FAKE_SYSTEM_INSTRUCTION, + self.assertIn( + "fsspec upload queue is full, dropping upload", logs.output[0] ) - self.assertLessEqual( - self.mock_fsspec.open.call_count, - MAXSIZE, - f"uploader should only be called {MAXSIZE=} times", - ) - - with self.assertLogs(level=logging.WARNING) as logs: + def test_shutdown_timeout(self): + with self.block_upload(): self.hook.upload( inputs=FAKE_INPUTS, outputs=FAKE_OUTPUTS, system_instruction=FAKE_SYSTEM_INSTRUCTION, ) - self.assertIn( - "fsspec upload queue is full, dropping upload", logs.output[0] - ) - - unblock_upload.set() + # shutdown should timeout and return even though there are still items in the queue + self.hook.shutdown(timeout_sec=0.01) def test_failed_upload_logs(self): - def failing_upload(*args: Any) -> None: - raise RuntimeError("failed to upload") - - self.mock_fsspec.open = MagicMock(wraps=failing_upload) + self.mock_fsspec.open.side_effect = RuntimeError("failed to upload") with self.assertLogs(level=logging.ERROR) as logs: self.hook.upload( @@ -177,7 +203,7 @@ def test_upload_after_shutdown_logs(self): outputs=FAKE_OUTPUTS, system_instruction=FAKE_SYSTEM_INSTRUCTION, ) - self.assertEqual(len(logs.output), 1) + self.assertEqual(len(logs.output), 3) self.assertIn( "attempting to upload file after FsspecUploadHook.shutdown() was already called", logs.output[0],