From 3157b97006c4e349bc457a2f6913b9594037c00d Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 7 Oct 2025 17:47:10 +0000 Subject: [PATCH 01/12] Use md5 hash of system instructions as the filename for system instructions. --- .../util/genai/_upload/completion_hook.py | 24 ++++++++-- .../tests/test_upload.py | 44 +++++++++++++++++++ 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index 86cb4f0c51..f377779895 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -15,6 +15,7 @@ from __future__ import annotations +import hashlib import logging import posixpath import threading @@ -152,10 +153,21 @@ def done(future: Future[None]) -> None: ) self._semaphore.release() - def _calculate_ref_path(self) -> CompletionRefs: + def _calculate_ref_path( + self, system_instruction: list[types.MessagePart] + ) -> CompletionRefs: # TODO: experimental with using the trace_id and span_id, or fetching # gen_ai.response.id from the active span. - + system_instruction_hash = None + # Use an md5 hash of the system instructions as a filename, when system instructions are text. 
+ if all(isinstance(x, types.Text) for x in system_instruction): + md5_hash = hashlib.md5() + md5_hash.update( + "\n".join(x.content for x in system_instruction).encode( + "utf-8" + ) + ) + system_instruction_hash = md5_hash.hexdigest() uuid_str = str(uuid4()) return CompletionRefs( inputs_ref=posixpath.join( @@ -166,13 +178,17 @@ def _calculate_ref_path(self) -> CompletionRefs: ), system_instruction_ref=posixpath.join( self._base_path, - f"{uuid_str}_system_instruction.{self._format}", + f"{system_instruction_hash or uuid_str}_system_instruction.{self._format}", ), ) def _do_upload( self, path: str, json_encodeable: Callable[[], JsonEncodeable] ) -> None: + # FileSystem class has this method. Only check for system instructions as that's the only where the filename is a hash. + # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists + if "_system_instruction" in path and self._fs.exists(path): + return if self._format == "json": # output as a single line with the json messages array message_lines = [json_encodeable()] @@ -213,7 +229,7 @@ def on_completion( system_instruction=system_instruction or None, ) # generate the paths to upload to - ref_names = self._calculate_ref_path() + ref_names = self._calculate_ref_path(system_instruction) def to_dict( dataclass_list: list[types.InputMessage] diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index ae43d3b4a7..fecd09b684 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -14,10 +14,12 @@ # pylint: disable=import-outside-toplevel,no-name-in-module +import hashlib import importlib import logging import sys import threading +import time from contextlib import contextmanager from typing import Any from unittest import TestCase @@ -120,6 +122,7 @@ def setUp(self): mock_fsspec = self._fsspec_patcher.start() self.mock_fs = ThreadSafeMagicMock() 
mock_fsspec.url_to_fs.return_value = self.mock_fs, "" + self.mock_fs.exists.return_value = False self.hook = UploadCompletionHook( base_path=BASE_PATH, @@ -127,6 +130,7 @@ def setUp(self): ) def tearDown(self) -> None: + self.mock_fs.reset_mock() self.hook.shutdown() self._fsspec_patcher.stop() @@ -162,6 +166,46 @@ def test_upload_then_shutdown(self): "should have uploaded 3 files", ) + def test_system_insruction_is_hashed_to_avoid_reupload(self): + system_instructions = [ + types.Text(content="You are a helpful assistant."), + types.Text(content="You will do your best."), + ] + md5_hash = hashlib.md5() + md5_hash.update( + "\n".join(x.content for x in system_instructions).encode("utf-8") + ) + expected_hash = md5_hash.hexdigest() + record = LogRecord() + self.hook.on_completion( + inputs=[], + outputs=[], + system_instruction=system_instructions, + log_record=record, + ) + # Wait a bit for file upload to finish.. + time.sleep(0.5) + self.mock_fs.exists.return_value = True + self.hook.on_completion( + inputs=[], + outputs=[], + system_instruction=system_instructions, + log_record=record, + ) + # all items should be consumed + self.hook.shutdown() + + self.assertEqual( + self.mock_fs.open.call_count, + 1, + "should have uploaded 1 file", + ) + assert record.attributes is not None + self.assertEqual( + record.attributes["gen_ai.system_instructions_ref"].split("/")[-1], + f"{expected_hash}_system_instruction.json", + ) + def test_upload_when_inputs_outputs_empty(self): record = LogRecord() self.hook.on_completion( From 6497376cc5a47d47c409eb5e8ea1301f0f9657e9 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 7 Oct 2025 18:04:56 +0000 Subject: [PATCH 02/12] Fix typecheck add changelog --- util/opentelemetry-util-genai/CHANGELOG.md | 3 +++ .../src/opentelemetry/util/genai/_upload/completion_hook.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 
ee539d6b15..251b66ad15 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795)) - Make inputs / outputs / system instructions optional params to `on_completion`, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)). +- Use an md5 hash of the system instructions as it's upload filename, and check + if the file exists before re-uploading it, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814](#3814)). + ## Version 0.1b0 (2025-09-25) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index f377779895..95b0286729 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -163,7 +163,7 @@ def _calculate_ref_path( if all(isinstance(x, types.Text) for x in system_instruction): md5_hash = hashlib.md5() md5_hash.update( - "\n".join(x.content for x in system_instruction).encode( + "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType] "utf-8" ) ) @@ -187,7 +187,7 @@ def _do_upload( ) -> None: # FileSystem class has this method. Only check for system instructions as that's the only where the filename is a hash. 
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists - if "_system_instruction" in path and self._fs.exists(path): + if "_system_instruction" in path and self._fs.exists(path): # pyright: ignore[reportUnknownMemberType] return if self._format == "json": # output as a single line with the json messages array From 5505a381772e58ff237a225732fe3ea592d6e53c Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 8 Oct 2025 17:08:15 +0000 Subject: [PATCH 03/12] Respond to comments --- typings/fsspec/__init__.pyi | 1 + .../util/genai/_upload/completion_hook.py | 27 +++++++++++-------- .../tests/test_upload.py | 12 +++++---- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/typings/fsspec/__init__.pyi b/typings/fsspec/__init__.pyi index 008a62a4ed..e02c5f278a 100644 --- a/typings/fsspec/__init__.pyi +++ b/typings/fsspec/__init__.pyi @@ -27,5 +27,6 @@ class AbstractFileSystem(RealAbstractFileSystem): def open( self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any ) -> io.TextIOWrapper: ... + def exists(self, path: str) -> bool: ... def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ... 
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index 95b0286729..2bbdf9cbac 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -15,14 +15,14 @@ from __future__ import annotations -import hashlib +import binascii import logging import posixpath import threading from concurrent.futures import Future, ThreadPoolExecutor from contextlib import ExitStack from dataclasses import asdict, dataclass -from functools import partial +from functools import lru_cache, partial from os import environ from time import time from typing import Any, Callable, Final, Literal @@ -159,15 +159,15 @@ def _calculate_ref_path( # TODO: experimental with using the trace_id and span_id, or fetching # gen_ai.response.id from the active span. system_instruction_hash = None - # Use an md5 hash of the system instructions as a filename, when system instructions are text. if all(isinstance(x, types.Text) for x in system_instruction): - md5_hash = hashlib.md5() - md5_hash.update( - "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType] - "utf-8" + # Get a checksum of the text. 
+ system_instruction_hash = hex( + binascii.crc32( + "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType] + "utf-8" + ) ) ) - system_instruction_hash = md5_hash.hexdigest() uuid_str = str(uuid4()) return CompletionRefs( inputs_ref=posixpath.join( @@ -182,12 +182,17 @@ def _calculate_ref_path( ), ) + @lru_cache(maxsize=512) + def _file_exists(self, path: str) -> bool: + # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists + return self._fs.exists(path) + def _do_upload( self, path: str, json_encodeable: Callable[[], JsonEncodeable] ) -> None: - # FileSystem class has this method. Only check for system instructions as that's the only where the filename is a hash. - # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists - if "_system_instruction" in path and self._fs.exists(path): # pyright: ignore[reportUnknownMemberType] + # Only check for system instruction file existence as that's the only file where the filename is a hash + # of the content. 
+ if "_system_instruction" in path and self._file_exists(path): return if self._format == "json": # output as a single line with the json messages array diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index fecd09b684..a1b1160207 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -14,7 +14,7 @@ # pylint: disable=import-outside-toplevel,no-name-in-module -import hashlib +import binascii import importlib import logging import sys @@ -171,11 +171,13 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): types.Text(content="You are a helpful assistant."), types.Text(content="You will do your best."), ] - md5_hash = hashlib.md5() - md5_hash.update( - "\n".join(x.content for x in system_instructions).encode("utf-8") + expected_hash = hex( + binascii.crc32( + "\n".join(x.content for x in system_instructions).encode( + "utf-8" + ) + ) ) - expected_hash = md5_hash.hexdigest() record = LogRecord() self.hook.on_completion( inputs=[], From 6589de38c7b1ed61f69b8268f3e68fe103e1c49b Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Thu, 9 Oct 2025 18:30:34 +0000 Subject: [PATCH 04/12] Merge to main and fix changelog --- .../opentelemetry-instrumentation-google-genai/CHANGELOG.md | 3 +++ util/opentelemetry-util-genai/CHANGELOG.md | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md index 39a9aa5eee..ca90de8515 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased + - Migrate off the deprecated events API to use the logs API + 
([#3625](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3624)) + ## Version 0.3b0 (2025-07-08) - Add automatic instrumentation to tool call functions ([#3446](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3446)) diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index b5faafb305..d885dc17e4 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -15,8 +15,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795)) - Make inputs / outputs / system instructions optional params to `on_completion`, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)). - - `opentelemetry-instrumentation-google-genai`: migrate off the deprecated events API to use the logs API - ([#3625](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3624)) - Use an crc32 checksum of the system instructions as it's upload filename, and check if the file exists before re-uploading it, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814](#3814)). 
From 8200c8888d3d4ea443a8e8edc4d9ca1eb09a8f01 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Thu, 9 Oct 2025 20:34:51 +0000 Subject: [PATCH 05/12] Switch to Sha 256 --- .../util/genai/_upload/completion_hook.py | 17 ++++++++--------- .../tests/test_upload.py | 17 +++++++---------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index 2bbdf9cbac..16cfaf8c4e 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -15,7 +15,7 @@ from __future__ import annotations -import binascii +import hashlib import logging import posixpath import threading @@ -160,14 +160,13 @@ def _calculate_ref_path( # gen_ai.response.id from the active span. system_instruction_hash = None if all(isinstance(x, types.Text) for x in system_instruction): - # Get a checksum of the text. - system_instruction_hash = hex( - binascii.crc32( - "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType] - "utf-8" - ) - ) - ) + # Get a hash of the text. 
+ system_instruction_hash = hashlib.sha256( + "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType] + "utf-8" + ), + usedforsecurity=False, + ).hexdigest() uuid_str = str(uuid4()) return CompletionRefs( inputs_ref=posixpath.join( diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index a37b5fec04..2403474df5 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -14,7 +14,7 @@ # pylint: disable=import-outside-toplevel,no-name-in-module -import binascii +import hashlib import importlib import logging import sys @@ -160,7 +160,7 @@ def test_upload_then_shutdown(self): # all items should be consumed self.hook.shutdown() # TODO: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3812 fix flaky test that requires sleep. - time.sleep(2) + time.sleep(0.5) self.assertEqual( self.mock_fs.open.call_count, 3, @@ -172,13 +172,10 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): types.Text(content="You are a helpful assistant."), types.Text(content="You will do your best."), ] - expected_hash = hex( - binascii.crc32( - "\n".join(x.content for x in system_instructions).encode( - "utf-8" - ) - ) - ) + expected_hash = hashlib.sha256( + "\n".join(x.content for x in system_instructions).encode("utf-8"), + usedforsecurity=False, + ).hexdigest() record = LogRecord() self.hook.on_completion( inputs=[], @@ -187,7 +184,7 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): log_record=record, ) # Wait a bit for file upload to finish.. 
- time.sleep(0.5) + time.sleep(2) self.mock_fs.exists.return_value = True self.hook.on_completion( inputs=[], From ce28e72c2dc96ba4b0bb02fb08ebe90a47abc360 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 10 Oct 2025 14:50:12 +0000 Subject: [PATCH 06/12] Make changes --- .../util/genai/_upload/completion_hook.py | 31 +++++++++++++-- .../tests/test_upload.py | 38 +++++++++++++++++-- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index 16cfaf8c4e..d895a71d7b 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -19,10 +19,11 @@ import logging import posixpath import threading +from collections import OrderedDict from concurrent.futures import Future, ThreadPoolExecutor from contextlib import ExitStack from dataclasses import asdict, dataclass -from functools import lru_cache, partial +from functools import partial from os import environ from time import time from typing import Any, Callable, Final, Literal @@ -78,6 +79,12 @@ class CompletionRefs: UploadData = dict[str, Callable[[], JsonEncodeable]] +def is_system_instructions_hashable( + system_instruction: list[types.MessagePart], +) -> bool: + return all(isinstance(x, types.Text) for x in system_instruction) + + class UploadCompletionHook(CompletionHook): """An completion hook using ``fsspec`` to upload to external storage @@ -98,10 +105,13 @@ def __init__( base_path: str, max_size: int = 20, upload_format: Format | None = None, + lru_cache_max_size: int = 1024, ) -> None: self._max_size = max_size self._fs, base_path = fsspec.url_to_fs(base_path) self._base_path = self._fs.unstrip_protocol(base_path) + self.lru_dict = OrderedDict() + self.lru_cache_max_size = lru_cache_max_size if 
upload_format not in _FORMATS + (None,): raise ValueError( @@ -159,7 +169,7 @@ def _calculate_ref_path( # TODO: experimental with using the trace_id and span_id, or fetching # gen_ai.response.id from the active span. system_instruction_hash = None - if all(isinstance(x, types.Text) for x in system_instruction): + if is_system_instructions_hashable(system_instruction): # Get a hash of the text. system_instruction_hash = hashlib.sha256( "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType] @@ -181,10 +191,18 @@ def _calculate_ref_path( ), ) - @lru_cache(maxsize=512) def _file_exists(self, path: str) -> bool: + if path in self.lru_dict: + self.lru_dict.move_to_end(path) + return True # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists - return self._fs.exists(path) + file_exists = self._fs.exists(path) + if not file_exists: + return False + self.lru_dict[path] = True + if len(self.lru_dict) > self.lru_cache_max_size: + self.lru_dict.popitem(last=False) + return True def _do_upload( self, path: str, json_encodeable: Callable[[], JsonEncodeable] @@ -214,6 +232,11 @@ def _do_upload( gen_ai_json_dump(message, file) file.write("\n") + if "_system_instruction" in path: + self.lru_dict[path] = True + if len(self.lru_dict) > self.lru_cache_max_size: + self.lru_dict.popitem(last=False) + def on_completion( self, *, diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index 2403474df5..44955d90af 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -125,8 +125,7 @@ def setUp(self): self.mock_fs.exists.return_value = False self.hook = UploadCompletionHook( - base_path=BASE_PATH, - max_size=MAXSIZE, + base_path=BASE_PATH, max_size=MAXSIZE, lru_cache_max_size=5 ) def tearDown(self) -> None: @@ -184,7 
+183,7 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): log_record=record, ) # Wait a bit for file upload to finish.. - time.sleep(2) + time.sleep(0.5) self.mock_fs.exists.return_value = True self.hook.on_completion( inputs=[], @@ -206,6 +205,39 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): f"{expected_hash}_system_instruction.json", ) + def test_lru_cache_works(self): + record = LogRecord() + self.hook.on_completion( + inputs=[], + outputs=[], + system_instruction=FAKE_SYSTEM_INSTRUCTION, + log_record=record, + ) + # Wait a bit for file upload to finish.. + time.sleep(0.5) + assert record.attributes is not None + self.assertTrue( + self.hook._file_exists( + record.attributes["gen_ai.system_instructions_ref"] + ) + ) + # LRU cache has a size of 5. So only AFTER 5 uploads should the original file be removed from the cache. + for x in range(5): + self.assertTrue( + record.attributes["gen_ai.system_instructions_ref"] + in self.hook.lru_dict + ) + self.hook.on_completion( + inputs=[], + outputs=[], + system_instruction=[types.Text(content=str(x))], + ) + print(self.hook.lru_dict) + self.assertFalse( + record.attributes["gen_ai.system_instructions_ref"] + in self.hook.lru_dict + ) + def test_upload_when_inputs_outputs_empty(self): record = LogRecord() self.hook.on_completion( From 152057c9b0addd3af96f91df75e73d928933e8fc Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 10 Oct 2025 15:32:46 +0000 Subject: [PATCH 07/12] Fix hashing --- .../util/genai/_upload/completion_hook.py | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index d895a71d7b..5c8920f88a 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ 
b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -75,8 +75,8 @@ class CompletionRefs: JsonEncodeable = list[dict[str, Any]] -# mapping of upload path to function computing upload data dict -UploadData = dict[str, Callable[[], JsonEncodeable]] +# mapping of upload path and whether the contents were hashed to the filename to function computing upload data dict +UploadData = dict[tuple[str, bool], Callable[[], JsonEncodeable]] def is_system_instructions_hashable( @@ -143,7 +143,10 @@ def done(future: Future[None]) -> None: finally: self._semaphore.release() - for path, json_encodeable in upload_data.items(): + for ( + path, + contents_hashed_to_filename, + ), json_encodeable in upload_data.items(): # could not acquire, drop data if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with _logger.warning( @@ -154,7 +157,10 @@ def done(future: Future[None]) -> None: try: fut = self._executor.submit( - self._do_upload, path, json_encodeable + self._do_upload, + path, + contents_hashed_to_filename, + json_encodeable, ) fut.add_done_callback(done) except RuntimeError: @@ -197,6 +203,7 @@ def _file_exists(self, path: str) -> bool: return True # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists file_exists = self._fs.exists(path) + # don't cache this because soon the file will exist.. if not file_exists: return False self.lru_dict[path] = True @@ -205,11 +212,12 @@ def _file_exists(self, path: str) -> bool: return True def _do_upload( - self, path: str, json_encodeable: Callable[[], JsonEncodeable] + self, + path: str, + contents_hashed_to_filename: bool, + json_encodeable: Callable[[], JsonEncodeable], ) -> None: - # Only check for system instruction file existence as that's the only file where the filename is a hash - # of the content. 
- if "_system_instruction" in path and self._file_exists(path): + if contents_hashed_to_filename and self._file_exists(path): return if self._format == "json": # output as a single line with the json messages array @@ -232,7 +240,7 @@ def _do_upload( gen_ai_json_dump(message, file) file.write("\n") - if "_system_instruction" in path: + if contents_hashed_to_filename: self.lru_dict[path] = True if len(self.lru_dict) > self.lru_cache_max_size: self.lru_dict.popitem(last=False) @@ -266,35 +274,40 @@ def to_dict( return [asdict(dc) for dc in dataclass_list] references = [ - (ref_name, ref, ref_attr) - for ref_name, ref, ref_attr in [ + (ref_name, ref, ref_attr, contents_hashed_to_filename) + for ref_name, ref, ref_attr, contents_hashed_to_filename in [ ( ref_names.inputs_ref, completion.inputs, GEN_AI_INPUT_MESSAGES_REF, + False, ), ( ref_names.outputs_ref, completion.outputs, GEN_AI_OUTPUT_MESSAGES_REF, + False, ), ( ref_names.system_instruction_ref, completion.system_instruction, GEN_AI_SYSTEM_INSTRUCTIONS_REF, + is_system_instructions_hashable( + completion.system_instruction + ), ), ] - if ref + if ref # Filter out empty input/output/sys instruction ] self._submit_all( { - ref_name: partial(to_dict, ref) - for ref_name, ref, _ in references + (ref_name, contents_hashed_to_filename): partial(to_dict, ref) + for ref_name, ref, _, contents_hashed_to_filename in references } ) # stamp the refs on telemetry - references = {ref_attr: name for name, _, ref_attr in references} + references = {ref_attr: name for name, _, ref_attr, _ in references} if span: span.set_attributes(references) if log_record: From d4afff9adccdc0316f11147ef428a18b0f05daf0 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 10 Oct 2025 15:54:23 +0000 Subject: [PATCH 08/12] Fix lint, typecheck, broke test --- .../util/genai/_upload/completion_hook.py | 8 +++++--- util/opentelemetry-util-genai/tests/test_upload.py | 10 ++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git 
a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index 5c8920f88a..42c62cd789 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -80,9 +80,11 @@ class CompletionRefs: def is_system_instructions_hashable( - system_instruction: list[types.MessagePart], + system_instruction: list[types.MessagePart] | None, ) -> bool: - return all(isinstance(x, types.Text) for x in system_instruction) + return bool(system_instruction) and all( + isinstance(x, types.Text) for x in system_instruction + ) class UploadCompletionHook(CompletionHook): @@ -110,7 +112,7 @@ def __init__( self._max_size = max_size self._fs, base_path = fsspec.url_to_fs(base_path) self._base_path = self._fs.unstrip_protocol(base_path) - self.lru_dict = OrderedDict() + self.lru_dict: OrderedDict[str, bool] = OrderedDict() self.lru_cache_max_size = lru_cache_max_size if upload_format not in _FORMATS + (None,): diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index 44955d90af..7571d32478 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -172,7 +172,9 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): types.Text(content="You will do your best."), ] expected_hash = hashlib.sha256( - "\n".join(x.content for x in system_instructions).encode("utf-8"), + "\n".join(text.content for text in system_instructions).encode( + "utf-8" + ), usedforsecurity=False, ).hexdigest() record = LogRecord() @@ -222,7 +224,7 @@ def test_lru_cache_works(self): ) ) # LRU cache has a size of 5. So only AFTER 5 uploads should the original file be removed from the cache. 
- for x in range(5): + for iteration in range(5): self.assertTrue( record.attributes["gen_ai.system_instructions_ref"] in self.hook.lru_dict @@ -230,9 +232,9 @@ def test_lru_cache_works(self): self.hook.on_completion( inputs=[], outputs=[], - system_instruction=[types.Text(content=str(x))], + system_instruction=[types.Text(content=str(iteration))], ) - print(self.hook.lru_dict) + self.hook.shutdown() self.assertFalse( record.attributes["gen_ai.system_instructions_ref"] in self.hook.lru_dict From c5721c092ad0b2f33ab0e2f6984833962f3f40b0 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 14 Oct 2025 14:30:13 +0000 Subject: [PATCH 09/12] Respond to comments --- .../CHANGELOG.md | 3 - .../tests/test_upload.py | 76 ++++++++----------- 2 files changed, 33 insertions(+), 46 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md index ca90de8515..39a9aa5eee 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md @@ -7,9 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased - - Migrate off the deprecated events API to use the logs API - ([#3625](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3624)) - ## Version 0.3b0 (2025-07-08) - Add automatic instrumentation to tool call functions ([#3446](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3446)) diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index 7571d32478..b583fdfb34 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -14,7 +14,6 @@ # pylint: disable=import-outside-toplevel,no-name-in-module -import hashlib import importlib import logging 
import sys @@ -129,7 +128,6 @@ def setUp(self): ) def tearDown(self) -> None: - self.mock_fs.reset_mock() self.hook.shutdown() self._fsspec_patcher.stop() @@ -166,47 +164,6 @@ def test_upload_then_shutdown(self): "should have uploaded 3 files", ) - def test_system_insruction_is_hashed_to_avoid_reupload(self): - system_instructions = [ - types.Text(content="You are a helpful assistant."), - types.Text(content="You will do your best."), - ] - expected_hash = hashlib.sha256( - "\n".join(text.content for text in system_instructions).encode( - "utf-8" - ), - usedforsecurity=False, - ).hexdigest() - record = LogRecord() - self.hook.on_completion( - inputs=[], - outputs=[], - system_instruction=system_instructions, - log_record=record, - ) - # Wait a bit for file upload to finish.. - time.sleep(0.5) - self.mock_fs.exists.return_value = True - self.hook.on_completion( - inputs=[], - outputs=[], - system_instruction=system_instructions, - log_record=record, - ) - # all items should be consumed - self.hook.shutdown() - - self.assertEqual( - self.mock_fs.open.call_count, - 1, - "should have uploaded 1 file", - ) - assert record.attributes is not None - self.assertEqual( - record.attributes["gen_ai.system_instructions_ref"].split("/")[-1], - f"{expected_hash}_system_instruction.json", - ) - def test_lru_cache_works(self): record = LogRecord() self.hook.on_completion( @@ -411,6 +368,39 @@ def assert_fsspec_equal(self, path: str, value: str) -> None: with fsspec.open(path, "r") as file: self.assertEqual(file.read(), value) + def test_system_insruction_is_hashed_to_avoid_reupload(self): + expected_hash = ( + "7e35acac4feca03ab47929d4cc6cfef1df2190ae1ee1752196a05ffc2a6cb360" + ) + # Create the file before upload.. + expected_file_name = ( + f"memory://{expected_hash}_system_instruction.json" + ) + with fsspec.open(expected_file_name, "wb") as f: + f.write(b"asg") + # FIle should exist. 
+ assert self.hook._file_exists(expected_file_name) is True + system_instructions = [ + types.Text(content="You are a helpful assistant."), + types.Text(content="You will do your best."), + ] + record = LogRecord() + self.hook.on_completion( + inputs=[], + outputs=[], + system_instruction=system_instructions, + log_record=record, + ) + self.hook.shutdown() + assert record.attributes is not None + + self.assertEqual( + record.attributes["gen_ai.system_instructions_ref"], + expected_file_name, + ) + # Content should not have been overwritten. + self.assert_fsspec_equal(expected_file_name, b"asg") + def test_upload_completions(self): tracer = self.tracer_provider.get_tracer(__name__) log_record = LogRecord() From 2e01e03013fd7dfa3e1a7e4d45a1a36e007ca59e Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 14 Oct 2025 14:31:33 +0000 Subject: [PATCH 10/12] fix test --- util/opentelemetry-util-genai/tests/test_upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index b583fdfb34..1e18e7618f 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -399,7 +399,7 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): expected_file_name, ) # Content should not have been overwritten. 
- self.assert_fsspec_equal(expected_file_name, b"asg") + self.assert_fsspec_equal(expected_file_name, "asg") def test_upload_completions(self): tracer = self.tracer_provider.get_tracer(__name__) From abca1d5f769711a008a2e0cf2fce686cb62db645 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 14 Oct 2025 14:47:10 +0000 Subject: [PATCH 11/12] Fix linter --- util/opentelemetry-util-genai/tests/test_upload.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index 1e18e7618f..d7c6aa422c 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -376,8 +376,8 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): expected_file_name = ( f"memory://{expected_hash}_system_instruction.json" ) - with fsspec.open(expected_file_name, "wb") as f: - f.write(b"asg") + with fsspec.open(expected_file_name, "wb") as file: + file.write(b"asg") # FIle should exist. 
assert self.hook._file_exists(expected_file_name) is True system_instructions = [ From 2aa283a718515ec377e9162c9ddb82f96a65e1cf Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 14 Oct 2025 20:41:06 +0000 Subject: [PATCH 12/12] Fix assert statements and update changelog --- opentelemetry-operations-python | 1 + util/opentelemetry-util-genai/CHANGELOG.md | 2 +- util/opentelemetry-util-genai/tests/test_upload.py | 8 ++++---- 3 files changed, 6 insertions(+), 5 deletions(-) create mode 160000 opentelemetry-operations-python diff --git a/opentelemetry-operations-python b/opentelemetry-operations-python new file mode 160000 index 0000000000..6358cf5626 --- /dev/null +++ b/opentelemetry-operations-python @@ -0,0 +1 @@ +Subproject commit 6358cf56263a875224c3db7fee79b40144866f15 diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index cbb46fcceb..3c132b3f4f 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -15,7 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795)) - Make inputs / outputs / system instructions optional params to `on_completion`, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)). - - Use an crc32 checksum of the system instructions as it's upload filename, and check + - Use a SHA256 hash of the system instructions as its upload filename, and check if the file exists before re-uploading it, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814](#3814)). 
diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py index d7c6aa422c..bed27e9d5e 100644 --- a/util/opentelemetry-util-genai/tests/test_upload.py +++ b/util/opentelemetry-util-genai/tests/test_upload.py @@ -174,7 +174,7 @@ def test_lru_cache_works(self): ) # Wait a bit for file upload to finish.. time.sleep(0.5) - assert record.attributes is not None + self.assertIsNotNone(record.attributes) self.assertTrue( self.hook._file_exists( record.attributes["gen_ai.system_instructions_ref"] @@ -213,7 +213,7 @@ def test_upload_when_inputs_outputs_empty(self): 1, "should have uploaded 1 file", ) - assert record.attributes is not None + self.assertIsNotNone(record.attributes) for ref_key in [ "gen_ai.input.messages_ref", "gen_ai.output.messages_ref", @@ -379,7 +379,7 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): with fsspec.open(expected_file_name, "wb") as file: file.write(b"asg") # FIle should exist. - assert self.hook._file_exists(expected_file_name) is True + self.assertTrue(self.hook._file_exists(expected_file_name)) system_instructions = [ types.Text(content="You are a helpful assistant."), types.Text(content="You will do your best."), @@ -392,7 +392,7 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self): log_record=record, ) self.hook.shutdown() - assert record.attributes is not None + self.assertIsNotNone(record.attributes) self.assertEqual( record.attributes["gen_ai.system_instructions_ref"],