Skip to content

Commit bad3ea2

Browse files
committed
Record content-type and use canonical paths in fsspec genai uploader
1 parent 229d969 commit bad3ea2

File tree

4 files changed

+101
-17
lines changed

4 files changed

+101
-17
lines changed

typings/fsspec/__init__.pyi

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Handwritten stubs for fsspec usage in opentelemetry-util-genai"""
16+
17+
from __future__ import annotations
18+
19+
import io
20+
from typing import Any, Literal
21+
22+
from fsspec.spec import (
23+
AbstractFileSystem as RealAbstractFileSystem,
24+
)
25+
26+
class AbstractFileSystem(RealAbstractFileSystem):
27+
def open(
28+
self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any
29+
) -> io.TextIOWrapper: ...
30+
31+
def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ...

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791](#3791))
1212
- Rename "fsspec_upload" entry point and classes to more generic "upload"
1313
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3798](#3798))
14+
- Record content-type and use canonical paths in fsspec genai uploader
15+
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795))
1416

1517
## Version 0.1b0 (2025-09-24)
1618

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from functools import partial
2727
from os import environ
2828
from time import time
29-
from typing import Any, Callable, Final, Literal, TextIO, cast
29+
from typing import Any, Callable, Final, Literal
3030
from uuid import uuid4
3131

3232
import fsspec
@@ -78,11 +78,6 @@ class CompletionRefs:
7878
UploadData = dict[str, Callable[[], JsonEncodeable]]
7979

8080

81-
def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
82-
"""typed wrapper around `fsspec.open`"""
83-
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]
84-
85-
8681
class UploadCompletionHook(CompletionHook):
8782
"""An completion hook using ``fsspec`` to upload to external storage
8883
@@ -104,8 +99,9 @@ def __init__(
10499
max_size: int = 20,
105100
upload_format: Format | None = None,
106101
) -> None:
107-
self._base_path = base_path
108102
self._max_size = max_size
103+
self._fs, base_path = fsspec.url_to_fs(base_path)
104+
self._base_path = self._fs.unstrip_protocol(base_path)
109105

