Skip to content

Commit 3eb7ab0

Browse files
committed
Merge branch 'vertexai_semconv_v2' of github.com:DylanRussell/opentelemetry-python-contrib into vertexai_semconv_v2
2 parents ccd2b9f + 49c9b80 commit 3eb7ab0

File tree

3 files changed

+126
-13
lines changed

3 files changed

+126
-13
lines changed

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
- Add upload hook to genai utils to implement semconv v1.37.
11+
12+
The hook uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to support
13+
various pluggable backends.
14+
([#3752](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752))
15+
([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752))
16+
([#3763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3763))
1017
- Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable.
1118
Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)).

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,32 @@
1919
import logging
2020
import posixpath
2121
import threading
22+
from base64 import b64encode
2223
from concurrent.futures import Future, ThreadPoolExecutor
2324
from dataclasses import asdict, dataclass
2425
from functools import partial
25-
from typing import Any, Callable, Literal, TextIO, cast
26+
from typing import Any, Callable, Final, Literal, TextIO, cast
2627
from uuid import uuid4
2728

2829
import fsspec
2930

3031
from opentelemetry._logs import LogRecord
32+
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
3133
from opentelemetry.trace import Span
3234
from opentelemetry.util.genai import types
3335
from opentelemetry.util.genai.upload_hook import UploadHook
3436

37+
GEN_AI_INPUT_MESSAGES_REF: Final = (
38+
gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref"
39+
)
40+
GEN_AI_OUTPUT_MESSAGES_REF: Final = (
41+
gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES + "_ref"
42+
)
43+
GEN_AI_SYSTEM_INSTRUCTIONS_REF: Final = (
44+
gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref"
45+
)
46+
47+
3548
_logger = logging.getLogger(__name__)
3649

3750

@@ -139,7 +152,12 @@ def _do_upload(
139152
path: str, json_encodeable: Callable[[], JsonEncodeable]
140153
) -> None:
141154
with fsspec_open(path, "w") as file:
142-
json.dump(json_encodeable(), file, separators=(",", ":"))
155+
json.dump(
156+
json_encodeable(),
157+
file,
158+
separators=(",", ":"),
159+
cls=Base64JsonEncoder,
160+
)
143161

144162
def upload(
145163
self,
@@ -177,8 +195,27 @@ def to_dict(
177195
},
178196
)
179197

180-
# TODO: stamp the refs on telemetry
198+
# stamp the refs on telemetry
199+
references = {
200+
GEN_AI_INPUT_MESSAGES_REF: ref_names.inputs_ref,
201+
GEN_AI_OUTPUT_MESSAGES_REF: ref_names.outputs_ref,
202+
GEN_AI_SYSTEM_INSTRUCTIONS_REF: ref_names.system_instruction_ref,
203+
}
204+
if span:
205+
span.set_attributes(references)
206+
if log_record:
207+
log_record.attributes = {
208+
**(log_record.attributes or {}),
209+
**references,
210+
}
181211

182212
def shutdown(self) -> None:
183213
# TODO: support timeout
184214
self._executor.shutdown()
215+
216+
217+
class Base64JsonEncoder(json.JSONEncoder):
218+
def default(self, o: Any) -> Any:
219+
if isinstance(o, bytes):
220+
return b64encode(o).decode()
221+
return super().default(o)

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515

1616
# pylint: disable=import-outside-toplevel,no-name-in-module
17-
1817
import importlib
1918
import logging
2019
import sys
@@ -25,8 +24,8 @@
2524
from unittest.mock import MagicMock, patch
2625

2726
import fsspec
28-
from fsspec.implementations.memory import MemoryFileSystem
2927

28+
from opentelemetry._logs import LogRecord
3029
from opentelemetry.test.test_base import TestBase
3130
from opentelemetry.util.genai import types
3231
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
@@ -201,23 +200,93 @@ def test_upload(self):
201200

202201
class TestFsspecUploadHookIntegration(TestBase):
203202
def setUp(self):
204-
MemoryFileSystem.store.clear()
203+
super().setUp()
204+
self.hook = FsspecUploadHook(base_path=BASE_PATH)
205+
206+
def tearDown(self):
207+
super().tearDown()
208+
self.hook.shutdown()
205209

206210
def assert_fsspec_equal(self, path: str, value: str) -> None:
207211
with fsspec.open(path, "r") as file:
208212
self.assertEqual(file.read(), value)
209213

210214
def test_upload_completions(self):
211-
hook = FsspecUploadHook(
212-
base_path=BASE_PATH,
215+
tracer = self.tracer_provider.get_tracer(__name__)
216+
log_record = LogRecord()
217+
218+
with tracer.start_as_current_span("chat mymodel") as span:
219+
self.hook.upload(
220+
inputs=FAKE_INPUTS,
221+
outputs=FAKE_OUTPUTS,
222+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
223+
span=span,
224+
log_record=log_record,
225+
)
226+
self.hook.shutdown()
227+
228+
finished_spans = self.get_finished_spans()
229+
self.assertEqual(len(finished_spans), 1)
230+
span = finished_spans[0]
231+
232+
# span attributes, log attributes, and log body have refs
233+
for attributes in [
234+
span.attributes,
235+
log_record.attributes,
236+
]:
237+
for ref_key in [
238+
"gen_ai.input.messages_ref",
239+
"gen_ai.output.messages_ref",
240+
"gen_ai.system_instructions_ref",
241+
]:
242+
self.assertIn(ref_key, attributes)
243+
244+
self.assert_fsspec_equal(
245+
span.attributes["gen_ai.input.messages_ref"],
246+
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
213247
)
214-
hook.upload(
248+
self.assert_fsspec_equal(
249+
span.attributes["gen_ai.output.messages_ref"],
250+
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]',
251+
)
252+
self.assert_fsspec_equal(
253+
span.attributes["gen_ai.system_instructions_ref"],
254+
'[{"content":"You are a helpful assistant.","type":"text"}]',
255+
)
256+
257+
def test_stamps_empty_log(self):
258+
log_record = LogRecord()
259+
self.hook.upload(
215260
inputs=FAKE_INPUTS,
216261
outputs=FAKE_OUTPUTS,
217262
system_instruction=FAKE_SYSTEM_INSTRUCTION,
263+
log_record=log_record,
264+
)
265+
266+
# stamp on both body and attributes
267+
self.assertIn("gen_ai.input.messages_ref", log_record.attributes)
268+
self.assertIn("gen_ai.output.messages_ref", log_record.attributes)
269+
self.assertIn("gen_ai.system_instructions_ref", log_record.attributes)
270+
271+
def test_upload_bytes(self) -> None:
272+
log_record = LogRecord()
273+
self.hook.upload(
274+
inputs=[
275+
types.InputMessage(
276+
role="user",
277+
parts=[
278+
types.Text(content="What is the capital of France?"),
279+
{"type": "generic_bytes", "bytes": b"hello"},
280+
],
281+
)
282+
],
283+
outputs=FAKE_OUTPUTS,
284+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
285+
log_record=log_record,
218286
)
219-
hook.shutdown()
287+
self.hook.shutdown()
220288

221-
fs = fsspec.open(BASE_PATH).fs
222-
self.assertEqual(len(fs.ls(BASE_PATH)), 3)
223-
# TODO: test stamped telemetry
289+
self.assert_fsspec_equal(
290+
log_record.attributes["gen_ai.input.messages_ref"],
291+
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]',
292+
)

0 commit comments

Comments
 (0)