Skip to content

Commit 1caba81

Browse files
committed
Add boilerplate and Queue for fsspec gen ai upload hook
1 parent d7bd1a1 commit 1caba81

File tree

6 files changed

+258
-7
lines changed

6 files changed

+258
-7
lines changed

docs-requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ pymemcache~=1.3
99
# Required by conf
1010
django>=2.2
1111

12+
# Require by opentelemetry-util-genai
13+
fsspec>=2025.9.0
14+
1215
# Required by instrumentation and exporter packages
1316
aio_pika~=7.2.0
1417
aiohttp~=3.0

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
None,
131131
),
132132
"redis": ("https://redis.readthedocs.io/en/latest/", None),
133+
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
133134
}
134135

135136
# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky

docs/instrumentation-genai/util.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
2525
:members:
2626
:undoc-members:
2727
:show-inheritance:
28+
29+
.. automodule:: opentelemetry.util.genai._fsspec_upload
30+
:members:
31+
:show-inheritance:

util/opentelemetry-util-genai/pyproject.toml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ dependencies = [
3030
"opentelemetry-api>=1.31.0",
3131
]
3232

33+
[project.entry-points.opentelemetry_genai_upload_hook]
34+
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"
35+
3336
[project.optional-dependencies]
34-
test = [
35-
"pytest>=7.0.0",
36-
]
37+
test = ["pytest>=7.0.0"]
38+
fsspec = ["fsspec>=2025.9.0"]
3739

3840
[project.urls]
3941
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
@@ -43,10 +45,7 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib"
4345
path = "src/opentelemetry/util/genai/version.py"
4446

4547
[tool.hatch.build.targets.sdist]
46-
include = [
47-
"/src",
48-
"/tests",
49-
]
48+
include = ["/src", "/tests"]
5049

