Skip to content

Commit 3c80b06

Browse files
committed
gen ai uploader timeout and fix flaky bugs
1 parent 4fb00c9 commit 3c80b06

File tree

2 files changed

+75
-35
lines changed

2 files changed

+75
-35
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,10 @@
2020
import posixpath
2121
import threading
2222
from concurrent.futures import Future, ThreadPoolExecutor
23+
from contextlib import ExitStack
2324
from dataclasses import asdict, dataclass
2425
from functools import partial
26+
from time import time
2527
from typing import Any, Callable, Final, Literal, TextIO, cast
2628
from uuid import uuid4
2729

@@ -102,12 +104,12 @@ def __init__(
102104

103105
def _submit_all(self, upload_data: UploadData) -> None:
104106
def done(future: Future[None]) -> None:
105-
self._semaphore.release()
106-
107107
try:
108108
future.result()
109109
except Exception: # pylint: disable=broad-except
110110
_logger.exception("fsspec uploader failed")
111+
finally:
112+
self._semaphore.release()
111113

112114
for path, json_encodeable in upload_data.items():
113115
# could not acquire, drop data
@@ -127,7 +129,7 @@ def done(future: Future[None]) -> None:
127129
_logger.info(
128130
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
129131
)
130-
break
132+
self._semaphore.release()
131133

132134
def _calculate_ref_path(self) -> CompletionRefs:
133135
# TODO: experimental with using the trace_id and span_id, or fetching
@@ -203,6 +205,18 @@ def to_dict(
203205
**references,
204206
}
205207

206-
def shutdown(self) -> None:
207-
# TODO: support timeout
208-
self._executor.shutdown()
208+
def shutdown(self, *, timeout_sec: float = 10.0) -> None:
209+
deadline = time() + timeout_sec
210+
211+
# Wait for all tasks to finish to flush the queue
212+
with ExitStack() as stack:
213+
for _ in range(self._max_size):
214+
remaining = deadline - time()
215+
if not self._semaphore.acquire(timeout=remaining): # pylint: disable=consider-using-with
216+
# Couldn't finish flushing all uploads before timeout
217+
break
218+
219+
stack.callback(self._semaphore.release)
220+
221+
# Queue is flushed and blocked, start shutdown
222+
self._executor.shutdown(wait=False)

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 55 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
import logging
2020
import sys
2121
import threading
22+
from contextlib import contextmanager
2223
from dataclasses import asdict
2324
from typing import Any
2425
from unittest import TestCase
@@ -85,12 +86,24 @@ def test_fsspec_entry_point_no_fsspec(self):
8586
FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")]
8687

8788

89+
class ThreadSafeMagicMock(MagicMock):
90+
def __init__(self, *args, **kwargs) -> None:
91+
self.__dict__["_lock"] = threading.Lock()
92+
super().__init__(*args, **kwargs)
93+
94+
def _increment_mock_call(self, /, *args, **kwargs):
95+
with self.__dict__["_lock"]:
96+
super()._increment_mock_call(*args, **kwargs)
97+
98+
8899
class TestFsspecUploadHook(TestCase):
89100
def setUp(self):
90101
self._fsspec_patcher = patch(
91102
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
92103
)
93104
self.mock_fsspec = self._fsspec_patcher.start()
105+
self.mock_fsspec.open = ThreadSafeMagicMock()
106+
94107
self.hook = FsspecUploadHook(
95108
base_path=BASE_PATH,
96109
max_size=MAXSIZE,
@@ -100,6 +113,20 @@ def tearDown(self) -> None:
100113
self.hook.shutdown()
101114
self._fsspec_patcher.stop()
102115

116+
@contextmanager
117+
def block_upload(self):
118+
unblock_upload = threading.Event()
119+
120+
def blocked_upload(*args: Any):
121+
unblock_upload.wait()
122+
return MagicMock()
123+
124+
try:
125+
self.mock_fsspec.open.side_effect = blocked_upload
126+
yield
127+
finally:
128+
unblock_upload.set()
129+
103130
def test_shutdown_no_items(self):
104131
self.hook.shutdown()
105132

@@ -119,46 +146,45 @@ def test_upload_then_shutdown(self):
119146
)
120147

121148
def test_upload_blocked(self):
122-
unblock_upload = threading.Event()
123-
124-
def blocked_upload(*args: Any):
125-
unblock_upload.wait()
126-
return MagicMock()
149+
with self.block_upload():
150+
# fill the queue
151+
for _ in range(MAXSIZE):
152+
self.hook.upload(
153+
inputs=FAKE_INPUTS,
154+
outputs=FAKE_OUTPUTS,
155+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
156+
)
157+
158+
self.assertLessEqual(
159+
self.mock_fsspec.open.call_count,
160+
MAXSIZE,
161+
f"uploader should only be called {MAXSIZE=} times",
162+
)
127163

128-
self.mock_fsspec.open.side_effect = blocked_upload
164+
with self.assertLogs(level=logging.WARNING) as logs:
165+
self.hook.upload(
166+
inputs=FAKE_INPUTS,
167+
outputs=FAKE_OUTPUTS,
168+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
169+
)
129170

130-
# fill the queue
131-
for _ in range(MAXSIZE):
132-
self.hook.upload(
133-
inputs=FAKE_INPUTS,
134-
outputs=FAKE_OUTPUTS,
135-
system_instruction=FAKE_SYSTEM_INSTRUCTION,
171+
self.assertIn(
172+
"fsspec upload queue is full, dropping upload", logs.output[0]
136173
)
137174

138-
self.assertLessEqual(
139-
self.mock_fsspec.open.call_count,
140-
MAXSIZE,
141-
f"uploader should only be called {MAXSIZE=} times",
142-
)
143-
144-
with self.assertLogs(level=logging.WARNING) as logs:
175+
def test_shutdown_timeout(self):
176+
with self.block_upload():
145177
self.hook.upload(
146178
inputs=FAKE_INPUTS,
147179
outputs=FAKE_OUTPUTS,
148180
system_instruction=FAKE_SYSTEM_INSTRUCTION,
149181
)
150182

151-
self.assertIn(
152-
"fsspec upload queue is full, dropping upload", logs.output[0]
153-
)
154-
155-
unblock_upload.set()
183+
# shutdown should timeout and return even though there are still items in the queue
184+
self.hook.shutdown(timeout_sec=0.01)
156185

157186
def test_failed_upload_logs(self):
158-
def failing_upload(*args: Any) -> None:
159-
raise RuntimeError("failed to upload")
160-
161-
self.mock_fsspec.open = MagicMock(wraps=failing_upload)
187+
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
162188

163189
with self.assertLogs(level=logging.ERROR) as logs:
164190
self.hook.upload(
@@ -178,7 +204,7 @@ def test_upload_after_shutdown_logs(self):
178204
outputs=FAKE_OUTPUTS,
179205
system_instruction=FAKE_SYSTEM_INSTRUCTION,
180206
)
181-
self.assertEqual(len(logs.output), 1)
207+
self.assertEqual(len(logs.output), 3)
182208
self.assertIn(
183209
"attempting to upload file after FsspecUploadHook.shutdown() was already called",
184210
logs.output[0],

0 commit comments

Comments
 (0)