Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pymemcache~=1.3
# Required by conf
django>=2.2

# Require by opentelemetry-util-genai
fsspec>=2025.9.0

# Required by instrumentation and exporter packages
aio_pika~=7.2.0
aiohttp~=3.0
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
None,
),
"redis": ("https://redis.readthedocs.io/en/latest/", None),
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
}

# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky
Expand Down
4 changes: 4 additions & 0 deletions docs/instrumentation-genai/util.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
:members:
:undoc-members:
:show-inheritance:

.. automodule:: opentelemetry.util.genai._fsspec_upload
:members:
:show-inheritance:
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ deps =
{[testenv]test_deps}
{toxinidir}/opentelemetry-instrumentation
{toxinidir}/util/opentelemetry-util-http
{toxinidir}/util/opentelemetry-util-genai
{toxinidir}/util/opentelemetry-util-genai[fsspec]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]
Expand Down
13 changes: 6 additions & 7 deletions util/opentelemetry-util-genai/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ dependencies = [
"opentelemetry-api>=1.31.0",
]

[project.entry-points.opentelemetry_genai_upload_hook]
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"

[project.optional-dependencies]
test = [
"pytest>=7.0.0",
]
test = ["pytest>=7.0.0"]
fsspec = ["fsspec>=2025.9.0"]

[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
Expand All @@ -43,10 +45,7 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib"
path = "src/opentelemetry/util/genai/version.py"

[tool.hatch.build.targets.sdist]
include = [
"/src",
"/tests",
]
include = ["/src", "/tests"]

[tool.hatch.build.targets.wheel]
packages = ["src/opentelemetry"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import logging
import queue
import threading
from dataclasses import dataclass
from typing import Any

from opentelemetry._logs import LogRecord
from opentelemetry.trace import Span
from opentelemetry.util._once import Once
from opentelemetry.util.genai import types
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook

_logger = logging.getLogger(__name__)


# If the SDK is available, use the BatchProcessor shared class for queueing, otherwise the hook will be a
# no-op.
try:
import fsspec
except ImportError:
fsspec = None


@dataclass
class Completion:
inputs: list[types.InputMessage]
outputs: list[types.OutputMessage]
system_instruction: list[types.MessagePart]


if fsspec is not None:

class FsspecCompletionExporter:
"""Implements uploading GenAI completions to a generic backend using fsspec

This class is used by the `BatchUploadHook` to export completions to an external
storage.
"""

def export(self, completion: Completion) -> None:
"""upload a completion with fsspec, may be called concurrently"""
# TODO: implement fsspec upload

class FsspecUploadHook(UploadHook):
def __init__(
self, exporter: FsspecCompletionExporter, maxsize: int = 20
) -> None:
self._exporter = exporter

# Use opinionated defaults to start, we can add environment variables later
self._queue: queue.Queue[Completion | None] = queue.Queue(
maxsize=maxsize
)
self._consumer_thread = threading.Thread(
target=self._consumer, daemon=True
)
self._consumer_thread.start()
self._shutdown_once = Once()

def _consumer(self) -> None:
shutdown = False
while not (shutdown and self._queue.empty()):
completion = self._queue.get()
if completion is None:
shutdown = True
self._queue.task_done()
continue

# TODO: submit to a thread pool
self._exporter.export(completion)
self._queue.task_done()

def upload(
self,
*,
inputs: list[types.InputMessage],
outputs: list[types.OutputMessage],
system_instruction: list[types.MessagePart],
span: Span | None = None,
log_record: LogRecord | None = None,
**kwargs: Any,
) -> None:
try:
self._queue.put(
Completion(
inputs=inputs,
outputs=outputs,
system_instruction=system_instruction,
),
block=False,
)
except queue.Full:
logging.warning(
"gen ai fsspec upload queue is full, dropping completion"
)

def shutdown(self, timeout_sec: float = 5) -> None:
"""Shuts down any worker threads"""

def do_shutdown() -> None:
# TODO: proper deadlines
self._queue.put(None, timeout=timeout_sec)
self._consumer_thread.join(timeout=timeout_sec)

self._shutdown_once.do_once(do_shutdown)

def fsspec_upload_hook() -> UploadHook:
return FsspecUploadHook(FsspecCompletionExporter())
else:

def fsspec_upload_hook() -> UploadHook:
return _NoOpUploadHook()
3 changes: 2 additions & 1 deletion util/opentelemetry-util-genai/test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pytest==7.4.4
-e opentelemetry-instrumentation
fsspec==2025.9.0
-e opentelemetry-instrumentation
111 changes: 111 additions & 0 deletions util/opentelemetry-util-genai/tests/test_fsspec_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# pylint: disable=import-outside-toplevel,no-name-in-module

import importlib
import logging
import sys
import threading
from unittest import TestCase
from unittest.mock import MagicMock, patch

from opentelemetry.util.genai._fsspec_upload import (
Completion,
FsspecCompletionExporter,
FsspecUploadHook,
)
from opentelemetry.util.genai.upload_hook import (
_NoOpUploadHook,
load_upload_hook,
)


@patch.dict(
"os.environ",
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec"},
clear=True,
)
class TestFsspecEntryPoint(TestCase):
def test_fsspec_entry_point(self):
self.assertIsInstance(load_upload_hook(), FsspecUploadHook)

def test_fsspec_entry_point_no_fsspec(self):
"""Tests that the a no-op uploader is used when fsspec is not installed"""

from opentelemetry.util.genai import _fsspec_upload

# Simulate fsspec imports failing
with patch.dict(sys.modules, {"fsspec": None}):
importlib.reload(_fsspec_upload)
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)


MAXSIZE = 5
SHUTDOWN_TIMEOUT = 0.5


class TestFsspecUploadHook(TestCase):
def setUp(self):
self.mock_exporter = MagicMock(spec=FsspecCompletionExporter)
self.hook = FsspecUploadHook(self.mock_exporter, maxsize=MAXSIZE)

def tearDown(self) -> None:
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)

def test_fsspec_upload_hook_shutdown_no_items(self):
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)

def test_fsspec_upload_hook_upload_then_shutdown(self):
for _ in range(10):
self.hook.upload(
inputs=[],
outputs=[],
system_instruction=[],
)
# all items should be consumed
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)

def test_fsspec_upload_hook_upload_blocked(self):
unblock_export = threading.Event()

def blocked_export(completion: Completion) -> None:
unblock_export.wait()

self.mock_exporter.export = MagicMock(wraps=blocked_export)

# fill the queue
for _ in range(10):
self.hook.upload(
inputs=[],
outputs=[],
system_instruction=[],
)

self.assertLessEqual(self.mock_exporter.export.call_count, MAXSIZE)

# attempt to enqueue when full should wawrn
with self.assertLogs(level=logging.WARNING) as logs:
self.hook.upload(
inputs=[],
outputs=[],
system_instruction=[],
)

self.assertEqual(len(logs.output), 1)
unblock_export.set()

# all items should be consumed
self.hook.shutdown(timeout_sec=SHUTDOWN_TIMEOUT)
Loading