Skip to content

Commit 9bd6aee

Browse files
committed
Add jsonlines support to fsspec uploader
1 parent 4ae8662 commit 9bd6aee

File tree

4 files changed

+181
-32
lines changed

4 files changed

+181
-32
lines changed

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
- Add jsonlines support to fsspec uploader
11+
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791](#3791))
12+
1013
## Version 0.1b0 (2025-09-24)
1114

1215
- Add completion hook to genai utils to implement semconv v1.37.

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/completion_hook.py

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from contextlib import ExitStack
2525
from dataclasses import asdict, dataclass
2626
from functools import partial
27+
from os import environ
2728
from time import time
2829
from typing import Any, Callable, Final, Literal, TextIO, cast
2930
from uuid import uuid4
@@ -35,6 +36,9 @@
3536
from opentelemetry.trace import Span
3637
from opentelemetry.util.genai import types
3738
from opentelemetry.util.genai.completion_hook import CompletionHook
39+
from opentelemetry.util.genai.environment_variables import (
40+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT,
41+
)
3842

3943
GEN_AI_INPUT_MESSAGES_REF: Final = (
4044
gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref"
@@ -46,6 +50,10 @@
4650
gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref"
4751
)
4852

53+
_MESSAGE_INDEX_KEY = "index"
54+
55+
Format = Literal["json", "jsonl"]
56+
_FORMATS: tuple[Format, ...] = ("json", "jsonl")
4957

5058
_logger = logging.getLogger(__name__)
5159

@@ -70,9 +78,11 @@ class CompletionRefs:
7078
UploadData = dict[str, Callable[[], JsonEncodeable]]
7179

7280

73-
def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
81+
def fsspec_open(
82+
urlpath: str, mode: Literal["w"], *args: Any, **kwargs: Any
83+
) -> TextIO:
7484
"""typed wrapper around `fsspec.open`"""
75-
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]
85+
return cast(TextIO, fsspec.open(urlpath, mode, *args, **kwargs)) # pyright: ignore[reportUnknownMemberType]
7686

7787

7888
class FsspecUploadCompletionHook(CompletionHook):
@@ -94,10 +104,27 @@ def __init__(
94104
*,
95105
base_path: str,
96106
max_size: int = 20,
107+
upload_format: Format | None = None,
97108
) -> None:
98109
self._base_path = base_path
99110
self._max_size = max_size
100111

112+
if upload_format not in _FORMATS + (None,):
113+
raise ValueError(
114+
f"Invalid {upload_format=}. Must be one of {_FORMATS}"
115+
)
116+
117+
if upload_format is None:
118+
environ_format = environ.get(
119+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT, "json"
120+
).lower()
121+
if environ_format not in _FORMATS:
122+
upload_format = "json"
123+
else:
124+
upload_format = environ_format
125+
126+
self._format: Final[Literal["json", "jsonl"]] = upload_format
127+
101128
# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
102129
# limits the number of queued tasks. If the queue is full, data will be dropped.
103130
self._executor = ThreadPoolExecutor(max_workers=max_size)
@@ -139,27 +166,37 @@ def _calculate_ref_path(self) -> CompletionRefs:
139166
uuid_str = str(uuid4())
140167
return CompletionRefs(
141168
inputs_ref=posixpath.join(
142-
self._base_path, f"{uuid_str}_inputs.json"
169+
self._base_path, f"{uuid_str}_inputs.{self._format}"
143170
),
144171
outputs_ref=posixpath.join(
145-
self._base_path, f"{uuid_str}_outputs.json"
172+
self._base_path, f"{uuid_str}_outputs.{self._format}"
146173
),
147174
system_instruction_ref=posixpath.join(
148-
self._base_path, f"{uuid_str}_system_instruction.json"
175+
self._base_path,
176+
f"{uuid_str}_system_instruction.{self._format}",
149177
),
150178
)
151179

152-
@staticmethod
153180
def _do_upload(
154-
path: str, json_encodeable: Callable[[], JsonEncodeable]
181+
self, path: str, json_encodeable: Callable[[], JsonEncodeable]
155182
) -> None:
183+
if self._format == "json":
184+
message_lines = [json_encodeable()]
185+
else:
186+
message_lines = json_encodeable()
187+
# add an index for streaming readers of jsonl
188+
for message_idx, line in enumerate(message_lines):
189+
line[_MESSAGE_INDEX_KEY] = message_idx
190+
156191
with fsspec_open(path, "w") as file:
157-
json.dump(
158-
json_encodeable(),
159-
file,
160-
separators=(",", ":"),
161-
cls=Base64JsonEncoder,
162-
)
192+
for message in message_lines:
193+
json.dump(
194+
message,
195+
file,
196+
separators=(",", ":"),
197+
cls=Base64JsonEncoder,
198+
)
199+
file.write("\n")
163200

