Skip to content

Commit 3157b97

Browse files
committed
Use md5 hash of system instructions as the filename for system instructions.
1 parent 74ff31b commit 3157b97

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from __future__ import annotations
1717

18+
import hashlib
1819
import logging
1920
import posixpath
2021
import threading
@@ -152,10 +153,21 @@ def done(future: Future[None]) -> None:
152153
)
153154
self._semaphore.release()
154155

155-
def _calculate_ref_path(self) -> CompletionRefs:
156+
def _calculate_ref_path(
157+
self, system_instruction: list[types.MessagePart]
158+
) -> CompletionRefs:
156159
# TODO: experiment with using the trace_id and span_id, or fetching
157160
# gen_ai.response.id from the active span.
158-
161+
system_instruction_hash = None
162+
# Use an md5 hash of the system instructions as a filename, when system instructions are text.
163+
if all(isinstance(x, types.Text) for x in system_instruction):
164+
md5_hash = hashlib.md5()
165+
md5_hash.update(
166+
"\n".join(x.content for x in system_instruction).encode(
167+
"utf-8"
168+
)
169+
)
170+
system_instruction_hash = md5_hash.hexdigest()
159171
uuid_str = str(uuid4())
160172
return CompletionRefs(
161173
inputs_ref=posixpath.join(
@@ -166,13 +178,17 @@ def _calculate_ref_path(self) -> CompletionRefs:
166178
),
167179
system_instruction_ref=posixpath.join(
168180
self._base_path,
169-
f"{uuid_str}_system_instruction.{self._format}",
181+
f"{system_instruction_hash or uuid_str}_system_instruction.{self._format}",
170182
),
171183
)
172184

173185
def _do_upload(
174186
self, path: str, json_encodeable: Callable[[], JsonEncodeable]
175187
) -> None:
188+
# The fsspec filesystem class provides an exists() method. Only check for system instructions, as that's the only case where the filename is a hash.
189+
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists
190+
if "_system_instruction" in path and self._fs.exists(path):
191+
return
176192
if self._format == "json":
177193
# output as a single line with the json messages array
178194
message_lines = [json_encodeable()]
@@ -213,7 +229,7 @@ def on_completion(
213229
system_instruction=system_instruction or None,
214230
)
215231
# generate the paths to upload to
216-
ref_names = self._calculate_ref_path()
232+
ref_names = self._calculate_ref_path(system_instruction)
217233

218234
def to_dict(
219235
dataclass_list: list[types.InputMessage]

util/opentelemetry-util-genai/tests/test_upload.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414

1515

1616
# pylint: disable=import-outside-toplevel,no-name-in-module
17+
import hashlib
1718
import importlib
1819
import logging
1920
import sys
2021
import threading
22+
import time
2123
from contextlib import contextmanager
2224
from typing import Any
2325
from unittest import TestCase
@@ -120,13 +122,15 @@ def setUp(self):
120122
mock_fsspec = self._fsspec_patcher.start()
121123
self.mock_fs = ThreadSafeMagicMock()
122124
mock_fsspec.url_to_fs.return_value = self.mock_fs, ""
125+
self.mock_fs.exists.return_value = False
123126

124127
self.hook = UploadCompletionHook(
125128
base_path=BASE_PATH,
126129
max_size=MAXSIZE,
127130
)
128131

129132
def tearDown(self) -> None:
133+
self.mock_fs.reset_mock()
130134
self.hook.shutdown()
131135
self._fsspec_patcher.stop()
132136

@@ -162,6 +166,46 @@ def test_upload_then_shutdown(self):
162166
"should have uploaded 3 files",
163167
)
164168

169+
def test_system_insruction_is_hashed_to_avoid_reupload(self):
170+
system_instructions = [
171+
types.Text(content="You are a helpful assistant."),
172+
types.Text(content="You will do your best."),
173+
]
174+
md5_hash = hashlib.md5()
175+
md5_hash.update(
176+
"\n".join(x.content for x in system_instructions).encode("utf-8")
177+
)
178+
expected_hash = md5_hash.hexdigest()
179+
record = LogRecord()
180+
self.hook.on_completion(
181+
inputs=[],
182+
outputs=[],
183+
system_instruction=system_instructions,
184+
log_record=record,
185+
)
186+
# Wait a bit for the file upload to finish.
187+
time.sleep(0.5)
188+
self.mock_fs.exists.return_value = True
189+
self.hook.on_completion(
190+
inputs=[],
191+
outputs=[],
192+
system_instruction=system_instructions,
193+
log_record=record,
194+
)
195+
# all items should be consumed
196+
self.hook.shutdown()
197+
198+
self.assertEqual(
199+
self.mock_fs.open.call_count,
200+
1,
201+
"should have uploaded 1 file",
202+
)
203+
assert record.attributes is not None
204+
self.assertEqual(
205+
record.attributes["gen_ai.system_instructions_ref"].split("/")[-1],
206+
f"{expected_hash}_system_instruction.json",
207+
)
208+
165209
def test_upload_when_inputs_outputs_empty(self):
166210
record = LogRecord()
167211
self.hook.on_completion(

0 commit comments

Comments
 (0)