@@ -7,9 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Start making changes to implement the big semantic convention changes made in https://github.com/open-telemetry/semantic-conventions/pull/2179.
  Now only a single event (`gen_ai.client.inference.operation.details`) is used to capture chat history. These changes will be opt-in;
  users will need to set the environment variable `OTEL_SEMCONV_STABILITY_OPT_IN` to `gen_ai_latest_experimental` to see them ([#3386](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3386)).
- Update instrumentation to use the latest semantic convention changes made in https://github.com/open-telemetry/semantic-conventions/pull/2179.
  Now only a single event and span (`gen_ai.client.inference.operation.details`) are used to capture prompt and response content. These changes are opt-in;
  users will need to set the environment variable `OTEL_SEMCONV_STABILITY_OPT_IN` to `gen_ai_latest_experimental` to see them ([#3799](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3799), [#3709](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3709)).
- Implement uninstrument for `opentelemetry-instrumentation-vertexai`
([#3328](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3328))
- VertexAI support for async calling
@@ -6,3 +6,17 @@ OTEL_SERVICE_NAME=opentelemetry-python-vertexai

# Change to 'false' to hide prompt and completion content
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true

# Alternatively, set this env var to opt in to the latest experimental semantic conventions:
OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental

# When the latest experimental flag is set, this env var instead controls which telemetry signals include prompt and response content.
# Choices are NO_CONTENT, SPAN_ONLY, EVENT_ONLY, and SPAN_AND_EVENT.
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=SPAN_AND_EVENT

# Optional hook that uploads prompt and response content to an external destination (for example, via fsspec).
OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK=upload

# Required when using the upload completion hook; the base path to upload content to, for example gs://my_bucket.
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH=gs://my_bucket
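The same opt-in can also be done from Python before instrumentation initializes. A minimal sketch mirroring the settings above (it assumes the environment is read at instrumentation time):

import os

# Opt in to the latest experimental GenAI semantic conventions.
os.environ["OTEL_SEMCONV_STABILITY_OPT_IN"] = "gen_ai_latest_experimental"
# Emit prompt and response content on both spans and events.
os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "SPAN_AND_EVENT"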
@@ -12,3 +12,17 @@ OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true

# Change to 'false' to hide prompt and completion content
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true

# Alternatively, set this env var to opt in to the latest experimental semantic conventions:
OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental

# When the latest experimental flag is set, this env var instead controls which telemetry signals include prompt and response content.
# Choices are NO_CONTENT, SPAN_ONLY, EVENT_ONLY, and SPAN_AND_EVENT.
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=SPAN_AND_EVENT

# Optional hook that uploads prompt and response content to an external destination (for example, via fsspec).
OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK=upload

# Required when using the upload completion hook; the base path to upload content to, for example gs://my_bucket.
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH=gs://my_bucket
@@ -60,6 +60,7 @@
from opentelemetry.instrumentation.vertexai.utils import is_content_enabled
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import get_tracer
from opentelemetry.util.genai.completion_hook import load_completion_hook


def _methods_to_wrap(
@@ -109,6 +110,9 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs: Any):
"""Enable VertexAI instrumentation."""
completion_hook = (
kwargs.get("completion_hook") or load_completion_hook()
)
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.GEN_AI,
)
@@ -141,6 +145,7 @@ def _instrument(self, **kwargs: Any):
event_logger,
is_content_enabled(sem_conv_opt_in_mode),
sem_conv_opt_in_mode,
completion_hook,
)
elif sem_conv_opt_in_mode == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL:
# Type checker now knows it's the other literal
@@ -149,6 +154,7 @@ def _instrument(self, **kwargs: Any):
event_logger,
is_content_enabled(sem_conv_opt_in_mode),
sem_conv_opt_in_mode,
completion_hook,
)
else:
raise RuntimeError(f"{sem_conv_opt_in_mode} mode not supported")
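A sketch of how an application might wire up the new parameter; the hook class below is hypothetical, and its keyword signature is inferred from the on_completion call later in this diff:

from opentelemetry.instrumentation.vertexai import VertexAIInstrumentor
from opentelemetry.util.genai.completion_hook import CompletionHook


class CountingHook(CompletionHook):
    """Hypothetical hook: print message counts instead of uploading content."""

    def on_completion(
        self, *, inputs=None, outputs=None, system_instruction=None, **kwargs
    ):
        print(f"{len(inputs or [])} input(s), {len(outputs or [])} output(s)")


# The kwarg is optional; _instrument falls back to load_completion_hook().
VertexAIInstrumentor().instrument(completion_hook=CountingHook())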
@@ -15,6 +15,7 @@
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import asdict
from typing import (
TYPE_CHECKING,
Any,
@@ -27,22 +28,32 @@
overload,
)

from opentelemetry._events import EventLogger
from opentelemetry._events import Event, EventLogger
from opentelemetry.instrumentation._semconv import (
_StabilityMode,
)
from opentelemetry.instrumentation.vertexai.utils import (
GenerateContentParams,
create_operation_details_event,
_map_finish_reason,
convert_content_to_message_parts,
get_genai_request_attributes,
get_genai_response_attributes,
get_server_attributes,
get_span_name,
request_to_events,
response_to_events,
)
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.util.genai.types import ContentCapturingMode
from opentelemetry.util.genai.completion_hook import CompletionHook
from opentelemetry.util.genai.types import (
ContentCapturingMode,
InputMessage,
OutputMessage,
)
from opentelemetry.util.genai.utils import gen_ai_json_dumps

if TYPE_CHECKING:
from google.cloud.aiplatform_v1.services.prediction_service import client
@@ -110,6 +121,7 @@ def __init__(
sem_conv_opt_in_mode: Literal[
_StabilityMode.GEN_AI_LATEST_EXPERIMENTAL
],
completion_hook: CompletionHook,
) -> None: ...

@overload
@@ -119,6 +131,7 @@ def __init__(
event_logger: EventLogger,
capture_content: bool,
sem_conv_opt_in_mode: Literal[_StabilityMode.DEFAULT],
completion_hook: CompletionHook,
) -> None: ...

def __init__(
@@ -130,11 +143,13 @@ def __init__(
Literal[_StabilityMode.DEFAULT],
Literal[_StabilityMode.GEN_AI_LATEST_EXPERIMENTAL],
],
completion_hook: CompletionHook,
) -> None:
self.tracer = tracer
self.event_logger = event_logger
self.capture_content = capture_content
self.sem_conv_opt_in_mode = sem_conv_opt_in_mode
self.completion_hook = completion_hook

@contextmanager
def _with_new_instrumentation(
@@ -146,40 +161,88 @@ def _with_new_instrumentation(
kwargs: Any,
):
params = _extract_params(*args, **kwargs)
api_endpoint: str = instance.api_endpoint # type: ignore[reportUnknownMemberType]
span_attributes = {
**get_genai_request_attributes(False, params),
**get_server_attributes(api_endpoint),
}

span_name = get_span_name(span_attributes)

request_attributes = get_genai_request_attributes(True, params)
with self.tracer.start_as_current_span(
name=span_name,
name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {request_attributes.get(GenAI.GEN_AI_REQUEST_MODEL, '')}".strip(),
kind=SpanKind.CLIENT,
attributes=span_attributes,
) as span:

def handle_response(
response: prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse
| None,
) -> None:
if span.is_recording() and response:
# When streaming, this is called multiple times so attributes would be
# overwritten. In practice, it looks like the API only returns the interesting
# attributes on the last streamed response. However, I couldn't find
# documentation for this and setting attributes shouldn't be too expensive.
span.set_attributes(
get_genai_response_attributes(response)
)
self.event_logger.emit(
create_operation_details_event(
api_endpoint=api_endpoint,
params=params,
capture_content=capture_content,
response=response,
attributes = (
get_server_attributes(instance.api_endpoint) # type: ignore[reportUnknownMemberType]
| request_attributes
| get_genai_response_attributes(response)
)
system_instructions, inputs, outputs = [], [], []
if params.system_instruction:
system_instructions = convert_content_to_message_parts(
params.system_instruction
)
if params.contents:
inputs = [
InputMessage(
role=content.role,
parts=convert_content_to_message_parts(content),
)
for content in params.contents
]
if response:
outputs = [
OutputMessage(
finish_reason=_map_finish_reason(
candidate.finish_reason
),
role=candidate.content.role,
parts=convert_content_to_message_parts(
candidate.content
),
)
for candidate in response.candidates
]
content_attributes = {
k: [asdict(x) for x in v]
for k, v in [
(
GenAI.GEN_AI_SYSTEM_INSTRUCTIONS,
system_instructions,
),
(GenAI.GEN_AI_INPUT_MESSAGES, inputs),
(GenAI.GEN_AI_OUTPUT_MESSAGES, outputs),
]
if v
}
if span.is_recording():
span.set_attributes(attributes)
if capture_content in (
ContentCapturingMode.SPAN_AND_EVENT,
ContentCapturingMode.SPAN_ONLY,
):
span.set_attributes(
{
k: gen_ai_json_dumps(v)
for k, v in content_attributes.items()
}
)
event = Event(
name="gen_ai.client.inference.operation.details",
)
event.attributes = attributes
if capture_content in (
ContentCapturingMode.SPAN_AND_EVENT,
ContentCapturingMode.EVENT_ONLY,
):
event.attributes |= content_attributes
self.event_logger.emit(event)
self.completion_hook.on_completion(
inputs=inputs,
outputs=outputs,
system_instruction=system_instructions,
span=span,
log_record=event,
)

yield handle_response
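To illustrate the two serializations above: span attributes must be primitive values, so structured messages are JSON-encoded there, while the event keeps them as structured attributes. A minimal sketch, assuming the dataclass shapes suggested by the imports in this file:

from dataclasses import asdict

from opentelemetry.util.genai.types import InputMessage, Text
from opentelemetry.util.genai.utils import gen_ai_json_dumps

inputs = [InputMessage(role="user", parts=[Text(content="Hello")])]
structured = [asdict(m) for m in inputs]  # event attributes: a list of dicts
as_span_attribute = gen_ai_json_dumps(structured)  # span attribute: one JSON string
print(as_span_attribute)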
@@ -17,7 +17,7 @@
from __future__ import annotations

import re
from dataclasses import asdict, dataclass
from dataclasses import dataclass
from os import environ
from typing import (
TYPE_CHECKING,
@@ -53,9 +53,7 @@
from opentelemetry.util.genai.types import (
ContentCapturingMode,
FinishReason,
InputMessage,
MessagePart,
OutputMessage,
Text,
ToolCall,
ToolCallResponse,
@@ -192,8 +190,11 @@ def get_genai_request_attributes(  # pylint: disable=too-many-branches

def get_genai_response_attributes(
response: prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse,
| prediction_service_v1beta1.GenerateContentResponse
| None,
) -> dict[str, AttributeValue]:
if not response:
return {}
finish_reasons: list[str] = [
_map_finish_reason(candidate.finish_reason)
for candidate in response.candidates
@@ -307,68 +308,9 @@ def request_to_events(
yield user_event(role=content.role, content=request_content)


def create_operation_details_event(
*,
api_endpoint: str,
response: prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse
| None,
params: GenerateContentParams,
capture_content: ContentCapturingMode,
) -> Event:
event = Event(name="gen_ai.client.inference.operation.details")
attributes: dict[str, AnyValue] = {
**get_genai_request_attributes(True, params),
**get_server_attributes(api_endpoint),
**(get_genai_response_attributes(response) if response else {}),
}
event.attributes = attributes
if capture_content in {
ContentCapturingMode.NO_CONTENT,
ContentCapturingMode.SPAN_ONLY,
}:
return event
if params.system_instruction:
attributes[GenAIAttributes.GEN_AI_SYSTEM_INSTRUCTIONS] = [
{
"type": "text",
"content": "\n".join(
part.text for part in params.system_instruction.parts
),
}
]
if params.contents:
attributes[GenAIAttributes.GEN_AI_INPUT_MESSAGES] = [
asdict(_convert_content_to_message(content))
for content in params.contents
]
if response and response.candidates:
attributes[GenAIAttributes.GEN_AI_OUTPUT_MESSAGES] = [
asdict(x) for x in _convert_response_to_output_messages(response)
]
return event


def _convert_response_to_output_messages(
response: prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse,
) -> list[OutputMessage]:
output_messages: list[OutputMessage] = []
for candidate in response.candidates:
message = _convert_content_to_message(candidate.content)
output_messages.append(
OutputMessage(
finish_reason=_map_finish_reason(candidate.finish_reason),
role=message.role,
parts=message.parts,
)
)
return output_messages


def _convert_content_to_message(
def convert_content_to_message_parts(
content: content.Content | content_v1beta1.Content,
) -> InputMessage:
) -> list[MessagePart]:
parts: list[MessagePart] = []
for idx, part in enumerate(content.parts):
if "function_response" in part:
@@ -398,7 +340,7 @@ def _convert_content_to_message(
)
dict_part["type"] = type(part)
parts.append(dict_part)
return InputMessage(role=content.role, parts=parts)
return parts


def response_to_events(
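A hedged usage sketch of the renamed helper; the proto construction below is an assumption based on the google.cloud.aiplatform_v1 types this module imports:

from google.cloud.aiplatform_v1.types import content as content_pb

from opentelemetry.instrumentation.vertexai.utils import (
    convert_content_to_message_parts,
)

msg = content_pb.Content(role="user", parts=[content_pb.Part(text="Hello")])
# Returns list[MessagePart] rather than a whole InputMessage; callers now
# supply the role themselves, as patch.py does with InputMessage(role=..., parts=...).
parts = convert_content_to_message_parts(msg)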