diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index 319b5252cb..60481f65ff 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -19,6 +19,7 @@ import logging import posixpath import threading +from base64 import b64encode from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import asdict, dataclass from functools import partial @@ -151,7 +152,12 @@ def _do_upload( path: str, json_encodeable: Callable[[], JsonEncodeable] ) -> None: with fsspec_open(path, "w") as file: - json.dump(json_encodeable(), file, separators=(",", ":")) + json.dump( + json_encodeable(), + file, + separators=(",", ":"), + cls=Base64JsonEncoder, + ) def upload( self, @@ -206,3 +212,10 @@ def to_dict( def shutdown(self) -> None: # TODO: support timeout self._executor.shutdown() + + +class Base64JsonEncoder(json.JSONEncoder): + def default(self, o: Any) -> Any: + if isinstance(o, bytes): + return b64encode(o).decode() + return super().default(o) diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index 670a3b9342..4e6f2470dc 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -14,7 +14,6 @@ # pylint: disable=import-outside-toplevel,no-name-in-module - import importlib import logging import sys @@ -200,26 +199,31 @@ def test_upload(self): class TestFsspecUploadHookIntegration(TestBase): + def setUp(self): + super().setUp() + self.hook = FsspecUploadHook(base_path=BASE_PATH) + + def tearDown(self): + super().tearDown() + self.hook.shutdown() + def assert_fsspec_equal(self, path: str, value: str) -> None: with fsspec.open(path, "r") as file: self.assertEqual(file.read(), value) def test_upload_completions(self): - hook = FsspecUploadHook( - base_path=BASE_PATH, - ) tracer = self.tracer_provider.get_tracer(__name__) log_record = LogRecord() with tracer.start_as_current_span("chat mymodel") as span: - hook.upload( + self.hook.upload( inputs=FAKE_INPUTS, outputs=FAKE_OUTPUTS, system_instruction=FAKE_SYSTEM_INSTRUCTION, span=span, log_record=log_record, ) - hook.shutdown() + self.hook.shutdown() finished_spans = self.get_finished_spans() self.assertEqual(len(finished_spans), 1) @@ -250,25 +254,39 @@ def test_upload_completions(self): '[{"content":"You are a helpful assistant.","type":"text"}]', ) - @staticmethod - def upload_with_log(log_record: LogRecord): - hook = FsspecUploadHook( - base_path=BASE_PATH, - ) - - hook.upload( + def test_stamps_empty_log(self): + log_record = LogRecord() + self.hook.upload( inputs=FAKE_INPUTS, outputs=FAKE_OUTPUTS, system_instruction=FAKE_SYSTEM_INSTRUCTION, log_record=log_record, ) - hook.shutdown() - - def test_stamps_empty_log(self): - log_record = LogRecord() - self.upload_with_log(log_record) # stamp on both body and attributes self.assertIn("gen_ai.input.messages_ref", log_record.attributes) self.assertIn("gen_ai.output.messages_ref", log_record.attributes) self.assertIn("gen_ai.system_instructions_ref", log_record.attributes) + + def test_upload_bytes(self) -> None: + log_record = LogRecord() + self.hook.upload( + inputs=[ + types.InputMessage( + role="user", + parts=[ + types.Text(content="What is the capital of France?"), + {"type": "generic_bytes", "bytes": b"hello"}, + ], + ) + ], + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + log_record=log_record, + ) + self.hook.shutdown() + + self.assert_fsspec_equal( + log_record.attributes["gen_ai.input.messages_ref"], + '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]', + )