Skip to content

Commit d29cb64

Browse files
committed
Merge branch 'main' into jacksonweber/populate-synthetic-attributes
2 parents 401d59d + 7819be1 commit d29cb64

File tree

11 files changed

+503
-65
lines changed

11 files changed

+503
-65
lines changed

.github/component_owners.yml

Lines changed: 20 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,23 @@
11
components:
22

3-
docs/instrumentation:
4-
- nemoshlag
5-
6-
7-
instrumentation/opentelemetry-instrumentation-aio-pika:
8-
- ofek1weiss
3+
instrumentation/opentelemetry-instrumentation-aiokafka:
4+
- dimastbk
95

10-
instrumentation/opentelemetry-instrumentation-boto3sqs:
11-
- oxeye-nikolay
12-
- nikosokolik
13-
146
instrumentation/opentelemetry-instrumentation-asyncclick:
157
- jomcgi
168

17-
instrumentation/opentelemetry-instrumentation-kafka-python:
18-
- nozik
19-
20-
instrumentation/opentelemetry-instrumentation-pika:
21-
- oxeye-nikolay
22-
- nikosokolik
23-
24-
instrumentation/opentelemetry-instrumentation-redis:
25-
- sungwonh
26-
27-
instrumentation/opentelemetry-instrumentation-remoulade:
28-
- ben-natan
29-
- machine424
30-
31-
instrumentation/opentelemetry-instrumentation-confluent-kafka:
32-
- oxeye-dorkolog
33-
- dorkolog
34-
35-
propagator/opentelemetry-propagator-aws-xray:
36-
- jj22ee
37-
38-
sdk-extension/opentelemetry-sdk-extension-aws:
39-
- srprash
40-
- jj22ee
41-
42-
instrumentation/opentelemetry-instrumentation-tortoiseorm:
43-
- tonybaloney
9+
instrumentation/opentelemetry-instrumentation-asyncio:
10+
- bourbonkk
4411

45-
instrumentation/opentelemetry-instrumentation-tornado:
46-
- shalevr
12+
instrumentation/opentelemetry-instrumentation-pymssql:
13+
- guillaumep
4714

4815
instrumentation/opentelemetry-instrumentation-urllib:
4916
- shalevr
5017

5118
instrumentation/opentelemetry-instrumentation-urllib3:
5219
- shalevr
5320

54-
instrumentation/opentelemetry-instrumentation-sqlalchemy:
55-
- shalevr
56-
57-
instrumentation/opentelemetry-instrumentation-cassandra:
58-
- mattcontinisio
59-
60-
instrumentation/opentelemetry-instrumentation-asyncio:
61-
- bourbonkk
62-
63-
instrumentation/opentelemetry-instrumentation-psycopg:
64-
- federicobond
65-
66-
instrumentation/opentelemetry-instrumentation-pymssql:
67-
- guillaumep
68-
69-
instrumentation/opentelemetry-instrumentation-aiokafka:
70-
- dimastbk
71-
72-
processor/opentelemetry-processor-baggage:
73-
- codeboten
74-
7521
instrumentation-genai/:
7622
- karthikscale3
7723
- lmolkova
@@ -80,3 +26,17 @@ components:
8026
- nirga
8127
- alizenhom
8228
- codefromthecrypt
29+
30+
processor/opentelemetry-processor-baggage:
31+
- codeboten
32+
33+
propagator/opentelemetry-propagator-aws-xray:
34+
- jj22ee
35+
36+
sdk-extension/opentelemetry-sdk-extension-aws:
37+
- srprash
38+
- jj22ee
39+
40+
util/opentelemetry-util-genai:
41+
- DylanRussell
42+
- keith-decker

docs-requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ pymemcache~=1.3
99
# Required by conf
1010
django>=2.2
1111

12+
# Required by opentelemetry-util-genai
13+
fsspec>=2025.9.0
14+
1215
# Required by instrumentation and exporter packages
1316
aio_pika~=7.2.0
1417
aiohttp~=3.0

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
None,
131131
),
132132
"redis": ("https://redis.readthedocs.io/en/latest/", None),
133+
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
133134
}
134135

135136
# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky

