Skip to content

Commit 56992b9

Browse files
committed
Uploader class for _ref events
1 parent 4f27b28 commit 56992b9

File tree

6 files changed

+174
-33
lines changed

6 files changed

+174
-33
lines changed

instrumentation-genai/opentelemetry-instrumentation-google-genai/pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ classifiers = [
3737
"Programming Language :: Python :: 3.12"
3838
]
3939
dependencies = [
40+
"fsspec>=2025.5.1",
4041
"opentelemetry-api >=1.31.1, <2",
4142
"opentelemetry-instrumentation >=0.52b1, <2",
42-
"opentelemetry-semantic-conventions >=0.52b1, <2"
43+
"opentelemetry-semantic-conventions >=0.52b1, <2",
4344
]
4445

4546
[project.optional-dependencies]

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,18 @@
1717
import json
1818
import logging
1919
import os
20-
from re import S
2120
import time
2221
from typing import Any, AsyncIterator, Awaitable, Iterator, Optional, Union
22+
from uuid import uuid4
2323

24-
from google.genai.models import AsyncModels, Models, t as transformers
24+
from google.genai.models import AsyncModels, Models
25+
from google.genai.models import t as transformers
2526
from google.genai.types import (
2627
BlockedReason,
2728
Candidate,
2829
Content,
2930
ContentListUnion,
30-
GenerateContentConfig,
31-
ContentUnion,
3231
ContentListUnionDict,
33-
Content,
3432
ContentUnion,
3533
ContentUnionDict,
3634
GenerateContentConfig,
@@ -49,14 +47,13 @@
4947
from .custom_semconv import GCP_GENAI_OPERATION_CONFIG
5048
from .dict_util import flatten_dict
5149
from .flags import is_content_recording_enabled
52-
from .otel_wrapper import OTelWrapper
53-
from .tool_call_wrapper import wrapped as wrapped_tool
5450
from .message import (
55-
ContentUnion,
5651
to_input_messages,
5752
to_output_message,
5853
to_system_instruction,
5954
)
55+
from .otel_wrapper import OTelWrapper
56+
from .tool_call_wrapper import wrapped as wrapped_tool
6057

6158
_logger = logging.getLogger(__name__)
6259

@@ -330,7 +327,7 @@ def process_completion(
330327
self._update_finish_reasons(response)
331328
self._maybe_update_token_counts(response)
332329
self._maybe_update_error_type(response)
333-
self._maybe_log_response(response)
330+
# self._maybe_log_response(response)
334331
self._response_index += 1
335332

336333
def process_error(self, e: Exception):
@@ -441,6 +438,15 @@ def _transform_content(
441438
attributes=attributes,
442439
)
443440

441+
# Forward looking remote storage refs
442+
self._otel_wrapper.log_completion_details_refs(
443+
system_instructions=system_instruction,
444+
input_messages=input_messages,
445+
output_messages=output_message,
446+
attributes=attributes,
447+
response_id=response.response_id or str(uuid4()),
448+
)
449+
444450
def _maybe_log_system_instruction(
445451
self, config: Optional[GenerateContentConfigOrDict] = None
446452
):

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,39 +13,28 @@
1313
# limitations under the License.
1414

1515
import logging
16+
17+
from google.genai.types import (
18+
Candidate,
19+
Content,
20+
Part,
21+
)
22+
1623
from .message_models import (
1724
BlobPart,
1825
ChatMessage,
1926
Choice,
2027
FileDataPart,
2128
InputMessages,
2229
MessagePart,
23-
SystemMessage,
2430
OutputMessages,
31+
SystemMessage,
2532
TextPart,
2633
ToolCallPart,
2734
ToolCallResponsePart,
2835
UnknownPart,
2936
)
3037

31-
from google.genai.models import t as transfomers
32-
33-
from google.genai.types import (
34-
BlockedReason,
35-
Candidate,
36-
Content,
37-
ContentListUnion,
38-
ContentListUnionDict,
39-
ContentUnion,
40-
GenerateContentConfig,
41-
ContentUnionDict,
42-
GenerateContentConfig,
43-
GenerateContentConfigOrDict,
44-
GenerateContentResponse,
45-
Part,
46-
PartUnion,
47-
)
48-
4938
_logger = logging.getLogger(__name__)
5039

5140

@@ -87,7 +76,9 @@ def _to_chat_message(
8776
content: Content,
8877
) -> ChatMessage:
8978
parts = content.parts or []
90-
return ChatMessage(role="system", parts=[_to_part(part) for part in parts])
79+
return ChatMessage(
80+
role=content.role, parts=[_to_part(part) for part in parts]
81+
)
9182

9283

9384
def _to_part(part: Part) -> MessagePart:

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message_models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class UnknownPart(BaseModel):
8383

8484

8585
class ChatMessage(BaseModel):
86-
role: str
86+
role: str | None
8787
parts: List[MessagePart]
8888

8989

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/otel_wrapper.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
import google.genai
1818

1919
from opentelemetry._events import Event
20-
from opentelemetry.util.types import _ExtendedAttributes
20+
from opentelemetry.instrumentation.google_genai.uploader import (
21+
upload_to_storage,
22+
)
23+
from opentelemetry.util.types import _ExtendedAttributes, AnyValue
2124
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
2225
from opentelemetry.semconv.schemas import Schemas
2326

@@ -96,7 +99,7 @@ def log_completion_details(
9699
# input_tokens: int,
97100
# output_tokens: int,
98101
) -> None:
99-
_logger.debug("Recording user prompt.")
102+
_logger.debug("Recording completion details event.")
100103
event_name = "gen_ai.completion.details"
101104

102105
body = {
@@ -111,6 +114,44 @@ def log_completion_details(
111114
event = Event(event_name, body=body, attributes=attributes)
112115
self._event_logger.emit(event)
113116

117+
def log_completion_details_refs(
118+
self,
119+
*,
120+
attributes: _ExtendedAttributes,
121+
system_instructions: SystemMessage | None,
122+
input_messages: InputMessages,
123+
output_messages: OutputMessages,
124+
response_id: str,
125+
# request_model: str,
126+
# response_model: str,
127+
# input_tokens: int,
128+
# output_tokens: int,
129+
) -> None:
130+
_logger.debug("Recording completion details event as ref.")
131+
event_name = "gen_ai.completion.details"
132+
133+
input_messages_ref = upload_to_storage(
134+
f"{response_id}_input.json",
135+
input_messages.model_dump(mode="json"),
136+
)
137+
output_messages_ref = upload_to_storage(
138+
f"{response_id}_output.json",
139+
output_messages.model_dump(mode="json"),
140+
)
141+
body = {
142+
"gen_ai.input.messages_ref": input_messages_ref,
143+
"gen_ai.output.messages_ref": output_messages_ref,
144+
}
145+
if system_instructions:
146+
system_instructions_ref = upload_to_storage(
147+
f"{response_id}_system_instructions.json",
148+
system_instructions.model_dump(mode="json"),
149+
)
150+
body["gen_ai.system.instructions_ref"] = system_instructions_ref
151+
152+
event = Event(event_name, body=body, attributes=attributes)
153+
self._event_logger.emit(event)
154+
114155
def log_response_content(self, attributes, body):
115156
_logger.debug("Recording response.")
116157
event_name = "gen_ai.choice"
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Generic uploading module which could be part of a GenAI utils
16+
17+
Has a global uploader object using `fsspec` under the hood to work with all sorts of local and
18+
cloud storage.
19+
"""
20+
21+
from abc import ABC, abstractmethod
22+
import io
23+
import json
24+
import logging
25+
from pathlib import Path
26+
27+
from fsspec import AbstractFileSystem
28+
from fsspec.asyn import AsyncFileSystem, AbstractBufferedFile
29+
30+
from opentelemetry.trace import get_tracer
31+
from opentelemetry.util._once import Once
32+
from opentelemetry.util.types import AnyValue
33+
from google.cloud import storage
34+
from google.cloud.storage import transfer_manager
35+
36+
37+
_logger = logging.getLogger(__name__)
38+
_tracer = get_tracer(__name__)
39+
40+
41+
# TODO: try to make async or do background buffering + async
42+
43+
44+
class Uploader(ABC):
45+
@abstractmethod
46+
def upload(self, *, path: Path, value: AnyValue) -> str:
47+
pass
48+
49+
50+
class GcsUploader(Uploader):
51+
def __init__(
52+
self, *, bucket_path: str, client: storage.Client | None = None
53+
) -> None:
54+
self._client = client or storage.Client()
55+
self._bucket_path = bucket_path
56+
57+
def upload(self, *, path: Path, value: AnyValue) -> str:
58+
blob = self._client.bucket(self._bucket_path).blob(str(path))
59+
with blob.open("w") as file:
60+
json.dump(value, file)
61+
_logger.debug("GcsUploader uploaded to %s", file)
62+
return f"gs://{blob.bucket.name}/{blob.name}"
63+
64+
65+
class FsSpecUploader(Uploader):
66+
def __init__(self, *, fs: AbstractFileSystem, base_path: Path) -> None:
67+
self._fs = fs
68+
self._base_path = base_path
69+
70+
@_tracer.start_as_current_span("fsspec_upload")
71+
def upload(self, *, path: Path, value: AnyValue) -> str:
72+
with self._fs.open(self._base_path / path, "wb") as file:
73+
# Encode to utf8 bytes while writing to the file
74+
text_wrapper = io.TextIOWrapper(
75+
file, encoding="utf-8", write_through=True
76+
)
77+
json.dump(value, text_wrapper)
78+
_logger.debug("Uploaded to %s", file)
79+
return file.full_name
80+
81+
82+
_uploader: Uploader | None = None
83+
_uploader_once: Once = Once()
84+
85+
86+
def _get_uploader() -> Uploader | None:
87+
return _uploader
88+
89+
90+
def set_uploader(uploader: Uploader) -> None:
91+
def do_set():
92+
global _uploader
93+
_uploader = uploader
94+
95+
_uploader_once.do_once(do_set)
96+
97+
98+
def upload_to_storage(filename: str, value: AnyValue) -> str:
99+
"""Given an AnyValue, uploads it to storage (local filesystem, GCS, S3, etc.)"""
100+
if not (uploader := _get_uploader()):
101+
return "/dev/null"
102+
return uploader.upload(path=Path(filename), value=value)

0 commit comments

Comments
 (0)