Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Migrate off the deprecated events API to use the logs API
  ([#3625](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3625))

## Version 0.3b0 (2025-07-08)

- Add automatic instrumentation to tool call functions ([#3446](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3446))
Expand Down
1 change: 1 addition & 0 deletions typings/fsspec/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ class AbstractFileSystem(RealAbstractFileSystem):
def open(
self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any
) -> io.TextIOWrapper: ...
def exists(self, path: str) -> bool: ...

def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ...
4 changes: 2 additions & 2 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
  ([#3795](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795))
- Make inputs / outputs / system instructions optional params to `on_completion`,
  ([#3802](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802)).
- `opentelemetry-instrumentation-google-genai`: migrate off the deprecated events API to use the logs API
  ([#3625](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3625))
- Use a SHA-256 checksum of the system instructions as its upload filename, and check
  if the file exists before re-uploading it, ([#3814](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814)).


## Version 0.1b0 (2025-09-25)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

from __future__ import annotations

import hashlib
import logging
import posixpath
import threading
from collections import OrderedDict
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import ExitStack
from dataclasses import asdict, dataclass
Expand Down Expand Up @@ -73,8 +75,16 @@ class CompletionRefs:

JsonEncodeable = list[dict[str, Any]]

# mapping of upload path to function computing upload data dict
UploadData = dict[str, Callable[[], JsonEncodeable]]
# maps (upload path, whether the filename is a hash of the contents) to a function computing the upload data dict
UploadData = dict[tuple[str, bool], Callable[[], JsonEncodeable]]


def is_system_instructions_hashable(
    system_instruction: list[types.MessagePart] | None,
) -> bool:
    """Return True if the system instructions can be content-hashed.

    Only a non-empty list made up exclusively of ``types.Text`` parts has
    joinable text content suitable for deriving a stable upload filename.
    """
    if not system_instruction:
        return False
    return all(
        isinstance(part, types.Text) for part in system_instruction
    )


class UploadCompletionHook(CompletionHook):
Expand All @@ -97,10 +107,13 @@ def __init__(
base_path: str,
max_size: int = 20,
upload_format: Format | None = None,
lru_cache_max_size: int = 1024,
) -> None:
self._max_size = max_size
self._fs, base_path = fsspec.url_to_fs(base_path)
self._base_path = self._fs.unstrip_protocol(base_path)
self.lru_dict: OrderedDict[str, bool] = OrderedDict()
self.lru_cache_max_size = lru_cache_max_size

if upload_format not in _FORMATS + (None,):
raise ValueError(
Expand Down Expand Up @@ -132,7 +145,10 @@ def done(future: Future[None]) -> None:
finally:
self._semaphore.release()

for path, json_encodeable in upload_data.items():
for (
path,
contents_hashed_to_filename,
), json_encodeable in upload_data.items():
# could not acquire, drop data
if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with
_logger.warning(
Expand All @@ -143,7 +159,10 @@ def done(future: Future[None]) -> None:

try:
fut = self._executor.submit(
self._do_upload, path, json_encodeable
self._do_upload,
path,
contents_hashed_to_filename,
json_encodeable,
)
fut.add_done_callback(done)
except RuntimeError:
Expand All @@ -152,10 +171,20 @@ def done(future: Future[None]) -> None:
)
self._semaphore.release()

def _calculate_ref_path(self) -> CompletionRefs:
def _calculate_ref_path(
self, system_instruction: list[types.MessagePart]
) -> CompletionRefs:
# TODO: experimental with using the trace_id and span_id, or fetching
# gen_ai.response.id from the active span.

system_instruction_hash = None
if is_system_instructions_hashable(system_instruction):
# Get a hash of the text.
system_instruction_hash = hashlib.sha256(
"\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType]
"utf-8"
),
usedforsecurity=False,
).hexdigest()
uuid_str = str(uuid4())
return CompletionRefs(
inputs_ref=posixpath.join(
Expand All @@ -166,13 +195,32 @@ def _calculate_ref_path(self) -> CompletionRefs:
),
system_instruction_ref=posixpath.join(
self._base_path,
f"{uuid_str}_system_instruction.{self._format}",
f"{system_instruction_hash or uuid_str}_system_instruction.{self._format}",
),
)

def _file_exists(self, path: str) -> bool:
if path in self.lru_dict:
self.lru_dict.move_to_end(path)
return True
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists
file_exists = self._fs.exists(path)
# don't cache this because soon the file will exist..
if not file_exists:
return False
self.lru_dict[path] = True
if len(self.lru_dict) > self.lru_cache_max_size:
self.lru_dict.popitem(last=False)
return True

def _do_upload(
self, path: str, json_encodeable: Callable[[], JsonEncodeable]
self,
path: str,
contents_hashed_to_filename: bool,
json_encodeable: Callable[[], JsonEncodeable],
) -> None:
if contents_hashed_to_filename and self._file_exists(path):
return
if self._format == "json":
# output as a single line with the json messages array
message_lines = [json_encodeable()]
Expand All @@ -194,6 +242,11 @@ def _do_upload(
gen_ai_json_dump(message, file)
file.write("\n")

if contents_hashed_to_filename:
self.lru_dict[path] = True
if len(self.lru_dict) > self.lru_cache_max_size:
self.lru_dict.popitem(last=False)

def on_completion(
self,
*,
Expand All @@ -213,7 +266,7 @@ def on_completion(
system_instruction=system_instruction or None,
)
# generate the paths to upload to
ref_names = self._calculate_ref_path()
ref_names = self._calculate_ref_path(system_instruction)

def to_dict(
dataclass_list: list[types.InputMessage]
Expand All @@ -223,35 +276,40 @@ def to_dict(
return [asdict(dc) for dc in dataclass_list]

references = [
(ref_name, ref, ref_attr)
for ref_name, ref, ref_attr in [
(ref_name, ref, ref_attr, contents_hashed_to_filename)
for ref_name, ref, ref_attr, contents_hashed_to_filename in [
(
ref_names.inputs_ref,
completion.inputs,
GEN_AI_INPUT_MESSAGES_REF,
False,
),
(
ref_names.outputs_ref,
completion.outputs,
GEN_AI_OUTPUT_MESSAGES_REF,
False,
),
(
ref_names.system_instruction_ref,
completion.system_instruction,
GEN_AI_SYSTEM_INSTRUCTIONS_REF,
is_system_instructions_hashable(
completion.system_instruction
),
),
]
if ref
if ref # Filter out empty input/output/sys instruction
]
self._submit_all(
{
ref_name: partial(to_dict, ref)
for ref_name, ref, _ in references
(ref_name, contents_hashed_to_filename): partial(to_dict, ref)
for ref_name, ref, _, contents_hashed_to_filename in references
}
)

# stamp the refs on telemetry
references = {ref_attr: name for name, _, ref_attr in references}
references = {ref_attr: name for name, _, ref_attr, _ in references}
if span:
span.set_attributes(references)
if log_record:
Expand Down
82 changes: 79 additions & 3 deletions util/opentelemetry-util-genai/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


# pylint: disable=import-outside-toplevel,no-name-in-module
import hashlib
import importlib
import logging
import sys
Expand Down Expand Up @@ -121,13 +122,14 @@ def setUp(self):
mock_fsspec = self._fsspec_patcher.start()
self.mock_fs = ThreadSafeMagicMock()
mock_fsspec.url_to_fs.return_value = self.mock_fs, ""
self.mock_fs.exists.return_value = False

self.hook = UploadCompletionHook(
base_path=BASE_PATH,
max_size=MAXSIZE,
base_path=BASE_PATH, max_size=MAXSIZE, lru_cache_max_size=5
)

    def tearDown(self) -> None:
        # Drop recorded mock calls, flush/stop the hook's worker threads,
        # then remove the fsspec patch installed in setUp.
        # NOTE(review): reset_mock() runs *before* shutdown(), so any
        # filesystem calls made while shutdown flushes pending uploads are
        # still recorded; confirm that ordering is intentional (mock_fs is
        # also recreated in setUp, so the reset may be redundant).
        self.mock_fs.reset_mock()
        self.hook.shutdown()
        self._fsspec_patcher.stop()

Expand Down Expand Up @@ -157,13 +159,87 @@ def test_upload_then_shutdown(self):
# all items should be consumed
self.hook.shutdown()
# TODO: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3812 fix flaky test that requires sleep.
time.sleep(2)
time.sleep(0.5)
self.assertEqual(
self.mock_fs.open.call_count,
3,
"should have uploaded 3 files",
)

def test_system_insruction_is_hashed_to_avoid_reupload(self):
system_instructions = [
types.Text(content="You are a helpful assistant."),
types.Text(content="You will do your best."),
]
expected_hash = hashlib.sha256(
"\n".join(text.content for text in system_instructions).encode(
"utf-8"
),
usedforsecurity=False,
).hexdigest()
record = LogRecord()
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=system_instructions,
log_record=record,
)
# Wait a bit for file upload to finish..
time.sleep(0.5)
self.mock_fs.exists.return_value = True
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=system_instructions,
log_record=record,
)
# all items should be consumed
self.hook.shutdown()

self.assertEqual(
self.mock_fs.open.call_count,
1,
"should have uploaded 1 file",
)
assert record.attributes is not None
self.assertEqual(
record.attributes["gen_ai.system_instructions_ref"].split("/")[-1],
f"{expected_hash}_system_instruction.json",
)

def test_lru_cache_works(self):
record = LogRecord()
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=FAKE_SYSTEM_INSTRUCTION,
log_record=record,
)
# Wait a bit for file upload to finish..
time.sleep(0.5)
assert record.attributes is not None
self.assertTrue(
self.hook._file_exists(
record.attributes["gen_ai.system_instructions_ref"]
)
)
# LRU cache has a size of 5. So only AFTER 5 uploads should the original file be removed from the cache.
for iteration in range(5):
self.assertTrue(
record.attributes["gen_ai.system_instructions_ref"]
in self.hook.lru_dict
)
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=[types.Text(content=str(iteration))],
)
self.hook.shutdown()
self.assertFalse(
record.attributes["gen_ai.system_instructions_ref"]
in self.hook.lru_dict
)

def test_upload_when_inputs_outputs_empty(self):
record = LogRecord()
self.hook.on_completion(
Expand Down
Loading