Skip to content

Commit f0b745e

Browse files
committed
gen ai uploader timeout and fix flaky bugs
1 parent 4fb00c9 commit f0b745e

File tree

2 files changed

+75
-36
lines changed

2 files changed

+75
-36
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import posixpath
2121
import threading
2222
from concurrent.futures import Future, ThreadPoolExecutor
23+
from contextlib import ExitStack
2324
from dataclasses import asdict, dataclass
2425
from functools import partial
26+
from time import time
2527
from typing import Any, Callable, Final, Literal, TextIO, cast
2628
from uuid import uuid4
2729

@@ -102,12 +104,12 @@ def __init__(
102104

103105
def _submit_all(self, upload_data: UploadData) -> None:
104106
def done(future: Future[None]) -> None:
105-
self._semaphore.release()
106-
107107
try:
108108
future.result()
109109
except Exception: # pylint: disable=broad-except
110110
_logger.exception("fsspec uploader failed")
111+
finally:
112+
self._semaphore.release()
111113

112114
for path, json_encodeable in upload_data.items():
113115
# could not acquire, drop data
@@ -127,7 +129,7 @@ def done(future: Future[None]) -> None:
127129
_logger.info(
128130
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
129131
)
130-
break
132+
self._semaphore.release()
131133

132134
def _calculate_ref_path(self) -> CompletionRefs:
133135
# TODO: experimental with using the trace_id and span_id, or fetching
@@ -203,6 +205,18 @@ def to_dict(
203205
**references,
204206
}
205207

206-
def shutdown(self) -> None:
207-
# TODO: support timeout
208-
self._executor.shutdown()
208+
def shutdown(self, *, timeout_sec: float = 10.0) -> None:
209+
deadline = time() + timeout_sec
210+
211+
# Wait for all tasks to finish to flush the queue
212+
with ExitStack() as stack:
213+
for _ in range(self._max_size):
214+
remaining = deadline - time()
215+
if not self._semaphore.acquire(timeout=remaining): # pylint: disable=consider-using-with
216+
# Couldn't finish flushing all uploads before timeout
217+
break
218+
219+
stack.callback(self._semaphore.release)
220+
221+
# Queue is flushed and blocked, start shutdown
222+
self._executor.shutdown(wait=False)

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import logging
2020
import sys
2121
import threading
22+
from contextlib import contextmanager
2223
from dataclasses import asdict
2324
from typing import Any
2425
from unittest import TestCase
@@ -85,10 +86,21 @@ def test_fsspec_entry_point_no_fsspec(self):
8586
FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")]
8687

8788

89+
class ThreadSafeMagicMock(MagicMock):
90+
def __init__(self, *args, **kwargs) -> None:
91+
self.__dict__["_lock"] = threading.Lock()
92+
super().__init__(*args, **kwargs)
93+
94+
def _increment_mock_call(self, /, *args, **kwargs):
95+
with self.__dict__["_lock"]:
96+
super()._increment_mock_call(*args, **kwargs)
97+
98+
8899
class TestFsspecUploadHook(TestCase):
89100
def setUp(self):
90101
self._fsspec_patcher = patch(
91-
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
102+
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec",
103+
new=ThreadSafeMagicMock(),
92104
)
93105
self.mock_fsspec = self._fsspec_patcher.start()
94106
self.hook = FsspecUploadHook(
@@ -100,6 +112,20 @@ def tearDown(self) -> None:
100112
self.hook.shutdown()
101113
self._fsspec_patcher.stop()
102114

115+
@contextmanager
116+
def block_upload(self):
117+
unblock_upload = threading.Event()
118+
119+
def blocked_upload(*args: Any):
120+
unblock_upload.wait()
121+
return MagicMock()
122+
123+
try:
124+
self.mock_fsspec.open.side_effect = blocked_upload
125+
yield
126+
finally:
127+
unblock_upload.set()
128+
103129
def test_shutdown_no_items(self):
104130
self.hook.shutdown()
105131

@@ -119,46 +145,45 @@ def test_upload_then_shutdown(self):
119145
)
120146

121147
def test_upload_blocked(self):
122-
unblock_upload = threading.Event()
123-
124-
def blocked_upload(*args: Any):
125-
unblock_upload.wait()
126-
return MagicMock()
148+
with self.block_upload():
149+
# fill the queue
150+
for _ in range(MAXSIZE):
151+
self.hook.upload(
152+
inputs=FAKE_INPUTS,
153+
outputs=FAKE_OUTPUTS,
154+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
155+
)
156+
157+
self.assertLessEqual(
158+
self.mock_fsspec.open.call_count,
159+
MAXSIZE,
160+
f"uploader should only be called {MAXSIZE=} times",
161+
)
127162

128-
self.mock_fsspec.open.side_effect = blocked_upload
163+
with self.assertLogs(level=logging.WARNING) as logs:
164+
self.hook.upload(
165+
inputs=FAKE_INPUTS,
166+
outputs=FAKE_OUTPUTS,
167+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
168+
)
129169

130-
# fill the queue
131-
for _ in range(MAXSIZE):
132-
self.hook.upload(
133-
inputs=FAKE_INPUTS,
134-
outputs=FAKE_OUTPUTS,
135-
system_instruction=FAKE_SYSTEM_INSTRUCTION,
170+
self.assertIn(
171+
"fsspec upload queue is full, dropping upload", logs.output[0]
136172
)
137173

138-
self.assertLessEqual(
139-
self.mock_fsspec.open.call_count,
140-
MAXSIZE,
141-
f"uploader should only be called {MAXSIZE=} times",
142-
)
143-
144-
with self.assertLogs(level=logging.WARNING) as logs:
174+
def test_shutdown_timeout(self):
175+
with self.block_upload():
145176
self.hook.upload(
146177
inputs=FAKE_INPUTS,
147178
outputs=FAKE_OUTPUTS,
148179
system_instruction=FAKE_SYSTEM_INSTRUCTION,
149180
)
150181

151-
self.assertIn(
152-
"fsspec upload queue is full, dropping upload", logs.output[0]
153-
)
154-
155-
unblock_upload.set()
182+
# shutdown should timeout and return even though there are still items in the queue
183+
self.hook.shutdown(timeout_sec=0.01)
156184

157185
def test_failed_upload_logs(self):
158-
def failing_upload(*args: Any) -> None:
159-
raise RuntimeError("failed to upload")
160-
161-
self.mock_fsspec.open = MagicMock(wraps=failing_upload)
186+
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
162187

163188
with self.assertLogs(level=logging.ERROR) as logs:
164189
self.hook.upload(
@@ -178,7 +203,7 @@ def test_upload_after_shutdown_logs(self):
178203
outputs=FAKE_OUTPUTS,
179204
system_instruction=FAKE_SYSTEM_INSTRUCTION,
180205
)
181-
self.assertEqual(len(logs.output), 1)
206+
self.assertEqual(len(logs.output), 3)
182207
self.assertIn(
183208
"attempting to upload file after FsspecUploadHook.shutdown() was already called",
184209
logs.output[0],

0 commit comments

Comments
 (0)