Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions typings/fsspec/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Handwritten stubs for fsspec usage in opentelemetry-util-genai"""

from __future__ import annotations

import io
from typing import Any, Literal

from fsspec.spec import (
AbstractFileSystem as RealAbstractFileSystem,
)

class AbstractFileSystem(RealAbstractFileSystem):
def open(
self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any
) -> io.TextIOWrapper: ...

def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ...
2 changes: 2 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791](#3791))
- Rename "fsspec_upload" entry point and classes to more generic "upload"
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3798](#3798))
- Record content-type and use canonical paths in fsspec genai uploader
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795))

## Version 0.1b0 (2025-09-24)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from functools import partial
from os import environ
from time import time
from typing import Any, Callable, Final, Literal, TextIO, cast
from typing import Any, Callable, Final, Literal
from uuid import uuid4

import fsspec
Expand Down Expand Up @@ -78,11 +78,6 @@ class CompletionRefs:
UploadData = dict[str, Callable[[], JsonEncodeable]]


def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
"""typed wrapper around `fsspec.open`"""
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]


class UploadCompletionHook(CompletionHook):
"""An completion hook using ``fsspec`` to upload to external storage

Expand All @@ -104,8 +99,9 @@ def __init__(
max_size: int = 20,
upload_format: Format | None = None,
) -> None:
self._base_path = base_path
self._max_size = max_size
self._fs, base_path = fsspec.url_to_fs(base_path)
self._base_path = self._fs.unstrip_protocol(base_path)

if upload_format not in _FORMATS + (None,):
raise ValueError(
Expand Down Expand Up @@ -188,7 +184,13 @@ def _do_upload(
for message_idx, line in enumerate(message_lines):
line[_MESSAGE_INDEX_KEY] = message_idx

with fsspec_open(path, "w") as file:
content_type = (
"application/json"
if self._format == "json"
else "application/jsonl"
)

with self._fs.open(path, "w", content_type=content_type) as file:
for message in message_lines:
json.dump(
message,
Expand Down
67 changes: 58 additions & 9 deletions util/opentelemetry-util-genai/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from contextlib import contextmanager
from typing import Any
from unittest import TestCase
from unittest.mock import MagicMock, patch
from unittest.mock import ANY, MagicMock, patch

import fsspec

Expand Down Expand Up @@ -117,8 +117,9 @@ def setUp(self):
self._fsspec_patcher = patch(
"opentelemetry.util.genai._upload.completion_hook.fsspec"
)
self.mock_fsspec = self._fsspec_patcher.start()
self.mock_fsspec.open = ThreadSafeMagicMock()
mock_fsspec = self._fsspec_patcher.start()
self.mock_fs = ThreadSafeMagicMock()
mock_fsspec.url_to_fs.return_value = self.mock_fs, ""

self.hook = UploadCompletionHook(
base_path=BASE_PATH,
Expand All @@ -133,12 +134,12 @@ def tearDown(self) -> None:
def block_upload(self):
unblock_upload = threading.Event()

def blocked_upload(*args: Any):
def blocked_upload(*args: Any, **kwargs: Any):
unblock_upload.wait()
return MagicMock()

try:
self.mock_fsspec.open.side_effect = blocked_upload
self.mock_fs.open.side_effect = blocked_upload
yield
finally:
unblock_upload.set()
Expand All @@ -156,7 +157,7 @@ def test_upload_then_shutdown(self):
self.hook.shutdown()

self.assertEqual(
self.mock_fsspec.open.call_count,
self.mock_fs.open.call_count,
3,
"should have uploaded 3 files",
)
Expand All @@ -172,7 +173,7 @@ def test_upload_blocked(self):
)

self.assertLessEqual(
self.mock_fsspec.open.call_count,
self.mock_fs.open.call_count,
MAXSIZE,
f"uploader should only be called {MAXSIZE=} times",
)
Expand Down Expand Up @@ -200,7 +201,7 @@ def test_shutdown_timeout(self):
self.hook.shutdown(timeout_sec=0.01)

def test_failed_upload_logs(self):
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
self.mock_fs.open.side_effect = RuntimeError("failed to upload")

with self.assertLogs(level=logging.ERROR) as logs:
self.hook.on_completion(
Expand All @@ -216,6 +217,27 @@ def test_invalid_upload_format(self):
with self.assertRaisesRegex(ValueError, "Invalid upload_format"):
UploadCompletionHook(base_path=BASE_PATH, upload_format="invalid")

def test_upload_format_sets_content_type(self):
for upload_format, expect_content_type in (
("json", "application/json"),
("jsonl", "application/jsonl"),
):
hook = UploadCompletionHook(
base_path=BASE_PATH, upload_format=upload_format
)
self.addCleanup(hook.shutdown)

hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
hook.shutdown()

self.mock_fs.open.assert_called_with(
ANY, "w", content_type=expect_content_type
)

def test_parse_upload_format_envvar(self):
for envvar_value, expect in (
("", "json"),
Expand Down Expand Up @@ -246,7 +268,11 @@ def test_parse_upload_format_envvar(self):
base_path=BASE_PATH, upload_format="jsonl"
)
self.addCleanup(hook.shutdown)
self.assertEqual(hook._format, "jsonl")
self.assertEqual(
hook._format,
"jsonl",
"upload_format kwarg should take precedence",
)

def test_upload_after_shutdown_logs(self):
self.hook.shutdown()
Expand Down Expand Up @@ -409,3 +435,26 @@ def test_upload_jsonlines(self) -> None:
{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}],"index":2}
""",
)

def test_upload_chained_filesystem_ref(self) -> None:
"""Using a chained filesystem like simplecache should refer to the final remote destination"""
hook = UploadCompletionHook(
base_path="simplecache::memory",
upload_format="jsonl",
)
self.addCleanup(hook.shutdown)
log_record = LogRecord()

hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
log_record=log_record,
)
hook.shutdown()

ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"]
self.assertTrue(
ref_uri.startswith("memory://"),
f"{ref_uri=} does not start with final destination uri memory://",
)