Skip to content

Commit 6ef9bad

Browse files
committed
Stamp gen ai refs on spans and logs
1 parent 456912a commit 6ef9bad

File tree

2 files changed

+127
-10
lines changed

2 files changed

+127
-10
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
import logging
1919
import posixpath
2020
import threading
21+
from collections.abc import Mapping
2122
from concurrent.futures import Future, ThreadPoolExecutor
2223
from dataclasses import asdict, dataclass
2324
from functools import partial
2425
from os import environ
25-
from typing import Any, Callable, Literal, TextIO, cast
26+
from typing import Any, Callable, Final, Literal, TextIO, cast
2627
from uuid import uuid4
2728

2829
from opentelemetry._logs import LogRecord
30+
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
2931
from opentelemetry.trace import Span
3032
from opentelemetry.util.genai import types
3133
from opentelemetry.util.genai.environment_variables import (
@@ -39,6 +41,18 @@
3941
except ImportError:
4042
fsspec = None
4143

44+
45+
# Keys used when stamping the upload references onto spans and log records.
# Each one is the matching gen_ai semantic-convention attribute name with a
# "_ref" suffix appended.
GEN_AI_INPUT_MESSAGES_REF: Final = (
    f"{gen_ai_attributes.GEN_AI_INPUT_MESSAGES}_ref"
)
GEN_AI_OUTPUT_MESSAGES_REF: Final = (
    f"{gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES}_ref"
)
GEN_AI_SYSTEM_INSTRUCTIONS_REF: Final = (
    f"{gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS}_ref"
)
54+
55+
4256
_logger = logging.getLogger(__name__)
4357

4458

@@ -201,12 +215,43 @@ def to_dict(
201215
},
202216
)
203217

204-
# TODO: stamp the refs on telemetry
218+
# stamp the refs on telemetry
219+
references = {
220+
GEN_AI_INPUT_MESSAGES_REF: ref_names.inputs_ref,
221+
GEN_AI_OUTPUT_MESSAGES_REF: ref_names.outputs_ref,
222+
GEN_AI_SYSTEM_INSTRUCTIONS_REF: ref_names.system_instruction_ref,
223+
}
224+
if span:
225+
self._set_in_span(span, references)
226+
if log_record:
227+
self._set_in_log(log_record, references)
205228

206229
def shutdown(self) -> None:
    """Shut down the executor owned by this hook.

    Delegates to the executor's own ``shutdown`` with ``wait=True`` (its
    default), so no timeout is applied here.
    """
    # TODO: support timeout
    self._executor.shutdown(wait=True)
209232

233+
@staticmethod
def _set_in_span(span: Span, references: dict[str, str]) -> None:
    """Stamp the upload references onto ``span`` as attributes.

    Args:
        span: The span to annotate.
        references: Mapping of ``*_ref`` attribute keys to reference values;
            passed to the span in a single ``set_attributes`` call.
    """
    span.set_attributes(references)
237+
238+
@staticmethod
def _set_in_log(
    log_record: LogRecord, references: dict[str, str]
) -> None:
    """Stamp the upload references onto ``log_record``.

    The references are always merged into the record's attributes. They are
    merged into the body as well, but only when the body is unset or is
    already a mapping; any other body (for example a plain string) is left
    untouched. On a key collision the value from ``references`` wins.

    Args:
        log_record: The log record to annotate in place.
        references: Mapping of ``*_ref`` keys to reference values.
    """
    # set in both attributes and body until they are consolidated
    # https://github.com/open-telemetry/semantic-conventions/issues/1870
    merged_attributes = dict(log_record.attributes or {})
    merged_attributes.update(references)
    log_record.attributes = merged_attributes

    current_body = log_record.body
    if current_body is not None and not isinstance(current_body, Mapping):
        # A non-mapping body cannot carry extra keys; keep it as is.
        return

    merged_body = dict(current_body or {})
    merged_body.update(references)
    log_record.body = merged_body
254+
210255
def fsspec_upload_hook() -> UploadHook:
211256
base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
212257
if not base_path:

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
from unittest.mock import MagicMock, patch
2626

2727
import fsspec
28-
from fsspec.implementations.memory import MemoryFileSystem
2928