docs/instrumentation-genai/util.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
2525
:members:
2626
:undoc-members:
2727
:show-inheritance:
28+
29+
.. automodule:: opentelemetry.util.genai._fsspec_upload
30+
:members:
31+
:show-inheritance:

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,7 @@ deps =
10601060
{[testenv]test_deps}
10611061
{toxinidir}/opentelemetry-instrumentation
10621062
{toxinidir}/util/opentelemetry-util-http
1063-
{toxinidir}/util/opentelemetry-util-genai
1063+
{toxinidir}/util/opentelemetry-util-genai[fsspec]
10641064
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
10651065
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
10661066
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]

util/opentelemetry-util-genai/pyproject.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ dependencies = [
3030
"opentelemetry-api>=1.31.0",
3131
]
3232

33+
[project.entry-points.opentelemetry_genai_upload_hook]
34+
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"
35+
3336
[project.optional-dependencies]
34-
test = [
35-
"pytest>=7.0.0",
36-
]
37+
test = ["pytest>=7.0.0"]
38+
fsspec = ["fsspec>=2025.9.0"]
3739

3840
[project.urls]
3941
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from os import environ
18+
19+
from opentelemetry.util.genai.environment_variables import (
20+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
21+
)
22+
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook
23+
24+
25+
def fsspec_upload_hook() -> UploadHook:
    """Create the upload hook registered under the ``fsspec`` entry point.

    Returns a ``FsspecUploadHook`` rooted at the path found in the
    ``OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH`` environment variable.
    A ``_NoOpUploadHook`` is returned instead when the fsspec-backed
    implementation cannot be imported, or when the environment variable is
    unset or empty.
    """
    # The implementation module is imported lazily so that a missing optional
    # dependency (fsspec) degrades to a no-op instead of failing at import.
    try:
        # pylint: disable=import-outside-toplevel
        from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
            FsspecUploadHook,
        )
    except ImportError:
        return _NoOpUploadHook()

    configured_base = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
    if configured_base:
        return FsspecUploadHook(base_path=configured_base)

    # Nothing to upload to without a base path.
    return _NoOpUploadHook()
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from __future__ import annotations
17+
18+
import json
19+
import logging
20+
import posixpath
21+
import threading
22+
from concurrent.futures import Future, ThreadPoolExecutor
23+
from dataclasses import asdict, dataclass
24+
from functools import partial
25+
from typing import Any, Callable, Literal, TextIO, cast
26+
from uuid import uuid4
27+
28+
import fsspec
29+
30+
from opentelemetry._logs import LogRecord
31+
from opentelemetry.trace import Span
32+
from opentelemetry.util.genai import types
33+
from opentelemetry.util.genai.upload_hook import UploadHook
34+
35+
_logger = logging.getLogger(__name__)
36+
37+
38+
@dataclass
class Completion:
    """The three payloads handed to ``FsspecUploadHook.upload`` for one call."""

    # Input messages, as passed to ``upload(inputs=...)``.
    inputs: list[types.InputMessage]
    # Output messages, as passed to ``upload(outputs=...)``.
    outputs: list[types.OutputMessage]
    # System instruction parts, as passed to ``upload(system_instruction=...)``.
    system_instruction: list[types.MessagePart]
43+
44+
45+
@dataclass
class CompletionRefs:
    """Upload destinations for the three parts of a completion.

    Instances are produced by ``FsspecUploadHook._calculate_ref_path``, which
    joins the hook's base path with a per-upload file name.
    """

    # Path the input messages are written to.
    inputs_ref: str
    # Path the output messages are written to.
    outputs_ref: str
    # Path the system instruction is written to.
    system_instruction_ref: str
50+
51+
52+
JsonEncodeable = list[dict[str, Any]]
53+
54+
# mapping of upload path to function computing upload data dict
55+
UploadData = dict[str, Callable[[], JsonEncodeable]]
56+
57+
58+
def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
    """Typed wrapper around `fsspec.open`.

    The result of ``fsspec.open`` is returned unchanged; ``cast`` only narrows
    the static type and performs no conversion at runtime.
    """
    opened = fsspec.open(urlpath, mode)  # pyright: ignore[reportUnknownMemberType]
    return cast(TextIO, opened)
