Skip to content

Commit 31b10f9

Browse files
committed
Record content-type and use canonical paths in fsspec genai uploader
1 parent 205fd59 commit 31b10f9

File tree

4 files changed

+101
-17
lines changed

4 files changed

+101
-17
lines changed

typings/fsspec/__init__.pyi

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Handwritten stubs for fsspec usage in opentelemetry-util-genai"""
16+
17+
from __future__ import annotations
18+
19+
import io
20+
from typing import Any, Literal
21+
22+
from fsspec.spec import (
23+
AbstractFileSystem as RealAbstractFileSystem,
24+
)
25+
26+
class AbstractFileSystem(RealAbstractFileSystem):
27+
def open(
28+
self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any
29+
) -> io.TextIOWrapper: ...
30+
31+
def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ...

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
- Add jsonlines support to fsspec uploader
1111
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791](#3791))
12+
- Record content-type and use canonical paths in fsspec genai uploader
13+
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795))
1214

1315
## Version 0.1b0 (2025-09-24)
1416

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/completion_hook.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from functools import partial
2727
from os import environ
2828
from time import time
29-
from typing import Any, Callable, Final, Literal, TextIO, cast
29+
from typing import Any, Callable, Final, Literal
3030
from uuid import uuid4
3131

3232
import fsspec
@@ -78,11 +78,6 @@ class CompletionRefs:
7878
UploadData = dict[str, Callable[[], JsonEncodeable]]
7979

8080

81-
def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
82-
"""typed wrapper around `fsspec.open`"""
83-
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]
84-
85-
8681
class FsspecUploadCompletionHook(CompletionHook):
8782
"""An completion hook using ``fsspec`` to upload to external storage
8883
@@ -104,8 +99,9 @@ def __init__(
10499
max_size: int = 20,
105100
upload_format: Format | None = None,
106101
) -> None:
107-
self._base_path = base_path
108102
self._max_size = max_size
103+
self._fs, base_path = fsspec.url_to_fs(base_path)
104+
self._base_path = self._fs.unstrip_protocol(base_path)
109105

110106
if upload_format not in _FORMATS + (None,):
111107
raise ValueError(
@@ -188,7 +184,13 @@ def _do_upload(
188184
for message_idx, line in enumerate(message_lines):
189185
line[_MESSAGE_INDEX_KEY] = message_idx
190186

191-
with fsspec_open(path, "w") as file:
187+
content_type = (
188+
"application/json"
189+
if self._format == "json"
190+
else "application/jsonl"
191+
)
192+
193+
with self._fs.open(path, "w", content_type=content_type) as file:
192194
for message in message_lines:
193195
json.dump(
194196
message,

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from contextlib import contextmanager
2222
from typing import Any
2323
from unittest import TestCase
24-
from unittest.mock import MagicMock, patch
24+
from unittest.mock import ANY, MagicMock, patch
2525

2626
import fsspec
2727

@@ -119,8 +119,9 @@ def setUp(self):
119119
self._fsspec_patcher = patch(
120120
"opentelemetry.util.genai._fsspec_upload.completion_hook.fsspec"
121121
)
122-
self.mock_fsspec = self._fsspec_patcher.start()
123-
self.mock_fsspec.open = ThreadSafeMagicMock()
122+
mock_fsspec = self._fsspec_patcher.start()
123+
self.mock_fs = ThreadSafeMagicMock()
124+
mock_fsspec.url_to_fs.return_value = self.mock_fs, ""
124125

125126
self.hook = FsspecUploadCompletionHook(
126127
base_path=BASE_PATH,
@@ -135,12 +136,12 @@ def tearDown(self) -> None:
135136
def block_upload(self):
136137
unblock_upload = threading.Event()
137138

138-
def blocked_upload(*args: Any):
139+
def blocked_upload(*args: Any, **kwargs: Any):
139140
unblock_upload.wait()
140141
return MagicMock()
141142

142143
try:
143-
self.mock_fsspec.open.side_effect = blocked_upload
144+
self.mock_fs.open.side_effect = blocked_upload
144145
yield
145146
finally:
146147
unblock_upload.set()
@@ -158,7 +159,7 @@ def test_upload_then_shutdown(self):
158159
self.hook.shutdown()
159160

160161
self.assertEqual(
161-
self.mock_fsspec.open.call_count,
162+
self.mock_fs.open.call_count,
162163
3,
163164
"should have uploaded 3 files",
164165
)
@@ -174,7 +175,7 @@ def test_upload_blocked(self):
174175
)
175176

176177
self.assertLessEqual(
177-
self.mock_fsspec.open.call_count,
178+
self.mock_fs.open.call_count,
178179
MAXSIZE,
179180
f"uploader should only be called {MAXSIZE=} times",
180181
)
@@ -202,7 +203,7 @@ def test_shutdown_timeout(self):
202203
self.hook.shutdown(timeout_sec=0.01)
203204

204205
def test_failed_upload_logs(self):
205-
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
206+
self.mock_fs.open.side_effect = RuntimeError("failed to upload")
206207

207208
with self.assertLogs(level=logging.ERROR) as logs:
208209
self.hook.on_completion(
@@ -220,6 +221,27 @@ def test_invalid_upload_format(self):
220221
base_path=BASE_PATH, upload_format="invalid"
221222
)
222223

224+
def test_upload_format_sets_content_type(self):
225+
for upload_format, expect_content_type in (
226+
("json", "application/json"),
227+
("jsonl", "application/jsonl"),
228+
):
229+
hook = FsspecUploadCompletionHook(
230+
base_path=BASE_PATH, upload_format=upload_format
231+
)
232+
self.addCleanup(hook.shutdown)
233+
234+
hook.on_completion(
235+
inputs=FAKE_INPUTS,
236+
outputs=FAKE_OUTPUTS,
237+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
238+
)
239+
hook.shutdown()
240+
241+
self.mock_fs.open.assert_called_with(
242+
ANY, "w", content_type=expect_content_type
243+
)
244+
223245
def test_parse_upload_format_envvar(self):
224246
for envvar_value, expect in (
225247
("", "json"),
@@ -250,7 +272,11 @@ def test_parse_upload_format_envvar(self):
250272
base_path=BASE_PATH, upload_format="jsonl"
251273
)
252274
self.addCleanup(hook.shutdown)
253-
self.assertEqual(hook._format, "jsonl")
275+
self.assertEqual(
276+
hook._format,
277+
"jsonl",
278+
"upload_format kwarg should take precedence",
279+
)
254280

255281
def test_upload_after_shutdown_logs(self):
256282
self.hook.shutdown()
@@ -417,3 +443,26 @@ def test_upload_jsonlines(self) -> None:
417443
{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}],"index":2}
418444
""",
419445
)
446+
447+
def test_upload_chained_filesystem_ref(self) -> None:
448+
"""Using a chained filesystem like simplecache should refer to the final remote destination"""
449+
hook = FsspecUploadCompletionHook(
450+
base_path="simplecache::memory",
451+
upload_format="jsonl",
452+
)
453+
self.addCleanup(hook.shutdown)
454+
log_record = LogRecord()
455+
456+
hook.on_completion(
457+
inputs=FAKE_INPUTS,
458+
outputs=FAKE_OUTPUTS,
459+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
460+
log_record=log_record,
461+
)
462+
hook.shutdown()
463+
464+
ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"]
465+
self.assertTrue(
466+
ref_uri.startswith("memory://"),
467+
f"{ref_uri=} does not start with final destination uri memory://",
468+
)

0 commit comments

Comments
 (0)