Skip to content

Commit 13fa314

Browse files
authored
gen ai uploader timeout and fix flaky bugs (#3770)
1 parent 9fab62b commit 13fa314

File tree

2 files changed

+74
-34
lines changed

2 files changed

+74
-34
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import threading
2222
from base64 import b64encode
2323
from concurrent.futures import Future, ThreadPoolExecutor
24+
from contextlib import ExitStack
2425
from dataclasses import asdict, dataclass
2526
from functools import partial
27+
from time import time
2628
from typing import Any, Callable, Final, Literal, TextIO, cast
2729
from uuid import uuid4
2830

@@ -103,12 +105,12 @@ def __init__(
103105

104106
def _submit_all(self, upload_data: UploadData) -> None:
105107
def done(future: Future[None]) -> None:
106-
self._semaphore.release()
107-
108108
try:
109109
future.result()
110110
except Exception: # pylint: disable=broad-except
111111
_logger.exception("fsspec uploader failed")
112+
finally:
113+
self._semaphore.release()
112114

113115
for path, json_encodeable in upload_data.items():
114116
# could not acquire, drop data
@@ -128,7 +130,7 @@ def done(future: Future[None]) -> None:
128130
_logger.info(
129131
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
130132
)
131-
break
133+
self._semaphore.release()
132134

133135
def _calculate_ref_path(self) -> CompletionRefs:
134136
# TODO: experimental with using the trace_id and span_id, or fetching
@@ -209,9 +211,21 @@ def to_dict(
209211
**references,
210212
}
211213

212-
def shutdown(self) -> None:
213-
# TODO: support timeout
214-
self._executor.shutdown()
214+
def shutdown(self, *, timeout_sec: float = 10.0) -> None:
215+
deadline = time() + timeout_sec
216+
217+
# Wait for all tasks to finish to flush the queue
218+
with ExitStack() as stack:
219+
for _ in range(self._max_size):
220+
remaining = deadline - time()
221+
if not self._semaphore.acquire(timeout=remaining): # pylint: disable=consider-using-with
222+
# Couldn't finish flushing all uploads before timeout
223+
break
224+
225+
stack.callback(self._semaphore.release)
226+
227+
# Queue is flushed and blocked, start shutdown
228+
self._executor.shutdown(wait=False)
215229

216230

217231
class Base64JsonEncoder(json.JSONEncoder):

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import logging
1919
import sys
2020
import threading
21+
from contextlib import contextmanager
2122
from dataclasses import asdict
2223
from typing import Any
2324
from unittest import TestCase
@@ -84,12 +85,24 @@ def test_fsspec_entry_point_no_fsspec(self):
8485
FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")]
8586

8687

88+
class ThreadSafeMagicMock(MagicMock):
89+
def __init__(self, *args, **kwargs) -> None:
90+
self.__dict__["_lock"] = threading.Lock()
91+
super().__init__(*args, **kwargs)
92+
93+
def _increment_mock_call(self, /, *args, **kwargs):
94+
with self.__dict__["_lock"]:
95+
super()._increment_mock_call(*args, **kwargs)
96+
97+
8798
class TestFsspecUploadHook(TestCase):
8899
def setUp(self):
89100
self._fsspec_patcher = patch(
90101
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
91102
)
92103
self.mock_fsspec = self._fsspec_patcher.start()
104+
self.mock_fsspec.open = ThreadSafeMagicMock()
105+
93106
self.hook = FsspecUploadHook(
94107
base_path=BASE_PATH,
95108
max_size=MAXSIZE,
@@ -99,6 +112,20 @@ def tearDown(self) -> None:
99112
self.hook.shutdown()
100113
self._fsspec_patcher.stop()
101114

115+
@contextmanager
116+
def block_upload(self):
117+
unblock_upload = threading.Event()
118+
119+
def blocked_upload(*args: Any):
120+
unblock_upload.wait()
121+
return MagicMock()
122+
123+
try:
124+
self.mock_fsspec.open.side_effect = blocked_upload
125+
yield
126+
finally:
127+
unblock_upload.set()
128+
102129
def test_shutdown_no_items(self):
103130
self.hook.shutdown()
104131

@@ -118,46 +145,45 @@ def test_upload_then_shutdown(self):
118145
)
119146

120147
def test_upload_blocked(self):
121-
unblock_upload = threading.Event()
148+
with self.block_upload():
149+
# fill the queue
150+
for _ in range(MAXSIZE):
151+
self.hook.upload(
152+
inputs=FAKE_INPUTS,
153+
outputs=FAKE_OUTPUTS,
154+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
155+
)
122156

123-
def blocked_upload(*args: Any):
124-
unblock_upload.wait()
125-
return MagicMock()
157+
self.assertLessEqual(
158+
self.mock_fsspec.open.call_count,
159+
MAXSIZE,
160+
f"uploader should only be called {MAXSIZE=} times",
161+
)
126162

127-
self.mock_fsspec.open.side_effect = blocked_upload
163+
with self.assertLogs(level=logging.WARNING) as logs:
164+
self.hook.upload(
165+
inputs=FAKE_INPUTS,
166+
outputs=FAKE_OUTPUTS,
167+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
168+
)
128169

129-
# fill the queue
130-
for _ in range(MAXSIZE):
131-
self.hook.upload(
132-
inputs=FAKE_INPUTS,
133-
outputs=FAKE_OUTPUTS,
134-
system_instruction=FAKE_SYSTEM_INSTRUCTION,
170+
self.assertIn(
171+
"fsspec upload queue is full, dropping upload", logs.output[0]
135172
)
136173

137-
self.assertLessEqual(
138-
self.mock_fsspec.open.call_count,
139-
MAXSIZE,
140-
f"uploader should only be called {MAXSIZE=} times",
141-
)
142-
143-
with self.assertLogs(level=logging.WARNING) as logs:
174+
def test_shutdown_timeout(self):
175+
with self.block_upload():
144176
self.hook.upload(
145177
inputs=FAKE_INPUTS,
146178
outputs=FAKE_OUTPUTS,
147179
system_instruction=FAKE_SYSTEM_INSTRUCTION,
148180
)
149181

150-
self.assertIn(
151-
"fsspec upload queue is full, dropping upload", logs.output[0]
152-
)
153-
154-
unblock_upload.set()
182+
# shutdown should timeout and return even though there are still items in the queue
183+
self.hook.shutdown(timeout_sec=0.01)
155184

156185
def test_failed_upload_logs(self):
157-
def failing_upload(*args: Any) -> None:
158-
raise RuntimeError("failed to upload")
159-
160-
self.mock_fsspec.open = MagicMock(wraps=failing_upload)
186+
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
161187

162188
with self.assertLogs(level=logging.ERROR) as logs:
163189
self.hook.upload(
@@ -177,7 +203,7 @@ def test_upload_after_shutdown_logs(self):
177203
outputs=FAKE_OUTPUTS,
178204
system_instruction=FAKE_SYSTEM_INSTRUCTION,
179205
)
180-
self.assertEqual(len(logs.output), 1)
206+
self.assertEqual(len(logs.output), 3)
181207
self.assertIn(
182208
"attempting to upload file after FsspecUploadHook.shutdown() was already called",
183209
logs.output[0],

0 commit comments

Comments
 (0)