Skip to content

Commit 5965721

Browse files
committed
Add boilerplate and Queue for fsspec gen ai upload hook
1 parent d7bd1a1 commit 5965721

File tree

8 files changed

+256
-9
lines changed

8 files changed

+256
-9
lines changed

docs-requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ pymemcache~=1.3
99
# Required by conf
1010
django>=2.2
1111

12+
# Require by opentelemetry-util-genai
13+
fsspec>=2025.9.0
14+
1215
# Required by instrumentation and exporter packages
1316
aio_pika~=7.2.0
1417
aiohttp~=3.0

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
None,
131131
),
132132
"redis": ("https://redis.readthedocs.io/en/latest/", None),
133+
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
133134
}
134135

135136
# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky

docs/instrumentation-genai/util.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
2525
:members:
2626
:undoc-members:
2727
:show-inheritance:
28+
29+
.. automodule:: opentelemetry.util.genai._fsspec_upload
30+
:members:
31+
:show-inheritance:

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,7 @@ deps =
10601060
{[testenv]test_deps}
10611061
{toxinidir}/opentelemetry-instrumentation
10621062
{toxinidir}/util/opentelemetry-util-http
1063-
{toxinidir}/util/opentelemetry-util-genai
1063+
{toxinidir}/util/opentelemetry-util-genai[fsspec]
10641064
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
10651065
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
10661066
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]

util/opentelemetry-util-genai/pyproject.toml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ dependencies = [
3030
"opentelemetry-api>=1.31.0",
3131
]
3232

33+
[project.entry-points.opentelemetry_genai_upload_hook]
34+
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"
35+
3336
[project.optional-dependencies]
34-
test = [
35-
"pytest>=7.0.0",
36-
]
37+
test = ["pytest>=7.0.0"]
38+
fsspec = ["fsspec>=2025.9.0"]
3739

3840
[project.urls]
3941
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
@@ -43,10 +45,7 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib"
4345
path = "src/opentelemetry/util/genai/version.py"
4446

4547
[tool.hatch.build.targets.sdist]
46-
include = [
47-
"/src",
48-
"/tests",
49-
]
48+
include = ["/src", "/tests"]
5049

