diff --git a/opentelemetry-operations-python b/opentelemetry-operations-python
new file mode 160000
index 0000000000..6358cf5626
--- /dev/null
+++ b/opentelemetry-operations-python
@@ -0,0 +1 @@
+Subproject commit 6358cf56263a875224c3db7fee79b40144866f15
diff --git a/typings/fsspec/__init__.pyi b/typings/fsspec/__init__.pyi
index 008a62a4ed..e02c5f278a 100644
--- a/typings/fsspec/__init__.pyi
+++ b/typings/fsspec/__init__.pyi
@@ -27,5 +27,6 @@ class AbstractFileSystem(RealAbstractFileSystem):
     def open(
         self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any
     ) -> io.TextIOWrapper: ...
+    def exists(self, path: str) -> bool: ...
 
 def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ...
diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md
index 4b4795bcd7..3c132b3f4f 100644
--- a/util/opentelemetry-util-genai/CHANGELOG.md
+++ b/util/opentelemetry-util-genai/CHANGELOG.md
@@ -15,8 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795))
 - Make inputs / outputs / system instructions optional params to `on_completion`,
   ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)).
-- `opentelemetry-instrumentation-google-genai`: migrate off the deprecated events API to use the logs API
-  ([#3625](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3624))
+- Use a SHA256 hash of the system instructions as its upload filename, and check
+  if the file exists before re-uploading it, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814](#3814)).
 
 ## Version 0.1b0 (2025-09-25)
 
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py
index 86cb4f0c51..42c62cd789 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py
@@ -15,9 +15,11 @@
 from __future__ import annotations
 
+import hashlib
 import logging
 import posixpath
 import threading
+from collections import OrderedDict
 from concurrent.futures import Future, ThreadPoolExecutor
 from contextlib import ExitStack
 from dataclasses import asdict, dataclass
@@ -73,8 +75,16 @@ class CompletionRefs:
 
 JsonEncodeable = list[dict[str, Any]]
 
-# mapping of upload path to function computing upload data dict
-UploadData = dict[str, Callable[[], JsonEncodeable]]
+# mapping of (upload path, whether the contents were hashed into the filename) to a function computing the upload data dict
+UploadData = dict[tuple[str, bool], Callable[[], JsonEncodeable]]
+
+
+def is_system_instructions_hashable(
+    system_instruction: list[types.MessagePart] | None,
+) -> bool:
+    return bool(system_instruction) and all(
+        isinstance(x, types.Text) for x in system_instruction
+    )
 
 
 class UploadCompletionHook(CompletionHook):
@@ -97,10 +107,13 @@ def __init__(
         base_path: str,
         max_size: int = 20,
         upload_format: Format | None = None,
+        lru_cache_max_size: int = 1024,
     ) -> None:
         self._max_size = max_size
         self._fs, base_path = fsspec.url_to_fs(base_path)
         self._base_path = self._fs.unstrip_protocol(base_path)
+        self.lru_dict: OrderedDict[str, bool] = OrderedDict()
+        self.lru_cache_max_size = lru_cache_max_size
 
         if upload_format not in _FORMATS + (None,):
             raise ValueError(
@@ -132,7 +145,10 @@ def done(future: Future[None]) -> None:
             finally:
                 self._semaphore.release()
 
-        for path, json_encodeable in upload_data.items():
+        for (
+            path,
+            contents_hashed_to_filename,
+        ), json_encodeable in upload_data.items():
            # could not acquire, drop data
             if not self._semaphore.acquire(blocking=False):  # pylint: disable=consider-using-with
                 _logger.warning(
@@ -143,7 +159,10 @@ def done(future: Future[None]) -> None:
 
             try:
                 fut = self._executor.submit(
-                    self._do_upload, path, json_encodeable
+                    self._do_upload,
+                    path,
+                    contents_hashed_to_filename,
+                    json_encodeable,
                 )
                 fut.add_done_callback(done)
             except RuntimeError:
@@ -152,10 +171,20 @@ def done(future: Future[None]) -> None:
                 )
                 self._semaphore.release()
 
-    def _calculate_ref_path(self) -> CompletionRefs:
+    def _calculate_ref_path(
+        self, system_instruction: list[types.MessagePart]
+    ) -> CompletionRefs:
         # TODO: experimental with using the trace_id and span_id, or fetching
         # gen_ai.response.id from the active span.
-
+        system_instruction_hash = None
+        if is_system_instructions_hashable(system_instruction):
+            # Get a hash of the text.
+            system_instruction_hash = hashlib.sha256(
+                "\n".join(x.content for x in system_instruction).encode(  # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType]
+                    "utf-8"
+                ),
+                usedforsecurity=False,
+            ).hexdigest()
         uuid_str = str(uuid4())
         return CompletionRefs(
             inputs_ref=posixpath.join(
@@ -166,13 +195,32 @@ def _calculate_ref_path(self) -> CompletionRefs:
             ),
             system_instruction_ref=posixpath.join(
                 self._base_path,
-                f"{uuid_str}_system_instruction.{self._format}",
+                f"{system_instruction_hash or uuid_str}_system_instruction.{self._format}",
             ),
         )
 
+    def _file_exists(self, path: str) -> bool:
+        if path in self.lru_dict:
+            self.lru_dict.move_to_end(path)
+            return True
+        # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists
+        file_exists = self._fs.exists(path)
+        # don't cache a negative result; the file is about to be uploaded
+        if not file_exists:
+            return False
+        self.lru_dict[path] = True
+        if len(self.lru_dict) > self.lru_cache_max_size:
+            self.lru_dict.popitem(last=False)
+        return True
+
     def _do_upload(
-        self, path: str, json_encodeable: Callable[[], JsonEncodeable]
+        self,
+        path: str,
+        contents_hashed_to_filename: bool,
+        json_encodeable: Callable[[], JsonEncodeable],
     ) -> None:
+        if contents_hashed_to_filename and self._file_exists(path):
+            return
         if self._format == "json":
             # output as a single line with the json messages array
             message_lines = [json_encodeable()]
@@ -194,6 +242,11 @@
                 gen_ai_json_dump(message, file)
                 file.write("\n")
 
+        if contents_hashed_to_filename:
+            self.lru_dict[path] = True
+            if len(self.lru_dict) > self.lru_cache_max_size:
+                self.lru_dict.popitem(last=False)
+
     def on_completion(
         self,
         *,
@@ -213,7 +266,7 @@ def on_completion(
             system_instruction=system_instruction or None,
         )
         # generate the paths to upload to
-        ref_names = self._calculate_ref_path()
+        ref_names = self._calculate_ref_path(system_instruction)
 
         def to_dict(
             dataclass_list: list[types.InputMessage]
@@ -223,35 +276,40 @@ def to_dict(
             return [asdict(dc) for dc in dataclass_list]
 
         references = [
-            (ref_name, ref, ref_attr)
-            for ref_name, ref, ref_attr in [
+            (ref_name, ref, ref_attr, contents_hashed_to_filename)
+            for ref_name, ref, ref_attr, contents_hashed_to_filename in [
                 (
                     ref_names.inputs_ref,
                     completion.inputs,
                     GEN_AI_INPUT_MESSAGES_REF,
+                    False,
                 ),
                 (
                     ref_names.outputs_ref,
                     completion.outputs,
                     GEN_AI_OUTPUT_MESSAGES_REF,
+                    False,
                 ),
                 (
                     ref_names.system_instruction_ref,
                     completion.system_instruction,
                     GEN_AI_SYSTEM_INSTRUCTIONS_REF,
+                    is_system_instructions_hashable(
+                        completion.system_instruction
+                    ),
                 ),
             ]
-            if ref
+            if ref  # Filter out empty input/output/system instruction
         ]
 
         self._submit_all(
             {
-                ref_name: partial(to_dict, ref)
-                for ref_name, ref, _ in references
+                (ref_name, contents_hashed_to_filename): partial(to_dict, ref)
+                for ref_name, ref, _, contents_hashed_to_filename in references
             }
         )
         # stamp the refs on telemetry
-        references = {ref_attr: name for name, _, ref_attr in references}
+        references = {ref_attr: name for name, _, ref_attr, _ in references}
         if span:
             span.set_attributes(references)
         if log_record:
diff --git a/util/opentelemetry-util-genai/tests/test_upload.py b/util/opentelemetry-util-genai/tests/test_upload.py
index 028ab8f1e2..bed27e9d5e 100644
--- a/util/opentelemetry-util-genai/tests/test_upload.py
+++ b/util/opentelemetry-util-genai/tests/test_upload.py
@@ -121,10 +121,10 @@ def setUp(self):
         mock_fsspec = self._fsspec_patcher.start()
         self.mock_fs = ThreadSafeMagicMock()
         mock_fsspec.url_to_fs.return_value = self.mock_fs, ""
+        self.mock_fs.exists.return_value = False
 
         self.hook = UploadCompletionHook(
-            base_path=BASE_PATH,
-            max_size=MAXSIZE,
+            base_path=BASE_PATH, max_size=MAXSIZE, lru_cache_max_size=5
         )
 
     def tearDown(self) -> None:
@@ -157,13 +157,46 @@ def test_upload_then_shutdown(self):
         # all items should be consumed
         self.hook.shutdown()
         # TODO: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3812 fix flaky test that requires sleep.
-        time.sleep(2)
+        time.sleep(0.5)
         self.assertEqual(
             self.mock_fs.open.call_count,
             3,
             "should have uploaded 3 files",
         )
 
+    def test_lru_cache_works(self):
+        record = LogRecord()
+        self.hook.on_completion(
+            inputs=[],
+            outputs=[],
+            system_instruction=FAKE_SYSTEM_INSTRUCTION,
+            log_record=record,
+        )
+        # Wait a bit for the file upload to finish.
+        time.sleep(0.5)
+        self.assertIsNotNone(record.attributes)
+        self.assertTrue(
+            self.hook._file_exists(
+                record.attributes["gen_ai.system_instructions_ref"]
+            )
+        )
+        # The LRU cache has a size of 5, so only after 5 more uploads should the original file be evicted from the cache.
+        for iteration in range(5):
+            self.assertTrue(
+                record.attributes["gen_ai.system_instructions_ref"]
+                in self.hook.lru_dict
+            )
+            self.hook.on_completion(
+                inputs=[],
+                outputs=[],
+                system_instruction=[types.Text(content=str(iteration))],
+            )
+        self.hook.shutdown()
+        self.assertFalse(
+            record.attributes["gen_ai.system_instructions_ref"]
+            in self.hook.lru_dict
+        )
+
     def test_upload_when_inputs_outputs_empty(self):
         record = LogRecord()
         self.hook.on_completion(
@@ -180,7 +213,7 @@
             1,
             "should have uploaded 1 file",
         )
-        assert record.attributes is not None
+        self.assertIsNotNone(record.attributes)
         for ref_key in [
             "gen_ai.input.messages_ref",
             "gen_ai.output.messages_ref",
@@ -335,6 +368,39 @@ def assert_fsspec_equal(self, path: str, value: str) -> None:
         with fsspec.open(path, "r") as file:
             self.assertEqual(file.read(), value)
 
+    def test_system_instruction_is_hashed_to_avoid_reupload(self):
+        expected_hash = (
+            "7e35acac4feca03ab47929d4cc6cfef1df2190ae1ee1752196a05ffc2a6cb360"
+        )
+        # Create the file before the upload.
+        expected_file_name = (
+            f"memory://{expected_hash}_system_instruction.json"
+        )
+        with fsspec.open(expected_file_name, "wb") as file:
+            file.write(b"asg")
+        # File should exist.
+        self.assertTrue(self.hook._file_exists(expected_file_name))
+        system_instructions = [
+            types.Text(content="You are a helpful assistant."),
+            types.Text(content="You will do your best."),
+        ]
+        record = LogRecord()
+        self.hook.on_completion(
+            inputs=[],
+            outputs=[],
+            system_instruction=system_instructions,
+            log_record=record,
+        )
+        self.hook.shutdown()
+        self.assertIsNotNone(record.attributes)
+
+        self.assertEqual(
+            record.attributes["gen_ai.system_instructions_ref"],
+            expected_file_name,
+        )
+        # Content should not have been overwritten.
+        self.assert_fsspec_equal(expected_file_name, "asg")
+
     def test_upload_completions(self):
         tracer = self.tracer_provider.get_tracer(__name__)
         log_record = LogRecord()
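
For readers skimming the patch, here is a small, self-contained sketch (not part of the diff) of the naming scheme the changelog entry and the _calculate_ref_path change describe: the system-instruction text is hashed with SHA-256, the digest becomes the upload filename, and an existence check lets identical content skip re-upload. The system_instruction_ref helper, the memory:// base path, and the JSON payload below are illustrative assumptions, not APIs from this package.

import hashlib
import posixpath

import fsspec


def system_instruction_ref(base_path: str, parts: list[str]) -> str:
    # Hash the joined text parts: identical instructions always map to the
    # same filename, so a later existence check can skip re-uploading them.
    digest = hashlib.sha256(
        "\n".join(parts).encode("utf-8"), usedforsecurity=False
    ).hexdigest()
    return posixpath.join(base_path, f"{digest}_system_instruction.json")


# Usage sketch against an in-memory filesystem (assumed base path).
fs, _ = fsspec.url_to_fs("memory://")
path = system_instruction_ref("memory://", ["You are a helpful assistant."])
if not fs.exists(path):  # upload only when the hashed name is not already present
    with fsspec.open(path, "w") as file:
        file.write('[{"content": "You are a helpful assistant."}]')

The real hook additionally caches known-existing paths in an OrderedDict used as an LRU, so repeated completions with the same system instructions avoid both the re-upload and the extra exists() call.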