From bad3ea21f61b8af24fa0114f5087a563d2f125ba Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Wed, 1 Oct 2025 04:45:48 +0000 Subject: [PATCH] Record content-type and use canonical paths in fsspec genai uploader --- typings/fsspec/__init__.pyi | 31 +++++++++ util/opentelemetry-util-genai/CHANGELOG.md | 2 + .../util/genai/_upload/completion_hook.py | 18 ++--- .../tests/test_upload.py | 67 ++++++++++++++++--- 4 files changed, 101 insertions(+), 17 deletions(-) create mode 100644 typings/fsspec/__init__.pyi diff --git a/typings/fsspec/__init__.pyi b/typings/fsspec/__init__.pyi new file mode 100644 index 0000000000..008a62a4ed --- /dev/null +++ b/typings/fsspec/__init__.pyi @@ -0,0 +1,31 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Handwritten stubs for fsspec usage in opentelemetry-util-genai""" + +from __future__ import annotations + +import io +from typing import Any, Literal + +from fsspec.spec import ( + AbstractFileSystem as RealAbstractFileSystem, +) + +class AbstractFileSystem(RealAbstractFileSystem): + def open( + self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any + ) -> io.TextIOWrapper: ... + +def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ... diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 1bd9bd815d..b8c2ec9dcf 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791](#3791)) - Rename "fsspec_upload" entry point and classes to more generic "upload" ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3798](#3798)) +- Record content-type and use canonical paths in fsspec genai uploader + ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795)) ## Version 0.1b0 (2025-09-24) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index 2bccd0358b..ec175ab776 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -26,7 +26,7 @@ from functools import partial from os import environ from time import time -from typing import Any, Callable, Final, Literal, TextIO, cast +from typing import Any, Callable, Final, Literal from uuid import uuid4 import fsspec @@ -78,11 +78,6 @@ class CompletionRefs: UploadData = dict[str, Callable[[], JsonEncodeable]] -def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO: - """typed wrapper around `fsspec.open`""" - return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType] - - class UploadCompletionHook(CompletionHook): """An completion hook using ``fsspec`` to upload to external storage @@ -104,8 +99,9 @@ def __init__( max_size: int = 20, upload_format: Format | None = None, ) -> None: - self._base_path = base_path self._max_size = max_size + self._fs, base_path = fsspec.url_to_fs(base_path) + self._base_path = self._fs.unstrip_protocol(base_path) if upload_format not in _FORMATS + (None,): raise ValueError( @@ -188,7 +184,13 @@ def _do_upload( for message_idx, line in enumerate(message_lines): line[_MESSAGE_INDEX_KEY] = message_idx - with fsspec_open(path, "w") as file: + content_type = ( + "application/json" + if self._format == "json" + else "application/jsonl" + ) + + with self._fs.open(path, "w", content_type=content_type) as file: for message in message_lines: json.dump( message, diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index b1685fe41b..26e33467a7 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -21,7 +21,7 @@ from contextlib import contextmanager from typing import Any from unittest import TestCase -from unittest.mock import MagicMock, patch +from unittest.mock import ANY, MagicMock, patch import fsspec @@ -117,8 +117,9 @@ def setUp(self): self._fsspec_patcher = patch( "opentelemetry.util.genai._upload.completion_hook.fsspec" ) - self.mock_fsspec = self._fsspec_patcher.start() - self.mock_fsspec.open = ThreadSafeMagicMock() + mock_fsspec = self._fsspec_patcher.start() + self.mock_fs = ThreadSafeMagicMock() + mock_fsspec.url_to_fs.return_value = self.mock_fs, "" self.hook = UploadCompletionHook( base_path=BASE_PATH, @@ -133,12 +134,12 @@ def tearDown(self) -> None: def block_upload(self): unblock_upload = threading.Event() - def blocked_upload(*args: Any): + def blocked_upload(*args: Any, **kwargs: Any): unblock_upload.wait() return MagicMock() try: - self.mock_fsspec.open.side_effect = blocked_upload + self.mock_fs.open.side_effect = blocked_upload yield finally: unblock_upload.set() @@ -156,7 +157,7 @@ def test_upload_then_shutdown(self): self.hook.shutdown() self.assertEqual( - self.mock_fsspec.open.call_count, + self.mock_fs.open.call_count, 3, "should have uploaded 3 files", ) @@ -172,7 +173,7 @@ def test_upload_blocked(self): ) self.assertLessEqual( - self.mock_fsspec.open.call_count, + self.mock_fs.open.call_count, MAXSIZE, f"uploader should only be called {MAXSIZE=} times", ) @@ -200,7 +201,7 @@ def test_shutdown_timeout(self): self.hook.shutdown(timeout_sec=0.01) def test_failed_upload_logs(self): - self.mock_fsspec.open.side_effect = RuntimeError("failed to upload") + self.mock_fs.open.side_effect = RuntimeError("failed to upload") with self.assertLogs(level=logging.ERROR) as logs: self.hook.on_completion( @@ -216,6 +217,27 @@ def test_invalid_upload_format(self): with self.assertRaisesRegex(ValueError, "Invalid upload_format"): UploadCompletionHook(base_path=BASE_PATH, upload_format="invalid") + def test_upload_format_sets_content_type(self): + for upload_format, expect_content_type in ( + ("json", "application/json"), + ("jsonl", "application/jsonl"), + ): + hook = UploadCompletionHook( + base_path=BASE_PATH, upload_format=upload_format + ) + self.addCleanup(hook.shutdown) + + hook.on_completion( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) + hook.shutdown() + + self.mock_fs.open.assert_called_with( + ANY, "w", content_type=expect_content_type + ) + def test_parse_upload_format_envvar(self): for envvar_value, expect in ( ("", "json"), @@ -246,7 +268,11 @@ def test_parse_upload_format_envvar(self): base_path=BASE_PATH, upload_format="jsonl" ) self.addCleanup(hook.shutdown) - self.assertEqual(hook._format, "jsonl") + self.assertEqual( + hook._format, + "jsonl", + "upload_format kwarg should take precedence", + ) def test_upload_after_shutdown_logs(self): self.hook.shutdown() @@ -409,3 +435,26 @@ def test_upload_jsonlines(self) -> None: {"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}],"index":2} """, ) + + def test_upload_chained_filesystem_ref(self) -> None: + """Using a chained filesystem like simplecache should refer to the final remote destination""" + hook = UploadCompletionHook( + base_path="simplecache::memory", + upload_format="jsonl", + ) + self.addCleanup(hook.shutdown) + log_record = LogRecord() + + hook.on_completion( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + log_record=log_record, + ) + hook.shutdown() + + ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"] + self.assertTrue( + ref_uri.startswith("memory://"), + f"{ref_uri=} does not start with final destination uri memory://", + )