5150
[tool.hatch.build.targets.wheel]
5251
packages = ["src/opentelemetry"]
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import logging
18+
import queue
19+
import threading
20+
from dataclasses import dataclass
21+
from typing import Any
22+
23+
from opentelemetry._logs import LogRecord
24+
from opentelemetry.trace import Span
25+
from opentelemetry.util._once import Once
26+
from opentelemetry.util.genai import types
27+
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook
28+
29+
_logger = logging.getLogger(__name__)
30+
31+
32+
# If the SDK is available, use the BatchProcessor shared class for queueing, otherwise the hook will be a
33+
# no-op.
34+
try:
35+
import fsspec
36+
except ImportError:
37+
fsspec = None
38+
39+
40+
@dataclass
41+
class Completion:
42+
inputs: list[types.InputMessage]
43+
outputs: list[types.OutputMessage]
44+
system_instruction: list[types.MessagePart]
45+
46+
47+
if fsspec is not None:
48+
49+
class FsspecCompletionExporter:
50+
"""Implements uploading GenAI completions to a generic backend using fsspec
51+
52+
This class is used by the `BatchUploadHook` to export completions to an external
53+
storage.
54+
"""
55+
56+
def export(self, completion: Completion) -> None:
57+
"""upload a completion with fsspec, may be called concurrently"""
58+
# TODO: implement fsspec upload
59+
60+
class FsspecUploadHook(UploadHook):
61+
def __init__(
62+
self, exporter: FsspecCompletionExporter, maxsize: int = 20
63+
) -> None:
64+
self._exporter = exporter
65+
66+
# Use opinionated defaults to start, we can add environment variables later
67+
self._queue: queue.Queue[Completion | None] = queue.Queue(
68+
maxsize=maxsize
69+
)
70+
self._consumer_thread = threading.Thread(
71+
target=self._consumer, daemon=True
72+
)
73+
self._consumer_thread.start()
74+
self._shutdown_once = Once()
75+
76+
def _consumer(self) -> None:
77+
shutdown = False
78+
while not (shutdown and self._queue.empty()):
79+
completion = self._queue.get()
80+
if completion is None:
81+
shutdown = True
82+
self._queue.task_done()
83+
continue
84+
85+
# TODO: submit to a thread pool
86+
self._exporter.export(completion)
87+
self._queue.task_done()
88+
89+
def upload(
90+
self,
91+
*,
92+
inputs: list[types.InputMessage],
93+
outputs: list[types.OutputMessage],
94+
system_instruction: list[types.MessagePart],
95+
span: Span | None = None,
96+
log_record: LogRecord | None = None,
97+
**kwargs: Any,
98+
) -> None:
99+
try:
100+
self._queue.put(
101+
Completion(
102+
inputs=inputs,
103+
outputs=outputs,
104+
system_instruction=system_instruction,
105+
),
106+
block=False,
107+
)
108+
except queue.Full:
109+
logging.warning(
110+
"gen ai fsspec upload queue is full, dropping completion"
111+
)
112+
113+
def shutdown(self, timeout_sec: float = 5) -> None:
114+
"""Shuts down any worker threads"""
115+
116+
def do_shutdown() -> None:
117+
# TODO: proper deadlines
118+
self._queue.put(None, timeout=timeout_sec)
119+
self._consumer_thread.join(timeout=timeout_sec)
120+
121+
self._shutdown_once.do_once(do_shutdown)
122+
123+
def fsspec_upload_hook() -> UploadHook:
124+
return FsspecUploadHook(FsspecCompletionExporter())
125+
else:
126+
127+
def fsspec_upload_hook() -> UploadHook:
128+
return _NoOpUploadHook()
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pytest==7.4.4
2-
-e opentelemetry-instrumentation
2+
fsspec==2025.9.0
3+
-e opentelemetry-instrumentation
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
# pylint: disable=import-outside-toplevel,no-name-in-module
17+
18+
import importlib
19+
import logging
20+
import sys
21+
import threading
22+
from unittest import TestCase
23+
from unittest.mock import MagicMock, patch
24+
25+
from opentelemetry.util.genai._fsspec_upload import (
26+
Completion,
27+
FsspecCompletionExporter,
28+
FsspecUploadHook,
29+
)
30+
from opentelemetry.util.genai.upload_hook import (
31+
_NoOpUploadHook,
32+
load_upload_hook,
33+
)
34+
35+
36+
@patch.dict(
37+
"os.environ",
38+
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec"},
39+
clear=True,
40+
)
41+
class TestFsspecEntryPoint(TestCase):
42+
def test_fsspec_entry_point(self):
43+
self.assertIsInstance(load_upload_hook(), FsspecUploadHook)
44+
45+
def test_fsspec_entry_point_no_fsspec(self):
46+
"""Tests that the a no-op uploader is used when fsspec is not installed"""
47+
48+
from opentelemetry.util.genai import _fsspec_upload
49+
50+
# Simulate fsspec imports failing
51+
with patch.dict(sys.modules, {"fsspec": None}):
52+
importlib.reload(_fsspec_upload)
53+
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
54+
55+
56+
MAXSIZE = 5
57+
SHUTDOWN_TIMEOUT = 0.5
58+
59+
60+
class TestFsspecUploadHook(TestCase):
61+
def setUp(self):
62+
self.mock_exporter = MagicMock(spec=FsspecCompletionExporter)
63+
self.hook = FsspecUploadHook(self.mock_exporter, maxsize=MAXSIZE)
64+
65+
def tearDown(self) -> None:
66+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)
67+
68+
def test_fsspec_upload_hook_shutdown_no_items(self):
69+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)
70+
71+
def test_fsspec_upload_hook_upload_then_shutdown(self):
72+
for _ in range(10):
73+
self.hook.upload(
74+
inputs=[],
75+
outputs=[],
76+
system_instruction=[],
77+
)
78+
# all items should be consumed
79+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)
80+
81+
def test_fsspec_upload_hook_upload_blocked(self):
82+
unblock_export = threading.Event()
83+
84+
def blocked_export(completion: Completion) -> None:
85+
unblock_export.wait()
86+
87+
self.mock_exporter.export = MagicMock(wraps=blocked_export)
88+
89+
# fill the queue
90+
for _ in range(10):
91+
self.hook.upload(
92+
inputs=[],
93+
outputs=[],
94+
system_instruction=[],
95+
)
96+
97+
self.assertLessEqual(self.mock_exporter.export.call_count, MAXSIZE)
98+
99+
# attempt to enqueue when full should wawrn
100+
with self.assertLogs(level=logging.WARNING) as logs:
101+
self.hook.upload(
102+
inputs=[],
103+
outputs=[],
104+
system_instruction=[],
105+
)
106+
107+
self.assertEqual(len(logs.output), 1)
108+
unblock_export.set()
109+
110+
# all items should be consumed
111+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)

0 commit comments

Comments
 (0)