
Commit 210c6d0

Working with events
1 parent 74f347e commit 210c6d0

3 files changed: +206 -55 lines changed

instrumentation-genai/opentelemetry-instrumentation-vertexai-v2/src/opentelemetry/instrumentation/vertexai_v2/__init__.py

Lines changed: 118 additions & 54 deletions
@@ -14,26 +14,32 @@
 
 """OpenTelemetry Vertex AI instrumentation"""
 
+from functools import partial
 import logging
-import os
 import types
-from typing import Collection
+from typing import Collection, Optional
 
 from wrapt import wrap_function_wrapper
 
-from opentelemetry import context as context_api
+from opentelemetry._events import (
+    EventLogger,
+    EventLoggerProvider,
+    Event,
+    get_event_logger,
+)
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
 from opentelemetry.instrumentation.utils import (
     is_instrumentation_enabled,
     unwrap,
 )
+from opentelemetry.instrumentation.vertexai_v2.events import (
+    assistant_event,
+    user_event,
+)
 from opentelemetry.instrumentation.vertexai_v2.utils import dont_throw
 from opentelemetry.instrumentation.vertexai_v2.version import __version__
 from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
-from opentelemetry.semconv.trace import (
-    SpanAttributes,
-)
-from opentelemetry.trace import SpanKind, get_tracer
+from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
 from opentelemetry.trace.status import Status, StatusCode
 
 logger = logging.getLogger(__name__)
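
The user_event and assistant_event helpers imported above come from the new vertexai_v2/events.py module, which is one of the three files in this commit but is not shown in this excerpt. Below is a minimal sketch of what such a helper might look like, assuming it builds an opentelemetry._events.Event named after the incubating gen_ai semantic-convention events (gen_ai.user.message / gen_ai.assistant.message) and attaches the span context for correlation; the body layout is an assumption, not the commit's actual code.

# Hypothetical sketch of opentelemetry/instrumentation/vertexai_v2/events.py;
# the real module is part of this commit but not shown in this diff excerpt.
from opentelemetry._events import Event
from opentelemetry.trace import SpanContext


def user_event(
    *, gen_ai_system: str, content: str, span_context: SpanContext
) -> Event:
    # "gen_ai.user.message" is the incubating semconv name for prompt events.
    return Event(
        name="gen_ai.user.message",
        attributes={"gen_ai.system": gen_ai_system},
        body={"content": content},
        trace_id=span_context.trace_id,
        span_id=span_context.span_id,
        trace_flags=span_context.trace_flags,
    )


def assistant_event(
    *, gen_ai_system: str, content: str, span_context: SpanContext
) -> Event:
    # Same shape for completions, under "gen_ai.assistant.message".
    return Event(
        name="gen_ai.assistant.message",
        attributes={"gen_ai.system": gen_ai_system},
        body={"content": content},
        trace_id=span_context.trace_id,
        span_id=span_context.span_id,
        trace_flags=span_context.trace_flags,
    )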
@@ -116,11 +122,14 @@
 
 
 def should_send_prompts():
-    return (
-        os.getenv("TRACELOOP_TRACE_CONTENT") or "true"
-    ).lower() == "true" or context_api.get_value(
-        "override_enable_content_tracing"
-    )
+    # Previously was opt-in by the following check for privacy reasons:
+    #
+    # return (
+    #     os.getenv("TRACELOOP_TRACE_CONTENT") or "true"
+    # ).lower() == "true" or context_api.get_value(
+    #     "override_enable_content_tracing"
+    # )
+    return True
 
 
 def is_streaming_response(response):
@@ -138,7 +147,9 @@ def _set_span_attribute(span, name, value):
     return
 
 
-def _set_input_attributes(span, args, kwargs, llm_model):
+def _set_input_attributes(
+    span, event_logger: EventLogger, args, kwargs, llm_model
+):
     if should_send_prompts() and args is not None and len(args) > 0:
         prompt = ""
         for arg in args:
@@ -148,18 +159,36 @@ def _set_input_attributes(span, args, kwargs, llm_model):
                 for subarg in arg:
                     prompt = f"{prompt}{subarg}\n"
 
-        _set_span_attribute(
-            span,
-            f"{SpanAttributes.LLM_PROMPTS}.0.user",
-            prompt,
-        )
+        # _set_span_attribute(
+        #     span,
+        #     f"{SpanAttributes.LLM_PROMPTS}.0.user",
+        #     prompt,
+        # )
+        if prompt:
+            event_logger.emit(
+                user_event(
+                    gen_ai_system=gen_ai_attributes.GenAiSystemValues.VERTEX_AI.value,
+                    content=prompt,
+                    span_context=span.get_span_context(),
+                )
+            )
+
+    # Copied from openllmetry logic
+    # https://github.com/traceloop/openllmetry/blob/v0.33.12/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py#L141-L143
+    # I guess prompt may be in kwargs instead or in addition?
+    prompt = kwargs.get("prompt")
+    if prompt:
+        event_logger.emit(
+            user_event(
+                gen_ai_system=gen_ai_attributes.GenAiSystemValues.VERTEX_AI.value,
+                content=prompt,
+                span_context=span.get_span_context(),
+            )
+        )
 
     _set_span_attribute(
         span, gen_ai_attributes.GEN_AI_REQUEST_MODEL, llm_model
     )
-    _set_span_attribute(
-        span, f"{SpanAttributes.LLM_PROMPTS}.0.user", kwargs.get("prompt")
-    )
     _set_span_attribute(
         span,
         gen_ai_attributes.GEN_AI_REQUEST_TEMPERATURE,
@@ -191,7 +220,9 @@ def _set_input_attributes(span, args, kwargs, llm_model):
 
 
 @dont_throw
-def _set_response_attributes(span, llm_model, generation_text, token_usage):
+def _set_response_attributes(
+    span, event_logger: EventLogger, llm_model, generation_text, token_usage
+):
     _set_span_attribute(
         span, gen_ai_attributes.GEN_AI_RESPONSE_MODEL, llm_model
     )
@@ -208,17 +239,19 @@ def _set_response_attributes(span, llm_model, generation_text, token_usage):
         token_usage.prompt_token_count,
     )
 
-    _set_span_attribute(
-        span, f"{SpanAttributes.LLM_COMPLETIONS}.0.role", "assistant"
-    )
-    _set_span_attribute(
-        span,
-        f"{SpanAttributes.LLM_COMPLETIONS}.0.content",
-        generation_text,
-    )
+    if generation_text:
+        event_logger.emit(
+            assistant_event(
+                gen_ai_system=gen_ai_attributes.GenAiSystemValues.VERTEX_AI.value,
+                content=generation_text,
+                span_context=span.get_span_context(),
+            )
+        )
 
 
-def _build_from_streaming_response(span, response, llm_model):
+def _build_from_streaming_response(
+    span, event_logger: EventLogger, response, llm_model
+):
     complete_response = ""
     token_usage = None
     for item in response:
@@ -229,13 +262,17 @@ def _build_from_streaming_response(span, response, llm_model):
 
         yield item_to_yield
 
-    _set_response_attributes(span, llm_model, complete_response, token_usage)
+    _set_response_attributes(
+        span, event_logger, llm_model, complete_response, token_usage
+    )
 
     span.set_status(Status(StatusCode.OK))
    span.end()
 
 
-async def _abuild_from_streaming_response(span, response, llm_model):
+async def _abuild_from_streaming_response(
+    span, event_logger: EventLogger, response, llm_model
+):
     complete_response = ""
     token_usage = None
     async for item in response:
@@ -246,23 +283,26 @@ async def _abuild_from_streaming_response(span, response, llm_model):
 
         yield item_to_yield
 
-    _set_response_attributes(span, llm_model, complete_response, token_usage)
+    _set_response_attributes(
+        span, event_logger, llm_model, complete_response, token_usage
+    )
 
     span.set_status(Status(StatusCode.OK))
     span.end()
 
 
 @dont_throw
-def _handle_request(span, args, kwargs, llm_model):
+def _handle_request(span, event_logger, args, kwargs, llm_model):
     if span.is_recording():
-        _set_input_attributes(span, args, kwargs, llm_model)
+        _set_input_attributes(span, event_logger, args, kwargs, llm_model)
 
 
 @dont_throw
-def _handle_response(span, response, llm_model):
+def _handle_response(span, event_logger: EventLogger, response, llm_model):
     if span.is_recording():
         _set_response_attributes(
             span,
+            event_logger,
             llm_model,
             response.candidates[0].text,
             response.usage_metadata,
@@ -283,8 +323,10 @@ def wrapper(wrapped, instance, args, kwargs):
     return _with_tracer
 
 
-@_with_tracer_wrapper
-async def _awrap(tracer, to_wrap, wrapped, instance, args, kwargs):
+# @_with_tracer_wrapper
+async def _awrap(
+    tracer, event_logger: EventLogger, to_wrap, wrapped, instance, args, kwargs
+):
     """Instruments and calls every function defined in TO_WRAP."""
     if not is_instrumentation_enabled():
         return await wrapped(*args, **kwargs)
@@ -310,24 +352,30 @@ async def _awrap(tracer, to_wrap, wrapped, instance, args, kwargs):
         },
     )
 
-    _handle_request(span, args, kwargs, llm_model)
+    _handle_request(span, event_logger, args, kwargs, llm_model)
 
     response = await wrapped(*args, **kwargs)
 
     if response:
         if is_streaming_response(response):
-            return _build_from_streaming_response(span, response, llm_model)
+            return _build_from_streaming_response(
+                span, event_logger, response, llm_model
+            )
         elif is_async_streaming_response(response):
-            return _abuild_from_streaming_response(span, response, llm_model)
+            return _abuild_from_streaming_response(
+                span, event_logger, response, llm_model
+            )
         else:
-            _handle_response(span, response, llm_model)
+            _handle_response(span, event_logger, response, llm_model)
 
     span.end()
     return response
 
 
-@_with_tracer_wrapper
-def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
+# @_with_tracer_wrapper
+def _wrap(
+    tracer, event_logger: EventLogger, to_wrap, wrapped, instance, args, kwargs
+):
     """Instruments and calls every function defined in TO_WRAP."""
     if not is_instrumentation_enabled():
         return wrapped(*args, **kwargs)
@@ -353,17 +401,21 @@ def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
         },
     )
 
-    _handle_request(span, args, kwargs, llm_model)
+    _handle_request(span, event_logger, args, kwargs, llm_model)
 
     response = wrapped(*args, **kwargs)
 
     if response:
         if is_streaming_response(response):
-            return _build_from_streaming_response(span, response, llm_model)
+            return _build_from_streaming_response(
+                span, event_logger, response, llm_model
+            )
         elif is_async_streaming_response(response):
-            return _abuild_from_streaming_response(span, response, llm_model)
+            return _abuild_from_streaming_response(
+                span, event_logger, response, llm_model
+            )
         else:
-            _handle_response(span, response, llm_model)
+            _handle_response(span, event_logger, response, llm_model)
 
     span.end()
     return response
@@ -378,9 +430,21 @@ def __init__(self, exception_logger=None):
     def instrumentation_dependencies(self) -> Collection[str]:
        return _instruments
 
-    def _instrument(self, **kwargs):
-        tracer_provider = kwargs.get("tracer_provider")
-        tracer = get_tracer(__name__, __version__, tracer_provider)
+    def _instrument(
+        self,
+        *,
+        tracer_provider: Optional[TracerProvider] = None,
+        event_logger_provider: Optional[EventLoggerProvider] = None,
+        **kwargs,
+    ):
+        tracer = get_tracer(
+            __name__, __version__, tracer_provider=tracer_provider
+        )
+        event_logger = get_event_logger(
+            __name__,
+            version=__version__,
+            event_logger_provider=event_logger_provider,
+        )
         for wrapped_method in WRAPPED_METHODS:
             wrap_package = wrapped_method.get("package")
             wrap_object = wrapped_method.get("object")
@@ -390,9 +454,9 @@ def _instrument(self, **kwargs):
                 wrap_package,
                 f"{wrap_object}.{wrap_method}",
                 (
-                    _awrap(tracer, wrapped_method)
+                    partial(_awrap, tracer, event_logger, wrapped_method)
                     if wrapped_method.get("is_async")
-                    else _wrap(tracer, wrapped_method)
+                    else partial(_wrap, tracer, event_logger, wrapped_method)
                 ),
             )
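
Two things to note in the rewiring above: the @_with_tracer_wrapper decorator is commented out in favor of functools.partial, which pre-binds tracer, event_logger, and the per-method config before wrapt supplies the standard (wrapped, instance, args, kwargs) arguments; and _instrument now accepts an explicit event_logger_provider. A rough usage sketch of enabling the instrumentation with both providers follows, assuming the instrumentor class is named VertexAIInstrumentor (its name is not visible in this excerpt) and using the SDK's event/log plumbing:

from opentelemetry.instrumentation.vertexai_v2 import VertexAIInstrumentor
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
    ConsoleLogExporter,
    SimpleLogRecordProcessor,
)
from opentelemetry.sdk.trace import TracerProvider

# Events are emitted as log records, so the EventLoggerProvider needs a
# LoggerProvider with at least one processor/exporter attached.
logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(
    SimpleLogRecordProcessor(ConsoleLogExporter())
)

VertexAIInstrumentor().instrument(
    tracer_provider=TracerProvider(),
    event_logger_provider=EventLoggerProvider(logger_provider),
)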