5150
[tool.hatch.build.targets.wheel]
5251
packages = ["src/opentelemetry"]
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import logging
18+
import queue
19+
import threading
20+
from dataclasses import dataclass
21+
from typing import Any
22+
23+
from opentelemetry._logs import LogRecord
24+
from opentelemetry.trace import Span
25+
from opentelemetry.util._once import Once
26+
from opentelemetry.util.genai import types
27+
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook
28+
29+
_logger = logging.getLogger(__name__)
30+
31+
32+
# If the SDK is available, use the BatchProcessor shared class for queueing, otherwise the hook will be a
33+
# no-op.
34+
try:
35+
from opentelemetry.sdk._shared_internal import BatchProcessor
36+
except ImportError:
37+
BatchProcessor = None
38+
39+
40+
@dataclass
41+
class Completion:
42+
inputs: list[types.InputMessage]
43+
outputs: list[types.OutputMessage]
44+
system_instruction: list[types.MessagePart]
45+
46+
47+
if BatchProcessor is not None:
48+
# save a copy for the type checker
49+
BatchProcessorCopy = BatchProcessor
50+
51+
class FsspecCompletionExporter:
52+
"""Implements uploading GenAI completions to a generic backend using fsspec
53+
54+
This class is used by the `BatchUploadHook` to export completions to an external
55+
storage.
56+
"""
57+
58+
def export(self, completion: Completion) -> None:
59+
"""upload a completion with fsspec, may be called concurrently"""
60+
# TODO: implement fsspec upload
61+
62+
class FsspecUploadHook(UploadHook):
63+
def __init__(
64+
self, exporter: FsspecCompletionExporter, maxsize: int = 20
65+
) -> None:
66+
self._exporter = exporter
67+
68+
# Use opinionated defaults to start, we can add environment variables later
69+
self._queue: queue.Queue[Completion | None] = queue.Queue(
70+
maxsize=maxsize
71+
)
72+
self._consumer_thread = threading.Thread(
73+
target=self._consumer, daemon=True
74+
)
75+
self._consumer_thread.start()
76+
self._shutdown_once = Once()
77+
78+
def _consumer(self) -> None:
79+
shutdown = False
80+
while not (shutdown and self._queue.empty()):
81+
completion = self._queue.get()
82+
if completion is None:
83+
shutdown = True
84+
self._queue.task_done()
85+
continue
86+
87+
# TODO: submit to a thread pool
88+
self._exporter.export(completion)
89+
self._queue.task_done()
90+
91+
def upload(
92+
self,
93+
*,
94+
inputs: list[types.InputMessage],
95+
outputs: list[types.OutputMessage],
96+
system_instruction: list[types.MessagePart],
97+
span: Span | None = None,
98+
log_record: LogRecord | None = None,
99+
**kwargs: Any,
100+
) -> None:
101+
try:
102+
self._queue.put(
103+
Completion(
104+
inputs=inputs,
105+
outputs=outputs,
106+
system_instruction=system_instruction,
107+
),
108+
block=False,
109+
)
110+
except queue.Full:
111+
logging.warning(
112+
"gen ai fsspec upload queue is full, dropping completion"
113+
)
114+
115+
def shutdown(self, timeout_sec: float = 5) -> None:
116+
"""Shuts down any worker threads"""
117+
118+
def do_shutdown() -> None:
119+
# TODO: proper deadlines
120+
self._queue.put(None, timeout=timeout_sec)
121+
self._consumer_thread.join(timeout=timeout_sec)
122+
123+
self._shutdown_once.do_once(do_shutdown)
124+
125+
def fsspec_upload_hook() -> UploadHook:
126+
return FsspecUploadHook(FsspecCompletionExporter())
127+
else:
128+
129+
def fsspec_upload_hook() -> UploadHook:
130+
return _NoOpUploadHook()
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
# pylint: disable=import-outside-toplevel,no-name-in-module
17+
18+
import importlib
19+
import logging
20+
import sys
21+
import threading
22+
from unittest import TestCase
23+
from unittest.mock import MagicMock, patch
24+
25+
from opentelemetry.util.genai._fsspec_upload import (
26+
Completion,
27+
FsspecCompletionExporter,
28+
FsspecUploadHook,
29+
)
30+
from opentelemetry.util.genai.upload_hook import (
31+
_NoOpUploadHook,
32+
load_upload_hook,
33+
)
34+
35+
36+
@patch.dict(
37+
"os.environ",
38+
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec"},
39+
clear=True,
40+
)
41+
class TestFsspecEntryPoint(TestCase):
42+
def test_fsspec_entry_point(self):
43+
self.assertIsInstance(load_upload_hook(), FsspecUploadHook)
44+
45+
def test_fsspec_entry_point_no_sdk(self):
46+
"""Tests that the a no-op uploader is used when the BatchProcessor cannot be
47+
imported because opentelemetry-sdk is not installed"""
48+
49+
from opentelemetry.util.genai import _fsspec_upload
50+
51+
# Simulate opentelemetry-sdk imports failing
52+
with patch.dict(
53+
sys.modules, {"opentelemetry.sdk._shared_internal": None}
54+
):
55+
importlib.reload(_fsspec_upload)
56+
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
57+
58+
59+
MAXSIZE = 5
60+
SHUTDOWN_TIMEOUT = 0.5
61+
62+
63+
class TestFsspecUploadHook(TestCase):
64+
def setUp(self):
65+
self.mock_exporter = MagicMock(spec=FsspecCompletionExporter)
66+
self.hook = FsspecUploadHook(self.mock_exporter, maxsize=MAXSIZE)
67+
68+
def tearDown(self) -> None:
69+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)
70+
71+
def test_fsspec_upload_hook_shutdown_no_items(self):
72+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)
73+
74+
def test_fsspec_upload_hook_upload_then_shutdown(self):
75+
for _ in range(10):
76+
self.hook.upload(
77+
inputs=[],
78+
outputs=[],
79+
system_instruction=[],
80+
)
81+
# all items should be consumed
82+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)
83+
84+
def test_fsspec_upload_hook_upload_blocked(self):
85+
unblock_export = threading.Event()
86+
87+
def blocked_export(completion: Completion) -> None:
88+
unblock_export.wait()
89+
90+
self.mock_exporter.export = MagicMock(wraps=blocked_export)
91+
92+
# fill the queue
93+
for _ in range(10):
94+
self.hook.upload(
95+
inputs=[],
96+
outputs=[],
97+
system_instruction=[],
98+
)
99+
100+
self.assertLessEqual(self.mock_exporter.export.call_count, MAXSIZE)
101+
102+
# attempt to enqueue when full should wawrn
103+
with self.assertLogs(level=logging.WARNING) as logs:
104+
self.hook.upload(
105+
inputs=[],
106+
outputs=[],
107+
system_instruction=[],
108+
)
109+
110+
self.assertEqual(len(logs.output), 1)
111+
unblock_export.set()
112+
113+
# all items should be consumed
114+
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)

0 commit comments

Comments
 (0)