diff --git a/docs-requirements.txt b/docs-requirements.txt index cc246fe221..afd03672d0 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -9,6 +9,9 @@ pymemcache~=1.3 # Required by conf django>=2.2 +# Require by opentelemetry-util-genai +fsspec>=2025.9.0 + # Required by instrumentation and exporter packages aio_pika~=7.2.0 aiohttp~=3.0 diff --git a/docs/conf.py b/docs/conf.py index 47ad43def2..0f45647dc4 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -130,6 +130,7 @@ None, ), "redis": ("https://redis.readthedocs.io/en/latest/", None), + "fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None), } # http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky diff --git a/docs/instrumentation-genai/util.rst b/docs/instrumentation-genai/util.rst index d55f2d1bf2..2ea0852e3c 100644 --- a/docs/instrumentation-genai/util.rst +++ b/docs/instrumentation-genai/util.rst @@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util :members: :undoc-members: :show-inheritance: + +.. automodule:: opentelemetry.util.genai._fsspec_upload + :members: + :show-inheritance: diff --git a/tox.ini b/tox.ini index 379735d408..e2d2e3a6c2 100644 --- a/tox.ini +++ b/tox.ini @@ -1060,7 +1060,7 @@ deps = {[testenv]test_deps} {toxinidir}/opentelemetry-instrumentation {toxinidir}/util/opentelemetry-util-http - {toxinidir}/util/opentelemetry-util-genai + {toxinidir}/util/opentelemetry-util-genai[fsspec] {toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments] {toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments] {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments] diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml index 280da37d58..b24e6b6879 100644 --- a/util/opentelemetry-util-genai/pyproject.toml +++ b/util/opentelemetry-util-genai/pyproject.toml @@ -30,10 +30,12 @@ dependencies = [ "opentelemetry-api>=1.31.0", ] +[project.entry-points.opentelemetry_genai_upload_hook] +fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook" + [project.optional-dependencies] -test = [ - "pytest>=7.0.0", -] +test = ["pytest>=7.0.0"] +fsspec = ["fsspec>=2025.9.0"] [project.urls] Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai" @@ -43,10 +45,7 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" path = "src/opentelemetry/util/genai/version.py" [tool.hatch.build.targets.sdist] -include = [ - "/src", - "/tests", -] +include = ["/src", "/tests"] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py new file mode 100644 index 0000000000..3a360fa122 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py @@ -0,0 +1,128 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +import queue +import threading +from dataclasses import dataclass +from typing import Any + +from opentelemetry._logs import LogRecord +from opentelemetry.trace import Span +from opentelemetry.util._once import Once +from opentelemetry.util.genai import types +from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook + +_logger = logging.getLogger(__name__) + + +# If the SDK is available, use the BatchProcessor shared class for queueing, otherwise the hook will be a +# no-op. +try: + import fsspec +except ImportError: + fsspec = None + + +@dataclass +class Completion: + inputs: list[types.InputMessage] + outputs: list[types.OutputMessage] + system_instruction: list[types.MessagePart] + + +if fsspec is not None: + + class FsspecCompletionExporter: + """Implements uploading GenAI completions to a generic backend using fsspec + + This class is used by the `BatchUploadHook` to export completions to an external + storage. + """ + + def export(self, completion: Completion) -> None: + """upload a completion with fsspec, may be called concurrently""" + # TODO: implement fsspec upload + + class FsspecUploadHook(UploadHook): + def __init__( + self, exporter: FsspecCompletionExporter, maxsize: int = 20 + ) -> None: + self._exporter = exporter + + # Use opinionated defaults to start, we can add environment variables later + self._queue: queue.Queue[Completion | None] = queue.Queue( + maxsize=maxsize + ) + self._consumer_thread = threading.Thread( + target=self._consumer, daemon=True + ) + self._consumer_thread.start() + self._shutdown_once = Once() + + def _consumer(self) -> None: + shutdown = False + while not (shutdown and self._queue.empty()): + completion = self._queue.get() + if completion is None: + shutdown = True + self._queue.task_done() + continue + + # TODO: submit to a thread pool + self._exporter.export(completion) + self._queue.task_done() + + def upload( + self, + *, + inputs: list[types.InputMessage], + outputs: list[types.OutputMessage], + system_instruction: list[types.MessagePart], + span: Span | None = None, + log_record: LogRecord | None = None, + **kwargs: Any, + ) -> None: + try: + self._queue.put( + Completion( + inputs=inputs, + outputs=outputs, + system_instruction=system_instruction, + ), + block=False, + ) + except queue.Full: + logging.warning( + "gen ai fsspec upload queue is full, dropping completion" + ) + + def shutdown(self, timeout_sec: float = 5) -> None: + """Shuts down any worker threads""" + + def do_shutdown() -> None: + # TODO: proper deadlines + self._queue.put(None, timeout=timeout_sec) + self._consumer_thread.join(timeout=timeout_sec) + + self._shutdown_once.do_once(do_shutdown) + + def fsspec_upload_hook() -> UploadHook: + return FsspecUploadHook(FsspecCompletionExporter()) +else: + + def fsspec_upload_hook() -> UploadHook: + return _NoOpUploadHook() diff --git a/util/opentelemetry-util-genai/test-requirements.txt b/util/opentelemetry-util-genai/test-requirements.txt index 91d59f42f5..34a1ad14a2 100644 --- a/util/opentelemetry-util-genai/test-requirements.txt +++ b/util/opentelemetry-util-genai/test-requirements.txt @@ -1,2 +1,3 @@ pytest==7.4.4 --e opentelemetry-instrumentation \ No newline at end of file +fsspec==2025.9.0 +-e opentelemetry-instrumentation diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py new file mode 100644 index 0000000000..e5d314a834 --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -0,0 +1,111 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# pylint: disable=import-outside-toplevel,no-name-in-module + +import importlib +import logging +import sys +import threading +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from opentelemetry.util.genai._fsspec_upload import ( + Completion, + FsspecCompletionExporter, + FsspecUploadHook, +) +from opentelemetry.util.genai.upload_hook import ( + _NoOpUploadHook, + load_upload_hook, +) + + +@patch.dict( + "os.environ", + {"OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec"}, + clear=True, +) +class TestFsspecEntryPoint(TestCase): + def test_fsspec_entry_point(self): + self.assertIsInstance(load_upload_hook(), FsspecUploadHook) + + def test_fsspec_entry_point_no_fsspec(self): + """Tests that the a no-op uploader is used when fsspec is not installed""" + + from opentelemetry.util.genai import _fsspec_upload + + # Simulate fsspec imports failing + with patch.dict(sys.modules, {"fsspec": None}): + importlib.reload(_fsspec_upload) + self.assertIsInstance(load_upload_hook(), _NoOpUploadHook) + + +MAXSIZE = 5 +SHUTDOWN_TIMEOUT = 0.5 + + +class TestFsspecUploadHook(TestCase): + def setUp(self): + self.mock_exporter = MagicMock(spec=FsspecCompletionExporter) + self.hook = FsspecUploadHook(self.mock_exporter, maxsize=MAXSIZE) + + def tearDown(self) -> None: + self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT) + + def test_fsspec_upload_hook_shutdown_no_items(self): + self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT) + + def test_fsspec_upload_hook_upload_then_shutdown(self): + for _ in range(10): + self.hook.upload( + inputs=[], + outputs=[], + system_instruction=[], + ) + # all items should be consumed + self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT) + + def test_fsspec_upload_hook_upload_blocked(self): + unblock_export = threading.Event() + + def blocked_export(completion: Completion) -> None: + unblock_export.wait() + + self.mock_exporter.export = MagicMock(wraps=blocked_export) + + # fill the queue + for _ in range(10): + self.hook.upload( + inputs=[], + outputs=[], + system_instruction=[], + ) + + self.assertLessEqual(self.mock_exporter.export.call_count, MAXSIZE) + + # attempt to enqueue when full should wawrn + with self.assertLogs(level=logging.WARNING) as logs: + self.hook.upload( + inputs=[], + outputs=[], + system_instruction=[], + ) + + self.assertEqual(len(logs.output), 1) + unblock_export.set() + + # all items should be consumed + self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)