3 changes: 3 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add jsonlines support to fsspec uploader
  ([#3791](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791))

## Version 0.1b0 (2025-09-24)

- Add completion hook to genai utils to implement semconv v1.37.
@@ -24,6 +24,7 @@
from contextlib import ExitStack
from dataclasses import asdict, dataclass
from functools import partial
from os import environ
from time import time
from typing import Any, Callable, Final, Literal, TextIO, cast
from uuid import uuid4
@@ -35,6 +36,9 @@
from opentelemetry.trace import Span
from opentelemetry.util.genai import types
from opentelemetry.util.genai.completion_hook import CompletionHook
from opentelemetry.util.genai.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT,
)

GEN_AI_INPUT_MESSAGES_REF: Final = (
gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref"
@@ -46,6 +50,10 @@
gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref"
)

_MESSAGE_INDEX_KEY = "index"

Format = Literal["json", "jsonl"]
_FORMATS: tuple[Format, ...] = ("json", "jsonl")

_logger = logging.getLogger(__name__)

@@ -94,10 +102,27 @@ def __init__(
*,
base_path: str,
max_size: int = 20,
upload_format: Format | None = None,
) -> None:
self._base_path = base_path
self._max_size = max_size

if upload_format not in _FORMATS + (None,):
raise ValueError(
f"Invalid {upload_format=}. Must be one of {_FORMATS}"
)

if upload_format is None:
environ_format = environ.get(
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT, "json"
).lower()
if environ_format not in _FORMATS:
upload_format = "json"
else:
upload_format = environ_format

self._format: Final[Literal["json", "jsonl"]] = upload_format

# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
# limits the number of queued tasks. If the queue is full, data will be dropped.
self._executor = ThreadPoolExecutor(max_workers=max_size)
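
The comment above describes the hook's drop-when-full queueing pattern. As a generic, illustrative sketch of that pattern only (not the hook's actual implementation), it might look like:

import logging
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

logger = logging.getLogger(__name__)

class DropOnFullExecutor:
    """Illustrative sketch: submit work to a thread pool, dropping it when the queue is full."""

    def __init__(self, max_size: int) -> None:
        self._executor = ThreadPoolExecutor(max_workers=max_size)
        self._semaphore = BoundedSemaphore(max_size)

    def submit(self, fn, *args) -> None:
        # acquire(blocking=False) fails immediately once max_size tasks are queued or
        # running, so new work is dropped instead of blocking the caller
        if not self._semaphore.acquire(blocking=False):
            logger.warning("queue full, dropping data")
            return
        future = self._executor.submit(fn, *args)
        future.add_done_callback(lambda _: self._semaphore.release())
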
@@ -139,27 +164,39 @@ def _calculate_ref_path(self) -> CompletionRefs:
uuid_str = str(uuid4())
return CompletionRefs(
inputs_ref=posixpath.join(
self._base_path, f"{uuid_str}_inputs.json"
self._base_path, f"{uuid_str}_inputs.{self._format}"
),
outputs_ref=posixpath.join(
self._base_path, f"{uuid_str}_outputs.json"
self._base_path, f"{uuid_str}_outputs.{self._format}"
),
system_instruction_ref=posixpath.join(
self._base_path, f"{uuid_str}_system_instruction.json"
self._base_path,
f"{uuid_str}_system_instruction.{self._format}",
),
)

@staticmethod
def _do_upload(
path: str, json_encodeable: Callable[[], JsonEncodeable]
self, path: str, json_encodeable: Callable[[], JsonEncodeable]
) -> None:
if self._format == "json":
# output as a single line with the json messages array
message_lines = [json_encodeable()]
else:
# output as one line per message in the array
message_lines = json_encodeable()
# add an index for streaming readers of jsonl
for message_idx, line in enumerate(message_lines):
line[_MESSAGE_INDEX_KEY] = message_idx

with fsspec_open(path, "w") as file:
json.dump(
json_encodeable(),
file,
separators=(",", ":"),
cls=Base64JsonEncoder,
)
for message in message_lines:
json.dump(
message,
file,
separators=(",", ":"),
cls=Base64JsonEncoder,
)
file.write("\n")

def on_completion(
self,
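For orientation, a minimal standalone sketch (not part of this diff) of what _do_upload now produces in each format for the same message list; the message dicts below are illustrative:

import json

messages = [
    {"role": "user", "parts": [{"content": "What is the capital of France?", "type": "text"}]},
    {"role": "assistant", "parts": [{"content": "Paris", "type": "text"}], "finish_reason": "stop"},
]

# upload_format="json": the whole messages array as a single JSON document on one line
json_output = json.dumps(messages, separators=(",", ":")) + "\n"

# upload_format="jsonl": one JSON document per line; each message is stamped with an
# "index" key so streaming readers can recover ordering without parsing the whole file
jsonl_output = "".join(
    json.dumps({**message, "index": i}, separators=(",", ":")) + "\n"
    for i, message in enumerate(messages)
)

print(json_output)
print(jsonl_output)
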
@@ -43,3 +43,13 @@
<https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_ for advanced
use cases.
"""

OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT = (
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT"
)
"""
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT

The format to use when uploading prompt and response data. Must be one of ``json`` or
``jsonl``. Defaults to ``json``.
"""
137 changes: 118 additions & 19 deletions util/opentelemetry-util-genai/tests/test_fsspec_upload.py
@@ -19,7 +19,6 @@
import sys
import threading
from contextlib import contextmanager
from dataclasses import asdict
from typing import Any
from unittest import TestCase
from unittest.mock import MagicMock, patch
@@ -76,6 +75,24 @@ def test_fsspec_entry_point_no_fsspec(self):
role="user",
parts=[types.Text(content="What is the capital of France?")],
),
types.InputMessage(
role="assistant",
parts=[
types.ToolCall(
id="get_capital_0",
name="get_capital",
arguments={"city": "Paris"},
)
],
),
types.InputMessage(
role="user",
parts=[
types.ToolCallResponse(
id="get_capital_0", response={"capital": "Paris"}
)
],
),
]
FAKE_OUTPUTS = [
types.OutputMessage(
@@ -197,6 +214,44 @@ def test_failed_upload_logs(self):

self.assertIn("fsspec uploader failed", logs.output[0])

def test_invalid_upload_format(self):
with self.assertRaisesRegex(ValueError, "Invalid upload_format"):
FsspecUploadCompletionHook(
base_path=BASE_PATH, upload_format="invalid"
)

def test_parse_upload_format_envvar(self):
for envvar_value, expect in (
("", "json"),
("json", "json"),
("invalid", "json"),
("jsonl", "jsonl"),
("jSoNl", "jsonl"),
):
with patch.dict(
"os.environ",
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": envvar_value},
clear=True,
):
hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
self.addCleanup(hook.shutdown)
self.assertEqual(
hook._format,
expect,
f"expected upload format {expect=} with {envvar_value=} got {hook._format}",
)

with patch.dict(
"os.environ",
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": "json"},
clear=True,
):
hook = FsspecUploadCompletionHook(
base_path=BASE_PATH, upload_format="jsonl"
)
self.addCleanup(hook.shutdown)
self.assertEqual(hook._format, "jsonl")

def test_upload_after_shutdown_logs(self):
self.hook.shutdown()
with self.assertLogs(level=logging.INFO) as logs:
@@ -212,25 +267,15 @@ def test_upload_after_shutdown_logs(self):
)


class FsspecUploaderTest(TestCase):
def test_upload(self):
FsspecUploadCompletionHook._do_upload(
"memory://my_path",
lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS],
)

with fsspec.open("memory://my_path", "r") as file:
self.assertEqual(
file.read(),
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
)


class TestFsspecUploadCompletionHookIntegration(TestBase):
def setUp(self):
super().setUp()
self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH)

def create_hook(self) -> FsspecUploadCompletionHook:
self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
return self.hook

def tearDown(self):
super().tearDown()
self.hook.shutdown()
@@ -271,15 +316,15 @@ def test_upload_completions(self):

self.assert_fsspec_equal(
span.attributes["gen_ai.input.messages_ref"],
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]},{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}]},{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}]}]\n',
)
self.assert_fsspec_equal(
span.attributes["gen_ai.output.messages_ref"],
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]',
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]\n',
)
self.assert_fsspec_equal(
span.attributes["gen_ai.system_instructions_ref"],
'[{"content":"You are a helpful assistant.","type":"text"}]',
'[{"content":"You are a helpful assistant.","type":"text"}]\n',
)

def test_stamps_empty_log(self):
@@ -316,5 +361,59 @@ def test_upload_bytes(self) -> None:

self.assert_fsspec_equal(
log_record.attributes["gen_ai.input.messages_ref"],
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]',
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]\n',
)

def test_upload_json(self) -> None:
hook = FsspecUploadCompletionHook(
base_path=BASE_PATH, upload_format="json"
)
self.addCleanup(hook.shutdown)
log_record = LogRecord()

hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
log_record=log_record,
)
hook.shutdown()

ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"]
self.assertTrue(
ref_uri.endswith(".json"), f"{ref_uri=} does not end with .json"
)

self.assert_fsspec_equal(
ref_uri,
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]},{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}]},{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}]}]\n',
)

def test_upload_jsonlines(self) -> None:
hook = FsspecUploadCompletionHook(
base_path=BASE_PATH, upload_format="jsonl"
)
self.addCleanup(hook.shutdown)
log_record = LogRecord()

hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
log_record=log_record,
)
hook.shutdown()

ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"]
self.assertTrue(
ref_uri.endswith(".jsonl"), f"{ref_uri=} does not end with .jsonl"
)

self.assert_fsspec_equal(
ref_uri,
"""\
{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}],"index":0}
{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}],"index":1}
{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}],"index":2}
""",
)
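
To close the loop, a small sketch (not part of the test suite) of how a consumer might read a jsonl upload back with fsspec, using the "index" key the uploader adds; the function name is illustrative:

import json
from typing import Any, Dict, List

import fsspec

def read_jsonl_messages(ref_uri: str) -> List[Dict[str, Any]]:
    # each non-empty line is an independent JSON document; the "index" key added by
    # the uploader restores message order even if lines are processed out of order
    messages = []
    with fsspec.open(ref_uri, "r") as file:
        for line in file:
            if line.strip():
                messages.append(json.loads(line))
    return sorted(messages, key=lambda message: message["index"])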