Skip to content

Commit 7819be1

Browse files
authored
Add fsspec gen ai upload hook (#3759)
* Add fsspec gen ai upload hook * split up into sub-package to make imports cleaner * Get rid of FsspecUploader separate class * comments, clean up doc strings
1 parent 7bac6be commit 7819be1

File tree

10 files changed

+483
-5
lines changed

10 files changed

+483
-5
lines changed

docs-requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ pymemcache~=1.3
99
# Required by conf
1010
django>=2.2
1111

12+
# Require by opentelemetry-util-genai
13+
fsspec>=2025.9.0
14+
1215
# Required by instrumentation and exporter packages
1316
aio_pika~=7.2.0
1417
aiohttp~=3.0

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
None,
131131
),
132132
"redis": ("https://redis.readthedocs.io/en/latest/", None),
133+
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
133134
}
134135

135136
# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky

docs/instrumentation-genai/util.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
2525
:members:
2626
:undoc-members:
2727
:show-inheritance:
28+
29+
.. automodule:: opentelemetry.util.genai._fsspec_upload
30+
:members:
31+
:show-inheritance:

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,7 @@ deps =
10601060
{[testenv]test_deps}
10611061
{toxinidir}/opentelemetry-instrumentation
10621062
{toxinidir}/util/opentelemetry-util-http
1063-
{toxinidir}/util/opentelemetry-util-genai
1063+
{toxinidir}/util/opentelemetry-util-genai[fsspec]
10641064
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
10651065
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
10661066
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]

util/opentelemetry-util-genai/pyproject.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ dependencies = [
3030
"opentelemetry-api>=1.31.0",
3131
]
3232

33+
[project.entry-points.opentelemetry_genai_upload_hook]
34+
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"
35+
3336
[project.optional-dependencies]
34-
test = [
35-
"pytest>=7.0.0",
36-
]
37+
test = ["pytest>=7.0.0"]
38+
fsspec = ["fsspec>=2025.9.0"]
3739

