Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/instrumentation-genai/util.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ OpenTelemetry Python - GenAI Util
:undoc-members:
:show-inheritance:

.. automodule:: opentelemetry.util.genai._fsspec_upload
.. automodule:: opentelemetry.util.genai._upload
:members:
:show-inheritance:
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ deps =
{[testenv]test_deps}
{toxinidir}/opentelemetry-instrumentation
{toxinidir}/util/opentelemetry-util-http
{toxinidir}/util/opentelemetry-util-genai[fsspec]
{toxinidir}/util/opentelemetry-util-genai[upload]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]
Expand Down
2 changes: 2 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Add jsonlines support to fsspec uploader
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791](#3791))
- Rename "fsspec_upload" entry point and classes to more generic "upload"
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3798](#3798))

## Version 0.1b0 (2025-09-24)

Expand Down
4 changes: 2 additions & 2 deletions util/opentelemetry-util-genai/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ dependencies = [
]

[project.entry-points.opentelemetry_genai_completion_hook]
fsspec_upload = "opentelemetry.util.genai._fsspec_upload:fsspec_completion_upload_hook"
upload = "opentelemetry.util.genai._upload:upload_completion_hook"

[project.optional-dependencies]
test = ["pytest>=7.0.0"]
fsspec = ["fsspec>=2025.9.0"]
upload = ["fsspec>=2025.9.0"]

[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
)


def fsspec_completion_upload_hook() -> CompletionHook:
def upload_completion_hook() -> CompletionHook:
# If fsspec is not installed the hook will be a no-op.
try:
# pylint: disable=import-outside-toplevel
from opentelemetry.util.genai._fsspec_upload.completion_hook import (
FsspecUploadCompletionHook,
from opentelemetry.util.genai._upload.completion_hook import (
UploadCompletionHook,
)
except ImportError:
return _NoOpCompletionHook()
Expand All @@ -39,4 +39,4 @@ def fsspec_completion_upload_hook() -> CompletionHook:
if not base_path:
return _NoOpCompletionHook()

return FsspecUploadCompletionHook(base_path=base_path)
return UploadCompletionHook(base_path=base_path)
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]


class FsspecUploadCompletionHook(CompletionHook):
class UploadCompletionHook(CompletionHook):
"""An completion hook using ``fsspec`` to upload to external storage

This function can be used as the
:func:`~opentelemetry.util.genai.completion_hook.load_completion_hook` implementation by
setting :envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK` to ``fsspec_upload``.
setting :envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK` to ``upload``.
:envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
base path for uploads.

Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]``
implementation will be used instead. You can use ``opentelemetry-util-genai[upload]``
as a requirement to achieve this.
"""

Expand Down Expand Up @@ -133,15 +133,15 @@ def done(future: Future[None]) -> None:
try:
future.result()
except Exception: # pylint: disable=broad-except
_logger.exception("fsspec uploader failed")
_logger.exception("uploader failed")
finally:
self._semaphore.release()

for path, json_encodeable in upload_data.items():
# could not acquire, drop data
if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with
_logger.warning(
"fsspec upload queue is full, dropping upload %s",
"upload queue is full, dropping upload %s",
path,
)
continue
Expand All @@ -153,7 +153,7 @@ def done(future: Future[None]) -> None:
fut.add_done_callback(done)
except RuntimeError:
_logger.info(
"attempting to upload file after FsspecUploadCompletionHook.shutdown() was already called"
"attempting to upload file after UploadCompletionHook.shutdown() was already called"
)
self._semaphore.release()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
* `Configuration
<https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration>`_ for
configuring a backend with environment variables.
* `URL Chaining
<https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_ for advanced
use cases.
"""

OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from opentelemetry._logs import LogRecord
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.genai import types
from opentelemetry.util.genai._fsspec_upload.completion_hook import (
FsspecUploadCompletionHook,
from opentelemetry.util.genai._upload.completion_hook import (
UploadCompletionHook,
)
from opentelemetry.util.genai.completion_hook import (
_NoOpCompletionHook,
Expand All @@ -44,28 +44,26 @@
@patch.dict(
"os.environ",
{
"OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK": "fsspec_upload",
"OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK": "upload",
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH": BASE_PATH,
},
clear=True,
)
class TestFsspecEntryPoint(TestCase):
def test_fsspec_entry_point(self):
self.assertIsInstance(
load_completion_hook(), FsspecUploadCompletionHook
)
class TestUploadEntryPoint(TestCase):
def test_upload_entry_point(self):
self.assertIsInstance(load_completion_hook(), UploadCompletionHook)

def test_fsspec_entry_point_no_fsspec(self):
def test_upload_entry_point_no_fsspec(self):
"""Tests that the a no-op uploader is used when fsspec is not installed"""

from opentelemetry.util.genai import _fsspec_upload
from opentelemetry.util.genai import _upload

# Simulate fsspec imports failing
with patch.dict(
sys.modules,
{"opentelemetry.util.genai._fsspec_upload.completion_hook": None},
{"opentelemetry.util.genai._upload.completion_hook": None},
):
importlib.reload(_fsspec_upload)
importlib.reload(_upload)
self.assertIsInstance(load_completion_hook(), _NoOpCompletionHook)


Expand Down Expand Up @@ -114,15 +112,15 @@ def _increment_mock_call(self, /, *args, **kwargs):
super()._increment_mock_call(*args, **kwargs)


class TestFsspecUploadCompletionHook(TestCase):
class TestUploadCompletionHook(TestCase):
def setUp(self):
self._fsspec_patcher = patch(
"opentelemetry.util.genai._fsspec_upload.completion_hook.fsspec"
"opentelemetry.util.genai._upload.completion_hook.fsspec"
)
self.mock_fsspec = self._fsspec_patcher.start()
self.mock_fsspec.open = ThreadSafeMagicMock()

self.hook = FsspecUploadCompletionHook(
self.hook = UploadCompletionHook(
base_path=BASE_PATH,
max_size=MAXSIZE,
)
Expand Down Expand Up @@ -187,7 +185,7 @@ def test_upload_blocked(self):
)

self.assertIn(
"fsspec upload queue is full, dropping upload", logs.output[0]
"upload queue is full, dropping upload", logs.output[0]
)

def test_shutdown_timeout(self):
Expand All @@ -212,13 +210,11 @@ def test_failed_upload_logs(self):
)
self.hook.shutdown()

self.assertIn("fsspec uploader failed", logs.output[0])
self.assertIn("uploader failed", logs.output[0])

def test_invalid_upload_format(self):
with self.assertRaisesRegex(ValueError, "Invalid upload_format"):
FsspecUploadCompletionHook(
base_path=BASE_PATH, upload_format="invalid"
)
UploadCompletionHook(base_path=BASE_PATH, upload_format="invalid")

def test_parse_upload_format_envvar(self):
for envvar_value, expect in (
Expand All @@ -233,7 +229,7 @@ def test_parse_upload_format_envvar(self):
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": envvar_value},
clear=True,
):
hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
hook = UploadCompletionHook(base_path=BASE_PATH)
self.addCleanup(hook.shutdown)
self.assertEqual(
hook._format,
Expand All @@ -246,7 +242,7 @@ def test_parse_upload_format_envvar(self):
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": "json"},
clear=True,
):
hook = FsspecUploadCompletionHook(
hook = UploadCompletionHook(
base_path=BASE_PATH, upload_format="jsonl"
)
self.addCleanup(hook.shutdown)
Expand All @@ -262,18 +258,18 @@ def test_upload_after_shutdown_logs(self):
)
self.assertEqual(len(logs.output), 3)
self.assertIn(
"attempting to upload file after FsspecUploadCompletionHook.shutdown() was already called",
"attempting to upload file after UploadCompletionHook.shutdown() was already called",
logs.output[0],
)


class TestFsspecUploadCompletionHookIntegration(TestBase):
class TestUploadCompletionHookIntegration(TestBase):
def setUp(self):
super().setUp()
self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
self.hook = UploadCompletionHook(base_path=BASE_PATH)

def create_hook(self) -> FsspecUploadCompletionHook:
self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
def create_hook(self) -> UploadCompletionHook:
self.hook = UploadCompletionHook(base_path=BASE_PATH)
return self.hook

def tearDown(self):
Expand Down Expand Up @@ -365,9 +361,7 @@ def test_upload_bytes(self) -> None:
)

def test_upload_json(self) -> None:
hook = FsspecUploadCompletionHook(
base_path=BASE_PATH, upload_format="json"
)
hook = UploadCompletionHook(base_path=BASE_PATH, upload_format="json")
self.addCleanup(hook.shutdown)
log_record = LogRecord()

Expand All @@ -390,9 +384,7 @@ def test_upload_json(self) -> None:
)

def test_upload_jsonlines(self) -> None:
hook = FsspecUploadCompletionHook(
base_path=BASE_PATH, upload_format="jsonl"
)
hook = UploadCompletionHook(base_path=BASE_PATH, upload_format="jsonl")
self.addCleanup(hook.shutdown)
log_record = LogRecord()

Expand Down
12 changes: 6 additions & 6 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.