Skip to content

Commit b7bbeae

Browse files
committed
Get rid of FsspecUploader separate class
1 parent 816ffd3 commit b7bbeae

File tree

3 files changed

+19
-34
lines changed

3 files changed

+19
-34
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ def fsspec_upload_hook() -> UploadHook:
2727
try:
2828
# pylint: disable=import-outside-toplevel
2929
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
30-
FsspecUploader,
3130
FsspecUploadHook,
3231
)
3332
except ImportError:
@@ -37,7 +36,4 @@ def fsspec_upload_hook() -> UploadHook:
3736
if not base_path:
3837
return _NoOpUploadHook()
3938

40-
return FsspecUploadHook(
41-
uploader=FsspecUploader(),
42-
base_path=base_path,
43-
)
39+
return FsspecUploadHook(base_path=base_path)

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,6 @@ def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
6060
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]
6161

6262

63-
class FsspecUploader:
64-
"""Implements uploading GenAI completions to a generic backend using fsspec
65-
66-
This class is used by the `BatchUploadHook` to upload completions to an external
67-
storage.
68-
"""
69-
70-
def upload( # pylint: disable=no-self-use
71-
self,
72-
path: str,
73-
json_encodeable: Callable[[], JsonEncodeable],
74-
) -> None:
75-
with fsspec_open(path, "w") as file:
76-
json.dump(json_encodeable(), file, separators=(",", ":"))
77-
78-
7963
class FsspecUploadHook(UploadHook):
8064
"""An upload hook using ``fsspec`` to upload to external storage
8165
@@ -93,12 +77,10 @@ class FsspecUploadHook(UploadHook):
9377
def __init__(
9478
self,
9579
*,
96-
uploader: FsspecUploader,
9780
base_path: str,
9881
max_size: int = 20,
9982
) -> None:
10083
self._base_path = base_path
101-
self._uploader = uploader
10284
self._max_size = max_size
10385

10486
# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
@@ -126,7 +108,7 @@ def done(future: Future[None]) -> None:
126108

127109
try:
128110
fut = self._executor.submit(
129-
self._uploader.upload, path, json_encodeable
111+
self._do_upload, path, json_encodeable
130112
)
131113
fut.add_done_callback(done)
132114
except RuntimeError:
@@ -152,6 +134,13 @@ def _calculate_ref_path(self) -> CompletionRefs:
152134
),
153135
)
154136

137+
@staticmethod
138+
def _do_upload(
139+
path: str, json_encodeable: Callable[[], JsonEncodeable]
140+
) -> None:
141+
with fsspec_open(path, "w") as file:
142+
json.dump(json_encodeable(), file, separators=(",", ":"))
143+
155144
def upload(
156145
self,
157146
*,

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
from opentelemetry.test.test_base import TestBase
3131
from opentelemetry.util.genai import types
3232
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
33-
FsspecUploader,
3433
FsspecUploadHook,
3534
)
3635
from opentelemetry.util.genai.upload_hook import (
@@ -88,15 +87,18 @@ def test_fsspec_entry_point_no_fsspec(self):
8887

8988
class TestFsspecUploadHook(TestCase):
9089
def setUp(self):
91-
self.mock_uploader = MagicMock(spec=FsspecUploader)
90+
self._fsspec_patcher = patch(
91+
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
92+
)
93+
self.mock_fsspec = self._fsspec_patcher.start()
9294
self.hook = FsspecUploadHook(
93-
uploader=self.mock_uploader,
9495
base_path=BASE_PATH,
9596
max_size=MAXSIZE,
9697
)
9798

9899
def tearDown(self) -> None:
99100
self.hook.shutdown()
101+
self._fsspec_patcher.stop()
100102

101103
def test_shutdown_no_items(self):
102104
self.hook.shutdown()
@@ -111,7 +113,7 @@ def test_upload_then_shutdown(self):
111113
self.hook.shutdown()
112114

113115
self.assertEqual(
114-
self.mock_uploader.upload.call_count,
116+
self.mock_fsspec.open.call_count,
115117
3,
116118
"should have uploaded 3 files",
117119
)
@@ -122,7 +124,7 @@ def test_upload_blocked(self):
122124
def blocked_upload(*args: Any) -> None:
123125
unblock_upload.wait()
124126

125-
self.mock_uploader.upload = MagicMock(wraps=blocked_upload)
127+
self.mock_fsspec.open = MagicMock(wraps=blocked_upload)
126128

127129
# fill the queue
128130
for _ in range(MAXSIZE):
@@ -133,7 +135,7 @@ def blocked_upload(*args: Any) -> None:
133135
)
134136

135137
self.assertLessEqual(
136-
self.mock_uploader.upload.call_count,
138+
self.mock_fsspec.open.call_count,
137139
MAXSIZE,
138140
f"uploader should only be called {MAXSIZE=} times",
139141
)
@@ -155,7 +157,7 @@ def test_failed_upload_logs(self):
155157
def failing_upload(*args: Any) -> None:
156158
raise RuntimeError("failed to upload")
157159

158-
self.mock_uploader.upload = MagicMock(wraps=failing_upload)
160+
self.mock_fsspec.open = MagicMock(wraps=failing_upload)
159161

160162
with self.assertLogs(level=logging.ERROR) as logs:
161163
self.hook.upload(
@@ -184,8 +186,7 @@ def test_upload_after_shutdown_logs(self):
184186

185187
class FsspecUploaderTest(TestCase):
186188
def test_upload(self):
187-
uploader = FsspecUploader()
188-
uploader.upload(
189+
FsspecUploadHook._do_upload(
189190
"memory://my_path",
190191
lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS],
191192
)
@@ -207,7 +208,6 @@ def assert_fsspec_equal(self, path: str, value: str) -> None:
207208

208209
def test_upload_completions(self):
209210
hook = FsspecUploadHook(
210-
uploader=FsspecUploader(),
211211
base_path=BASE_PATH,
212212
)
213213
hook.upload(

0 commit comments

Comments
 (0)