Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795))
- Make inputs / outputs / system instructions optional params to `on_completion`,
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)).
- Use an md5 hash of the system instructions as it's upload filename, and check
if the file exists before re-uploading it, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814](#3814)).


## Version 0.1b0 (2025-09-25)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from __future__ import annotations

import hashlib
import logging
import posixpath
import threading
Expand Down Expand Up @@ -152,10 +153,21 @@ def done(future: Future[None]) -> None:
)
self._semaphore.release()

def _calculate_ref_path(self) -> CompletionRefs:
def _calculate_ref_path(
self, system_instruction: list[types.MessagePart]
) -> CompletionRefs:
# TODO: experimental with using the trace_id and span_id, or fetching
# gen_ai.response.id from the active span.

system_instruction_hash = None
# Use an md5 hash of the system instructions as a filename, when system instructions are text.
if all(isinstance(x, types.Text) for x in system_instruction):
md5_hash = hashlib.md5()
md5_hash.update(
"\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType]
"utf-8"
)
)
system_instruction_hash = md5_hash.hexdigest()
uuid_str = str(uuid4())
return CompletionRefs(
inputs_ref=posixpath.join(
Expand All @@ -166,13 +178,17 @@ def _calculate_ref_path(self) -> CompletionRefs:
),
system_instruction_ref=posixpath.join(
self._base_path,
f"{uuid_str}_system_instruction.{self._format}",
f"{system_instruction_hash or uuid_str}_system_instruction.{self._format}",
),
)

def _do_upload(
self, path: str, json_encodeable: Callable[[], JsonEncodeable]
) -> None:
# FileSystem class has this method. Only check for system instructions as that's the only where the filename is a hash.
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists
if "_system_instruction" in path and self._fs.exists(path): # pyright: ignore[reportUnknownMemberType]
return
if self._format == "json":
# output as a single line with the json messages array
message_lines = [json_encodeable()]
Expand Down Expand Up @@ -213,7 +229,7 @@ def on_completion(
system_instruction=system_instruction or None,
)
# generate the paths to upload to
ref_names = self._calculate_ref_path()
ref_names = self._calculate_ref_path(system_instruction)

def to_dict(
dataclass_list: list[types.InputMessage]
Expand Down
44 changes: 44 additions & 0 deletions util/opentelemetry-util-genai/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@


# pylint: disable=import-outside-toplevel,no-name-in-module
import hashlib
import importlib
import logging
import sys
import threading
import time
from contextlib import contextmanager
from typing import Any
from unittest import TestCase
Expand Down Expand Up @@ -120,13 +122,15 @@ def setUp(self):
mock_fsspec = self._fsspec_patcher.start()
self.mock_fs = ThreadSafeMagicMock()
mock_fsspec.url_to_fs.return_value = self.mock_fs, ""
self.mock_fs.exists.return_value = False

self.hook = UploadCompletionHook(
base_path=BASE_PATH,
max_size=MAXSIZE,
)

def tearDown(self) -> None:
self.mock_fs.reset_mock()
self.hook.shutdown()
self._fsspec_patcher.stop()

Expand Down Expand Up @@ -162,6 +166,46 @@ def test_upload_then_shutdown(self):
"should have uploaded 3 files",
)

def test_system_insruction_is_hashed_to_avoid_reupload(self):
system_instructions = [
types.Text(content="You are a helpful assistant."),
types.Text(content="You will do your best."),
]
md5_hash = hashlib.md5()
md5_hash.update(
"\n".join(x.content for x in system_instructions).encode("utf-8")
)
expected_hash = md5_hash.hexdigest()
record = LogRecord()
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=system_instructions,
log_record=record,
)
# Wait a bit for file upload to finish..
time.sleep(0.5)
self.mock_fs.exists.return_value = True
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=system_instructions,
log_record=record,
)
# all items should be consumed
self.hook.shutdown()

self.assertEqual(
self.mock_fs.open.call_count,
1,
"should have uploaded 1 file",
)
assert record.attributes is not None
self.assertEqual(
record.attributes["gen_ai.system_instructions_ref"].split("/")[-1],
f"{expected_hash}_system_instruction.json",
)

def test_upload_when_inputs_outputs_empty(self):
record = LogRecord()
self.hook.on_completion(
Expand Down
Loading