164201
def on_completion(
165202
self,

util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,13 @@
4343
<https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_ for advanced
4444
use cases.
4545
"""
46+
47+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT = (
48+
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT"
49+
)
50+
"""
51+
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT
52+
53+
The format to use when uploading prompt and response data. Must be one of ``json`` or
54+
``jsonl``. Defaults to ``json``.
55+
"""

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 118 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import sys
2020
import threading
2121
from contextlib import contextmanager
22-
from dataclasses import asdict
2322
from typing import Any
2423
from unittest import TestCase
2524
from unittest.mock import MagicMock, patch
@@ -76,6 +75,24 @@ def test_fsspec_entry_point_no_fsspec(self):
7675
role="user",
7776
parts=[types.Text(content="What is the capital of France?")],
7877
),
78+
types.InputMessage(
79+
role="assistant",
80+
parts=[
81+
types.ToolCall(
82+
id="get_capital_0",
83+
name="get_capital",
84+
arguments={"city": "Paris"},
85+
)
86+
],
87+
),
88+
types.InputMessage(
89+
role="user",
90+
parts=[
91+
types.ToolCallResponse(
92+
id="get_capital_0", response={"capital": "Paris"}
93+
)
94+
],
95+
),
7996
]
8097
FAKE_OUTPUTS = [
8198
types.OutputMessage(
@@ -197,6 +214,44 @@ def test_failed_upload_logs(self):
197214

198215
self.assertIn("fsspec uploader failed", logs.output[0])
199216

217+
def test_invalid_upload_format(self):
218+
with self.assertRaisesRegex(ValueError, "Invalid upload_format"):
219+
FsspecUploadCompletionHook(
220+
base_path=BASE_PATH, upload_format="invalid"
221+
)
222+
223+
def test_parse_upload_format_envvar(self):
224+
for envvar_value, expect in (
225+
("", "json"),
226+
("json", "json"),
227+
("invalid", "json"),
228+
("jsonl", "jsonl"),
229+
("jSoNl", "jsonl"),
230+
):
231+
with patch.dict(
232+
"os.environ",
233+
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": envvar_value},
234+
clear=True,
235+
):
236+
hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
237+
self.addCleanup(hook.shutdown)
238+
self.assertEqual(
239+
hook._format,
240+
expect,
241+
f"expected upload format {expect=} with {envvar_value=} got {hook._format}",
242+
)
243+
244+
with patch.dict(
245+
"os.environ",
246+
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": "json"},
247+
clear=True,
248+
):
249+
hook = FsspecUploadCompletionHook(
250+
base_path=BASE_PATH, upload_format="jsonl"
251+
)
252+
self.addCleanup(hook.shutdown)
253+
self.assertEqual(hook._format, "jsonl")
254+
200255
def test_upload_after_shutdown_logs(self):
201256
self.hook.shutdown()
202257
with self.assertLogs(level=logging.INFO) as logs:
@@ -212,25 +267,15 @@ def test_upload_after_shutdown_logs(self):
212267
)
213268

214269

215-
class FsspecUploaderTest(TestCase):
216-
def test_upload(self):
217-
FsspecUploadCompletionHook._do_upload(
218-
"memory://my_path",
219-
lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS],
220-
)
221-
222-
with fsspec.open("memory://my_path", "r") as file:
223-
self.assertEqual(
224-
file.read(),
225-
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
226-
)
227-
228-
229270
class TestFsspecUploadCompletionHookIntegration(TestBase):
230271
def setUp(self):
231272
super().setUp()
232273
self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
233274

275+
def create_hook(self) -> FsspecUploadCompletionHook:
276+
self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
277+
return self.hook
278+
234279
def tearDown(self):
235280
super().tearDown()
236281
self.hook.shutdown()
@@ -271,15 +316,15 @@ def test_upload_completions(self):
271316

272317
self.assert_fsspec_equal(
273318
span.attributes["gen_ai.input.messages_ref"],
274-
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
319+
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]},{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}]},{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}]}]\n',
275320
)
276321
self.assert_fsspec_equal(
277322
span.attributes["gen_ai.output.messages_ref"],
278-
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]',
323+
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]\n',
279324
)
280325
self.assert_fsspec_equal(
281326
span.attributes["gen_ai.system_instructions_ref"],
282-
'[{"content":"You are a helpful assistant.","type":"text"}]',
327+
'[{"content":"You are a helpful assistant.","type":"text"}]\n',
283328
)
284329

285330
def test_stamps_empty_log(self):
@@ -316,5 +361,59 @@ def test_upload_bytes(self) -> None:
316361

317362
self.assert_fsspec_equal(
318363
log_record.attributes["gen_ai.input.messages_ref"],
319-
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]',
364+
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]\n',
365+
)
366+
367+
def test_upload_json(self) -> None:
368+
hook = FsspecUploadCompletionHook(
369+
base_path=BASE_PATH, upload_format="json"
370+
)
371+
self.addCleanup(hook.shutdown)
372+
log_record = LogRecord()
373+
374+
hook.on_completion(
375+
inputs=FAKE_INPUTS,
376+
outputs=FAKE_OUTPUTS,
377+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
378+
log_record=log_record,
379+
)
380+
hook.shutdown()
381+
382+
ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"]
383+
self.assertTrue(
384+
ref_uri.endswith(".json"), f"{ref_uri=} does not end with .json"
385+
)
386+
387+
self.assert_fsspec_equal(
388+
ref_uri,
389+
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]},{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}]},{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}]}]\n',
390+
)
391+
392+
def test_upload_jsonlines(self) -> None:
393+
hook = FsspecUploadCompletionHook(
394+
base_path=BASE_PATH, upload_format="jsonl"
395+
)
396+
self.addCleanup(hook.shutdown)
397+
log_record = LogRecord()
398+
399+
hook.on_completion(
400+
inputs=FAKE_INPUTS,
401+
outputs=FAKE_OUTPUTS,
402+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
403+
log_record=log_record,
404+
)
405+
hook.shutdown()
406+
407+
ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"]
408+
self.assertTrue(
409+
ref_uri.endswith(".jsonl"), f"{ref_uri=} does not end with .jsonl"
410+
)
411+
412+
self.assert_fsspec_equal(
413+
ref_uri,
414+
"""\
415+
{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}],"index":0}
416+
{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}],"index":1}
417+
{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}],"index":2}
418+
""",
320419
)

0 commit comments

Comments
 (0)