Skip to content

Commit 04e84ab

Browse files
committed
gen ai uploader timeout and fix flaky bugs
1 parent 4fb00c9 commit 04e84ab

File tree

2 files changed

+33
-12
lines changed

2 files changed

+33
-12
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import posixpath
2121
import threading
2222
from concurrent.futures import Future, ThreadPoolExecutor
23+
from contextlib import ExitStack
2324
from dataclasses import asdict, dataclass
2425
from functools import partial
26+
from time import time
2527
from typing import Any, Callable, Final, Literal, TextIO, cast
2628
from uuid import uuid4
2729

@@ -102,12 +104,12 @@ def __init__(
102104

103105
def _submit_all(self, upload_data: UploadData) -> None:
104106
def done(future: Future[None]) -> None:
105-
self._semaphore.release()
106-
107107
try:
108108
future.result()
109109
except Exception: # pylint: disable=broad-except
110110
_logger.exception("fsspec uploader failed")
111+
finally:
112+
self._semaphore.release()
111113

112114
for path, json_encodeable in upload_data.items():
113115
# could not acquire, drop data
@@ -127,7 +129,7 @@ def done(future: Future[None]) -> None:
127129
_logger.info(
128130
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
129131
)
130-
break
132+
self._semaphore.release()
131133

132134
def _calculate_ref_path(self) -> CompletionRefs:
133135
# TODO: experimental with using the trace_id and span_id, or fetching
@@ -203,6 +205,17 @@ def to_dict(
203205
**references,
204206
}
205207

206-
def shutdown(self) -> None:
207-
# TODO: support timeout
208-
self._executor.shutdown()
208+
def shutdown(self, timeout_sec: float = 10.0) -> None:
209+
deadline = time() + timeout_sec
210+
211+
# Wait for all tasks to finish to flush the queue
212+
with ExitStack() as stack:
213+
for _ in range(self._max_size):
214+
if not self._semaphore.acquire(timeout=deadline - time()): # pylint: disable=consider-using-with
215+
# Couldn't finish flushing all uploads before timeout
216+
break
217+
218+
stack.callback(self._semaphore.release)
219+
220+
# Queue is flushed and blocked, start shutdown
221+
self._executor.shutdown(wait=False)

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,21 @@ def test_fsspec_entry_point_no_fsspec(self):
8585
FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")]
8686

8787

88+
class ThreadSafeMagicMock(MagicMock):
89+
def __init__(self, *args, **kwargs) -> None:
90+
self.__dict__["_lock"] = threading.Lock()
91+
super().__init__(*args, **kwargs)
92+
93+
def _increment_mock_call(self, /, *args, **kwargs):
94+
with self.__dict__["_lock"]:
95+
super()._increment_mock_call(*args, **kwargs)
96+
97+
8898
class TestFsspecUploadHook(TestCase):
8999
def setUp(self):
90100
self._fsspec_patcher = patch(
91-
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
101+
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec",
102+
new=ThreadSafeMagicMock(),
92103
)
93104
self.mock_fsspec = self._fsspec_patcher.start()
94105
self.hook = FsspecUploadHook(
@@ -155,10 +166,7 @@ def blocked_upload(*args: Any):
155166
unblock_upload.set()
156167

157168
def test_failed_upload_logs(self):
158-
def failing_upload(*args: Any) -> None:
159-
raise RuntimeError("failed to upload")
160-
161-
self.mock_fsspec.open = MagicMock(wraps=failing_upload)
169+
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
162170

163171
with self.assertLogs(level=logging.ERROR) as logs:
164172
self.hook.upload(
@@ -178,7 +186,7 @@ def test_upload_after_shutdown_logs(self):
178186
outputs=FAKE_OUTPUTS,
179187
system_instruction=FAKE_SYSTEM_INSTRUCTION,
180188
)
181-
self.assertEqual(len(logs.output), 1)
189+
self.assertEqual(len(logs.output), 3)
182190
self.assertIn(
183191
"attempting to upload file after FsspecUploadHook.shutdown() was already called",
184192
logs.output[0],

0 commit comments

Comments
 (0)