110106
if upload_format not in _FORMATS + (None,):
111107
raise ValueError(
@@ -188,7 +184,13 @@ def _do_upload(
188184
for message_idx, line in enumerate(message_lines):
189185
line[_MESSAGE_INDEX_KEY] = message_idx
190186

191-
with fsspec_open(path, "w") as file:
187+
content_type = (
188+
"application/json"
189+
if self._format == "json"
190+
else "application/jsonl"
191+
)
192+
193+
with self._fs.open(path, "w", content_type=content_type) as file:
192194
for message in message_lines:
193195
json.dump(
194196
message,

util/opentelemetry-util-genai/tests/test_upload.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from contextlib import contextmanager
2222
from typing import Any
2323
from unittest import TestCase
24-
from unittest.mock import MagicMock, patch
24+
from unittest.mock import ANY, MagicMock, patch
2525

2626
import fsspec
2727

@@ -117,8 +117,9 @@ def setUp(self):
117117
self._fsspec_patcher = patch(
118118
"opentelemetry.util.genai._upload.completion_hook.fsspec"
119119
)
120-
self.mock_fsspec = self._fsspec_patcher.start()
121-
self.mock_fsspec.open = ThreadSafeMagicMock()
120+
mock_fsspec = self._fsspec_patcher.start()
121+
self.mock_fs = ThreadSafeMagicMock()
122+
mock_fsspec.url_to_fs.return_value = self.mock_fs, ""
122123

123124
self.hook = UploadCompletionHook(
124125
base_path=BASE_PATH,
@@ -133,12 +134,12 @@ def tearDown(self) -> None:
133134
def block_upload(self):
134135
unblock_upload = threading.Event()
135136

136-
def blocked_upload(*args: Any):
137+
def blocked_upload(*args: Any, **kwargs: Any):
137138
unblock_upload.wait()
138139
return MagicMock()
139140

140141
try:
141-
self.mock_fsspec.open.side_effect = blocked_upload
142+
self.mock_fs.open.side_effect = blocked_upload
142143
yield
143144
finally:
144145
unblock_upload.set()
@@ -156,7 +157,7 @@ def test_upload_then_shutdown(self):
156157
self.hook.shutdown()
157158

158159
self.assertEqual(
159-
self.mock_fsspec.open.call_count,
160+
self.mock_fs.open.call_count,
160161
3,
161162
"should have uploaded 3 files",
162163
)
@@ -172,7 +173,7 @@ def test_upload_blocked(self):
172173
)
173174

174175
self.assertLessEqual(
175-
self.mock_fsspec.open.call_count,
176+
self.mock_fs.open.call_count,
176177
MAXSIZE,
177178
f"uploader should only be called {MAXSIZE=} times",
178179
)
@@ -200,7 +201,7 @@ def test_shutdown_timeout(self):
200201
self.hook.shutdown(timeout_sec=0.01)
201202

202203
def test_failed_upload_logs(self):
203-
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
204+
self.mock_fs.open.side_effect = RuntimeError("failed to upload")
204205

205206
with self.assertLogs(level=logging.ERROR) as logs:
206207
self.hook.on_completion(
@@ -216,6 +217,27 @@ def test_invalid_upload_format(self):
216217
with self.assertRaisesRegex(ValueError, "Invalid upload_format"):
217218
UploadCompletionHook(base_path=BASE_PATH, upload_format="invalid")
218219

220+
def test_upload_format_sets_content_type(self):
221+
for upload_format, expect_content_type in (
222+
("json", "application/json"),
223+
("jsonl", "application/jsonl"),
224+
):
225+
hook = UploadCompletionHook(
226+
base_path=BASE_PATH, upload_format=upload_format
227+
)
228+
self.addCleanup(hook.shutdown)
229+
230+
hook.on_completion(
231+
inputs=FAKE_INPUTS,
232+
outputs=FAKE_OUTPUTS,
233+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
234+
)
235+
hook.shutdown()
236+
237+
self.mock_fs.open.assert_called_with(
238+
ANY, "w", content_type=expect_content_type
239+
)
240+
219241
def test_parse_upload_format_envvar(self):
220242
for envvar_value, expect in (
221243
("", "json"),
@@ -246,7 +268,11 @@ def test_parse_upload_format_envvar(self):
246268
base_path=BASE_PATH, upload_format="jsonl"
247269
)
248270
self.addCleanup(hook.shutdown)
249-
self.assertEqual(hook._format, "jsonl")
271+
self.assertEqual(
272+
hook._format,
273+
"jsonl",
274+
"upload_format kwarg should take precedence",
275+
)
250276

251277
def test_upload_after_shutdown_logs(self):
252278
self.hook.shutdown()
@@ -409,3 +435,26 @@ def test_upload_jsonlines(self) -> None:
409435
{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}],"index":2}
410436
""",
411437
)
438+
439+
def test_upload_chained_filesystem_ref(self) -> None:
440+
"""Using a chained filesystem like simplecache should refer to the final remote destination"""
441+
hook = UploadCompletionHook(
442+
base_path="simplecache::memory",
443+
upload_format="jsonl",
444+
)
445+
self.addCleanup(hook.shutdown)
446+
log_record = LogRecord()
447+
448+
hook.on_completion(
449+
inputs=FAKE_INPUTS,
450+
outputs=FAKE_OUTPUTS,
451+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
452+
log_record=log_record,
453+
)
454+
hook.shutdown()
455+
456+
ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"]
457+
self.assertTrue(
458+
ref_uri.startswith("memory://"),
459+
f"{ref_uri=} does not start with final destination uri memory://",
460+
)

0 commit comments

Comments
 (0)