61+
62+
63+
class FsspecUploadHook(UploadHook):
    """An upload hook using ``fsspec`` to upload to external storage

    This class can be used as the
    :func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by
    setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``.
    :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
    base path for uploads.

    Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
    implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]``
    as a requirement to achieve this.

    Uploads are performed on background threads. At most ``max_size`` uploads
    may be pending (queued or running) at once; further uploads are dropped
    and a warning is logged.
    """

    def __init__(
        self,
        *,
        base_path: str,
        max_size: int = 20,
    ) -> None:
        """Create the hook.

        Args:
            base_path: Location under which every uploaded file is created.
            max_size: Number of worker threads, and also the maximum number
                of uploads allowed to be pending at the same time.
        """
        self._base_path = base_path
        self._max_size = max_size

        # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
        # limits the number of queued tasks. If the queue is full, data will be dropped.
        self._executor = ThreadPoolExecutor(max_workers=max_size)
        self._semaphore = threading.BoundedSemaphore(max_size)

    def _submit_all(self, upload_data: UploadData) -> None:
        """Schedule one background upload per entry of ``upload_data``.

        Every scheduled upload holds one semaphore slot until it completes.
        Entries for which no slot is available are dropped with a warning.
        If the executor has already been shut down, the remaining entries
        are skipped.
        """

        def done(future: Future[None]) -> None:
            # The upload is finished (successfully or not): free its slot.
            self._semaphore.release()

            try:
                future.result()
            except Exception:  # pylint: disable=broad-except
                _logger.exception("fsspec uploader failed")

        for path, json_encodeable in upload_data.items():
            # could not acquire, drop data
            if not self._semaphore.acquire(blocking=False):  # pylint: disable=consider-using-with
                _logger.warning(
                    "fsspec upload queue is full, dropping upload %s",
                    path,
                )
                continue

            try:
                fut = self._executor.submit(
                    self._do_upload, path, json_encodeable
                )
            except RuntimeError:
                # submit() refused the task, so the done callback will never
                # run for it: hand the slot back here, otherwise it would be
                # leaked.
                self._semaphore.release()
                _logger.info(
                    "attempting to upload file after FsspecUploadHook.shutdown() was already called"
                )
                break

            fut.add_done_callback(done)

    def _calculate_ref_path(self) -> CompletionRefs:
        """Return fresh upload paths under the base path.

        The three paths share one random UUID prefix so the files belonging
        to the same completion can be correlated.
        """
        # TODO: experiment with using the trace_id and span_id, or fetching
        # gen_ai.response.id from the active span.

        uuid_str = str(uuid4())
        return CompletionRefs(
            inputs_ref=posixpath.join(
                self._base_path, f"{uuid_str}_inputs.json"
            ),
            outputs_ref=posixpath.join(
                self._base_path, f"{uuid_str}_outputs.json"
            ),
            system_instruction_ref=posixpath.join(
                self._base_path, f"{uuid_str}_system_instruction.json"
            ),
        )

    @staticmethod
    def _do_upload(
        path: str, json_encodeable: Callable[[], JsonEncodeable]
    ) -> None:
        """Write the JSON produced by ``json_encodeable()`` to ``path``.

        ``json_encodeable`` is only called here, i.e. on the thread that
        performs the upload. The output uses compact separators.
        """
        with fsspec_open(path, "w") as file:
            json.dump(json_encodeable(), file, separators=(",", ":"))

    def upload(
        self,
        *,
        inputs: list[types.InputMessage],
        outputs: list[types.OutputMessage],
        system_instruction: list[types.MessagePart],
        span: Span | None = None,
        log_record: LogRecord | None = None,
        **kwargs: Any,
    ) -> None:
        """Queue the inputs, outputs and system instruction for upload.

        Each of the three payloads is written to its own JSON file. The call
        does not wait for the uploads to finish. ``span``, ``log_record`` and
        any extra keyword arguments are currently not used.
        """
        completion = Completion(
            inputs=inputs,
            outputs=outputs,
            system_instruction=system_instruction,
        )
        # generate the paths to upload to
        ref_names = self._calculate_ref_path()

        def to_dict(
            dataclass_list: list[types.InputMessage]
            | list[types.OutputMessage]
            | list[types.MessagePart],
        ) -> JsonEncodeable:
            return [asdict(dc) for dc in dataclass_list]

        self._submit_all(
            {
                # Use partial to defer as much as possible to the background threads
                ref_names.inputs_ref: partial(to_dict, completion.inputs),
                ref_names.outputs_ref: partial(to_dict, completion.outputs),
                ref_names.system_instruction_ref: partial(
                    to_dict, completion.system_instruction
                ),
            },
        )

        # TODO: stamp the refs on telemetry

    def shutdown(self) -> None:
        """Shut down the executor; later ``upload`` calls are skipped."""
        # TODO: support timeout
        self._executor.shutdown()

0 commit comments

Comments
 (0)