@@ -21,8 +21,10 @@
 import threading
 from base64 import b64encode
 from concurrent.futures import Future, ThreadPoolExecutor
+from contextlib import ExitStack
 from dataclasses import asdict, dataclass
 from functools import partial
+from time import time
 from typing import Any, Callable, Final, Literal, TextIO, cast
 from uuid import uuid4
 
@@ -103,12 +105,12 @@ def __init__(
 
     def _submit_all(self, upload_data: UploadData) -> None:
         def done(future: Future[None]) -> None:
-            self._semaphore.release()
-
             try:
                 future.result()
             except Exception:  # pylint: disable=broad-except
                 _logger.exception("fsspec uploader failed")
+            finally:
+                self._semaphore.release()
 
         for path, json_encodeable in upload_data.items():
             # could not acquire, drop data
@@ -128,7 +130,7 @@ def done(future: Future[None]) -> None:
                 _logger.info(
                     "attempting to upload file after FsspecUploadHook.shutdown() was already called"
                 )
-                break
+                self._semaphore.release()
 
     def _calculate_ref_path(self) -> CompletionRefs:
         # TODO: experimental with using the trace_id and span_id, or fetching
@@ -209,9 +211,21 @@ def to_dict(
             **references,
         }
 
-    def shutdown(self) -> None:
-        # TODO: support timeout
-        self._executor.shutdown()
+    def shutdown(self, *, timeout_sec: float = 10.0) -> None:
+        deadline = time() + timeout_sec
+
+        # Wait for all tasks to finish to flush the queue
+        with ExitStack() as stack:
+            for _ in range(self._max_size):
+                remaining = deadline - time()
+                if not self._semaphore.acquire(timeout=remaining):  # pylint: disable=consider-using-with
+                    # Couldn't finish flushing all uploads before timeout
+                    break
+
+                stack.callback(self._semaphore.release)
+
+            # Queue is flushed and blocked, start shutdown
+            self._executor.shutdown(wait=False)
 
 
 class Base64JsonEncoder(json.JSONEncoder):
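The hunks above combine three pieces: a bounded semaphore that caps queued uploads, a done-callback that returns its permit in a finally block, and a shutdown() that re-acquires every permit before a deadline so the queue is drained (and blocked) before the executor stops. Below is a minimal, self-contained sketch of that pattern; BoundedUploader, MAX_SIZE and submit() are illustrative names, not the actual FsspecUploadHook code.

```python
# Sketch of the drain-then-shutdown pattern, under the assumptions named above.
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from time import sleep, time

MAX_SIZE = 4


class BoundedUploader:
    def __init__(self) -> None:
        self._semaphore = threading.BoundedSemaphore(MAX_SIZE)
        self._executor = ThreadPoolExecutor(max_workers=MAX_SIZE)

    def submit(self, seconds: float) -> bool:
        # Drop the task instead of blocking when every permit is in use.
        if not self._semaphore.acquire(blocking=False):
            return False
        future = self._executor.submit(sleep, seconds)
        # Return the permit when the task finishes, even if it raised.
        future.add_done_callback(lambda _: self._semaphore.release())
        return True

    def shutdown(self, *, timeout_sec: float = 10.0) -> None:
        deadline = time() + timeout_sec
        with ExitStack() as stack:
            # Re-acquiring every permit means all in-flight tasks finished
            # (or the deadline passed); holding them blocks new submissions
            # while the executor is told to shut down.
            for _ in range(MAX_SIZE):
                if not self._semaphore.acquire(timeout=deadline - time()):
                    break  # timed out with work still in flight
                stack.callback(self._semaphore.release)
            self._executor.shutdown(wait=False)


if __name__ == "__main__":
    uploader = BoundedUploader()
    uploader.submit(0.1)
    uploader.shutdown(timeout_sec=1.0)  # returns once the task drains, or after ~1s
```

If the deadline passes first, the loop gives up and the executor shuts down without waiting, which is the behaviour test_shutdown_timeout below exercises with timeout_sec=0.01.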
82 changes: 54 additions & 28 deletions util/opentelemetry-util-genai/tests/test_fsspec_upload.py
@@ -18,6 +18,7 @@
 import logging
 import sys
 import threading
+from contextlib import contextmanager
 from dataclasses import asdict
 from typing import Any
 from unittest import TestCase
@@ -84,12 +85,24 @@ def test_fsspec_entry_point_no_fsspec(self):
 FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")]
 
 
+class ThreadSafeMagicMock(MagicMock):
+    def __init__(self, *args, **kwargs) -> None:
+        self.__dict__["_lock"] = threading.Lock()
+        super().__init__(*args, **kwargs)
+
+    def _increment_mock_call(self, /, *args, **kwargs):
+        with self.__dict__["_lock"]:
+            super()._increment_mock_call(*args, **kwargs)
+
+
 class TestFsspecUploadHook(TestCase):
     def setUp(self):
         self._fsspec_patcher = patch(
             "opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
         )
         self.mock_fsspec = self._fsspec_patcher.start()
+        self.mock_fsspec.open = ThreadSafeMagicMock()
+
         self.hook = FsspecUploadHook(
             base_path=BASE_PATH,
             max_size=MAXSIZE,
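The ThreadSafeMagicMock added above takes a lock around MagicMock's private _increment_mock_call bookkeeping because the hook now calls fsspec.open from executor worker threads, and plain MagicMock call counting is an unlocked read-modify-write that can under-count concurrent calls. A hypothetical harness (hammer and its parameters are mine, not part of this test suite) that illustrates the race:

```python
# Hypothetical harness: call a mock from several threads, report the recorded count.
import threading
from unittest.mock import MagicMock


def hammer(mock: MagicMock, n_threads: int = 8, calls_per_thread: int = 10_000) -> int:
    def worker() -> None:
        for _ in range(calls_per_thread):
            mock()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return mock.call_count


if __name__ == "__main__":
    # With a plain MagicMock this can print less than 80_000; a lock-guarded
    # subclass like the ThreadSafeMagicMock in the diff keeps the count exact.
    print(hammer(MagicMock()))
```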
@@ -99,6 +112,20 @@ def tearDown(self) -> None:
         self.hook.shutdown()
         self._fsspec_patcher.stop()
 
+    @contextmanager
+    def block_upload(self):
+        unblock_upload = threading.Event()
+
+        def blocked_upload(*args: Any):
+            unblock_upload.wait()
+            return MagicMock()
+
+        try:
+            self.mock_fsspec.open.side_effect = blocked_upload
+            yield
+        finally:
+            unblock_upload.set()
+
     def test_shutdown_no_items(self):
         self.hook.shutdown()
 
@@ -118,46 +145,45 @@ def test_upload_then_shutdown(self):
         )
 
     def test_upload_blocked(self):
-        unblock_upload = threading.Event()
-
-        def blocked_upload(*args: Any):
-            unblock_upload.wait()
-            return MagicMock()
-
-        self.mock_fsspec.open.side_effect = blocked_upload
-
-        # fill the queue
-        for _ in range(MAXSIZE):
-            self.hook.upload(
-                inputs=FAKE_INPUTS,
-                outputs=FAKE_OUTPUTS,
-                system_instruction=FAKE_SYSTEM_INSTRUCTION,
-            )
-
-        self.assertLessEqual(
-            self.mock_fsspec.open.call_count,
-            MAXSIZE,
-            f"uploader should only be called {MAXSIZE=} times",
-        )
-
-        with self.assertLogs(level=logging.WARNING) as logs:
-            self.hook.upload(
-                inputs=FAKE_INPUTS,
-                outputs=FAKE_OUTPUTS,
-                system_instruction=FAKE_SYSTEM_INSTRUCTION,
-            )
-
-        self.assertIn(
-            "fsspec upload queue is full, dropping upload", logs.output[0]
-        )
-
-        unblock_upload.set()
+        with self.block_upload():
+            # fill the queue
+            for _ in range(MAXSIZE):
+                self.hook.upload(
+                    inputs=FAKE_INPUTS,
+                    outputs=FAKE_OUTPUTS,
+                    system_instruction=FAKE_SYSTEM_INSTRUCTION,
+                )
+
+            self.assertLessEqual(
+                self.mock_fsspec.open.call_count,
+                MAXSIZE,
+                f"uploader should only be called {MAXSIZE=} times",
+            )
+
+            with self.assertLogs(level=logging.WARNING) as logs:
+                self.hook.upload(
+                    inputs=FAKE_INPUTS,
+                    outputs=FAKE_OUTPUTS,
+                    system_instruction=FAKE_SYSTEM_INSTRUCTION,
+                )
+
+            self.assertIn(
+                "fsspec upload queue is full, dropping upload", logs.output[0]
+            )
+
+    def test_shutdown_timeout(self):
+        with self.block_upload():
+            self.hook.upload(
+                inputs=FAKE_INPUTS,
+                outputs=FAKE_OUTPUTS,
+                system_instruction=FAKE_SYSTEM_INSTRUCTION,
+            )
+
+            # shutdown should timeout and return even though there are still items in the queue
+            self.hook.shutdown(timeout_sec=0.01)
 
     def test_failed_upload_logs(self):
-        def failing_upload(*args: Any) -> None:
-            raise RuntimeError("failed to upload")
-
-        self.mock_fsspec.open = MagicMock(wraps=failing_upload)
+        self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
 
         with self.assertLogs(level=logging.ERROR) as logs:
             self.hook.upload(
@@ -177,7 +203,7 @@ def test_upload_after_shutdown_logs(self):
                 outputs=FAKE_OUTPUTS,
                 system_instruction=FAKE_SYSTEM_INSTRUCTION,
             )
-        self.assertEqual(len(logs.output), 1)
+        self.assertEqual(len(logs.output), 3)
         self.assertIn(
             "attempting to upload file after FsspecUploadHook.shutdown() was already called",
             logs.output[0],