29+
from opentelemetry._logs import LogRecord
3030
from opentelemetry.test.test_base import TestBase
3131
from opentelemetry.util.genai import types
3232
from opentelemetry.util.genai._fsspec_upload import (
@@ -195,9 +195,6 @@ def test_upload(self):
195195

196196

197197
class TestFsspecUploadHookIntegration(TestBase):
198-
def setUp(self):
199-
MemoryFileSystem.store.clear()
200-
201198
def assert_fsspec_equal(self, path: str, value: str) -> None:
    """Assert that the text read from ``path`` via fsspec equals ``value``."""
    with fsspec.open(path, "r") as opened:
        contents = opened.read()
    self.assertEqual(contents, value)
@@ -207,12 +204,87 @@ def test_upload_completions(self):
207204
uploader=FsspecUploader(),
208205
base_path=BASE_PATH,
209206
)
207+
tracer = self.tracer_provider.get_tracer(__name__)
208+
log_record = LogRecord()
209+
210+
with tracer.start_as_current_span("chat mymodel") as span:
211+
hook.upload(
212+
inputs=FAKE_INPUTS,
213+
outputs=FAKE_OUTPUTS,
214+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
215+
span=span,
216+
log_record=log_record,
217+
)
218+
hook.shutdown()
219+
220+
finished_spans = self.get_finished_spans()
221+
self.assertEqual(len(finished_spans), 1)
222+
span = finished_spans[0]
223+
224+
# span attributes, log attributes, and log body have refs
225+
for attributes in [
226+
span.attributes,
227+
log_record.attributes,
228+
log_record.body,
229+
]:
230+
for ref_key in [
231+
"gen_ai.input.messages_ref",
232+
"gen_ai.output.messages_ref",
233+
"gen_ai.system_instructions_ref",
234+
]:
235+
self.assertIn(ref_key, attributes)
236+
237+
self.assert_fsspec_equal(
238+
span.attributes["gen_ai.input.messages_ref"],
239+
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
240+
)
241+
self.assert_fsspec_equal(
242+
span.attributes["gen_ai.output.messages_ref"],
243+
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]',
244+
)
245+
self.assert_fsspec_equal(
246+
span.attributes["gen_ai.system_instructions_ref"],
247+
'[{"content":"You are a helpful assistant.","type":"text"}]',
248+
)
249+
250+
@staticmethod
def upload_with_log(log_record: LogRecord):
    """Run one upload of the fake payloads against ``log_record``.

    Builds a fresh ``FsspecUploadHook`` rooted at ``BASE_PATH``, uploads the
    fake inputs, outputs and system instruction with only a log record (no
    span), and then shuts the hook down.

    Args:
        log_record: The record passed to ``upload`` as ``log_record``.
    """
    hook = FsspecUploadHook(
        uploader=FsspecUploader(),
        base_path=BASE_PATH,
    )
    try:
        hook.upload(
            inputs=FAKE_INPUTS,
            outputs=FAKE_OUTPUTS,
            system_instruction=FAKE_SYSTEM_INSTRUCTION,
            log_record=log_record,
        )
    finally:
        # Shut down even when upload() raises, so a failing test does not
        # leave the hook's executor running for the tests that follow.
        hook.shutdown()
264+
265+
def test_stamps_empty_log(self):
    """An empty log record gets the refs in both attributes and body."""
    log_record = LogRecord()
    self.upload_with_log(log_record)

    # stamp on both body and attributes
    expected_keys = (
        "gen_ai.input.messages_ref",
        "gen_ai.output.messages_ref",
        "gen_ai.system_instructions_ref",
    )
    for stamped in (log_record.attributes, log_record.body):
        for ref_key in expected_keys:
            self.assertIn(ref_key, stamped)
276+
277+
def test_stamps_log_with_map_body(self):
    """A mapping body keeps its existing keys and gains the ref keys."""
    log_record = LogRecord(body={"hello": "world"})
    self.upload_with_log(log_record)

    # stamp on both body and attributes, preserving existing
    stamped_body = log_record.body
    self.assertEqual(stamped_body["hello"], "world")
    for ref_key in (
        "gen_ai.input.messages_ref",
        "gen_ai.output.messages_ref",
        "gen_ai.system_instructions_ref",
    ):
        self.assertIn(ref_key, log_record.body)
286+
287+
def test_doesnt_stamp_log_string_body(self):
    """A string body is left unchanged by the upload."""
    original_body = "hello world"
    log_record = LogRecord(body=original_body)
    self.upload_with_log(log_record)
    self.assertEqual(log_record.body, original_body)

0 commit comments

Comments
 (0)