3840
[project.urls]
3941
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from os import environ
18+
19+
from opentelemetry.util.genai.environment_variables import (
20+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
21+
)
22+
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook
23+
24+
25+
def fsspec_upload_hook() -> UploadHook:
26+
# If fsspec is not installed the hook will be a no-op.
27+
try:
28+
# pylint: disable=import-outside-toplevel
29+
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
30+
FsspecUploadHook,
31+
)
32+
except ImportError:
33+
return _NoOpUploadHook()
34+
35+
base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
36+
if not base_path:
37+
return _NoOpUploadHook()
38+
39+
return FsspecUploadHook(base_path=base_path)
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from __future__ import annotations
17+
18+
import json
19+
import logging
20+
import posixpath
21+
import threading
22+
from concurrent.futures import Future, ThreadPoolExecutor
23+
from dataclasses import asdict, dataclass
24+
from functools import partial
25+
from typing import Any, Callable, Literal, TextIO, cast
26+
from uuid import uuid4
27+
28+
import fsspec
29+
30+
from opentelemetry._logs import LogRecord
31+
from opentelemetry.trace import Span
32+
from opentelemetry.util.genai import types
33+
from opentelemetry.util.genai.upload_hook import UploadHook
34+
35+
_logger = logging.getLogger(__name__)
36+
37+
38+
@dataclass
39+
class Completion:
40+
inputs: list[types.InputMessage]
41+
outputs: list[types.OutputMessage]
42+
system_instruction: list[types.MessagePart]
43+
44+
45+
@dataclass
46+
class CompletionRefs:
47+
inputs_ref: str
48+
outputs_ref: str
49+
system_instruction_ref: str
50+
51+
52+
JsonEncodeable = list[dict[str, Any]]
53+
54+
# mapping of upload path to function computing upload data dict
55+
UploadData = dict[str, Callable[[], JsonEncodeable]]
56+
57+
58+
def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
59+
"""typed wrapper around `fsspec.open`"""
60+
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]
61+
62+
63+
class FsspecUploadHook(UploadHook):
64+
"""An upload hook using ``fsspec`` to upload to external storage
65+
66+
This function can be used as the
67+
:func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by
68+
setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``.
69+
:envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
70+
base path for uploads.
71+
72+
Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
73+
implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]``
74+
as a requirement to achieve this.
75+
"""
76+
77+
def __init__(
78+
self,
79+
*,
80+
base_path: str,
81+
max_size: int = 20,
82+
) -> None:
83+
self._base_path = base_path
84+
self._max_size = max_size
85+
86+
# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
87+
# limits the number of queued tasks. If the queue is full, data will be dropped.
88+
self._executor = ThreadPoolExecutor(max_workers=max_size)
89+
self._semaphore = threading.BoundedSemaphore(max_size)
90+
91+
def _submit_all(self, upload_data: UploadData) -> None:
92+
def done(future: Future[None]) -> None:
93+
self._semaphore.release()
94+
95+
try:
96+
future.result()
97+
except Exception: # pylint: disable=broad-except
98+
_logger.exception("fsspec uploader failed")
99+
100+
for path, json_encodeable in upload_data.items():
101+
# could not acquire, drop data
102+
if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with
103+
_logger.warning(
104+
"fsspec upload queue is full, dropping upload %s",
105+
path,
106+
)
107+
continue
108+
109+
try:
110+
fut = self._executor.submit(
111+
self._do_upload, path, json_encodeable
112+
)
113+
fut.add_done_callback(done)
114+
except RuntimeError:
115+
_logger.info(
116+
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
117+
)
118+
break
119+
120+
def _calculate_ref_path(self) -> CompletionRefs:
121+
# TODO: experimental with using the trace_id and span_id, or fetching
122+
# gen_ai.response.id from the active span.
123+
124+
uuid_str = str(uuid4())
125+
return CompletionRefs(
126+
inputs_ref=posixpath.join(
127+
self._base_path, f"{uuid_str}_inputs.json"
128+
),
129+
outputs_ref=posixpath.join(
130+
self._base_path, f"{uuid_str}_outputs.json"
131+
),
132+
system_instruction_ref=posixpath.join(
133+
self._base_path, f"{uuid_str}_system_instruction.json"
134+
),
135+
)
136+
137+
@staticmethod
138+
def _do_upload(
139+
path: str, json_encodeable: Callable[[], JsonEncodeable]
140+
) -> None:
141+
with fsspec_open(path, "w") as file:
142+
json.dump(json_encodeable(), file, separators=(",", ":"))
143+
144+
def upload(
145+
self,
146+
*,
147+
inputs: list[types.InputMessage],
148+
outputs: list[types.OutputMessage],
149+
system_instruction: list[types.MessagePart],
150+
span: Span | None = None,
151+
log_record: LogRecord | None = None,
152+
**kwargs: Any,
153+
) -> None:
154+
completion = Completion(
155+
inputs=inputs,
156+
outputs=outputs,
157+
system_instruction=system_instruction,
158+
)
159+
# generate the paths to upload to
160+
ref_names = self._calculate_ref_path()
161+
162+
def to_dict(
163+
dataclass_list: list[types.InputMessage]
164+
| list[types.OutputMessage]
165+
| list[types.MessagePart],
166+
) -> JsonEncodeable:
167+
return [asdict(dc) for dc in dataclass_list]
168+
169+
self._submit_all(
170+
{
171+
# Use partial to defer as much as possible to the background threads
172+
ref_names.inputs_ref: partial(to_dict, completion.inputs),
173+
ref_names.outputs_ref: partial(to_dict, completion.outputs),
174+
ref_names.system_instruction_ref: partial(
175+
to_dict, completion.system_instruction
176+
),
177+
},
178+
)
179+
180+
# TODO: stamp the refs on telemetry
181+
182+
def shutdown(self) -> None:
183+
# TODO: support timeout
184+
self._executor.shutdown()

util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,24 @@
2222
"""
2323
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK
2424
"""
25+
26+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH = (
27+
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH"
28+
)
29+
"""
30+
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH
31+
32+
An :func:`fsspec.open` compatible URI/path for uploading prompts and responses. Can be a local
33+
path like ``/path/to/prompts`` or a cloud storage URI such as ``gs://my_bucket``. For more
34+
information, see
35+
36+
* `Instantiate a file-system
37+
<https://filesystem-spec.readthedocs.io/en/latest/usage.html#instantiate-a-file-system>`_ for supported values and how to
38+
install support for additional backend implementations.
39+
* `Configuration
40+
<https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration>`_ for
41+
configuring a backend with environment variables.
42+
* `URL Chaining
43+
<https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_ for advanced
44+
use cases.
45+
"""
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pytest==7.4.4
2-
-e opentelemetry-instrumentation
2+
fsspec==2025.9.0
3+
-e opentelemetry-instrumentation

0 commit comments

Comments
 (0)