Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pylint==3.0.2
httpretty==1.1.4
pyright==v1.1.396
pyright==v1.1.404
sphinx==7.1.2
sphinx-rtd-theme==2.0.0rc4
sphinx-autodoc-typehints==1.25.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Start making changes to implement the big semantic convention changes made in https://github.com/open-telemetry/semantic-conventions/pull/2179.
Now only a single event (`gen_ai.client.inference.operation.details`) is used to capture chat history. These changes are opt-in;
users will need to set the environment variable `OTEL_SEMCONV_STABILITY_OPT_IN` to `gen_ai_latest_experimental` to see them ([#3386](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3386)).
- Implement uninstrument for `opentelemetry-instrumentation-vertexai`
([#3328](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3328))
- VertexAI support for async calling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ classifiers = [
]
dependencies = [
"opentelemetry-api ~= 1.28",
"opentelemetry-instrumentation ~= 0.49b0",
"opentelemetry-semantic-conventions ~= 0.49b0",
"opentelemetry-instrumentation == 0.58b0dev",
"opentelemetry-semantic-conventions == 0.58b0dev",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
)

from opentelemetry._events import get_event_logger
from opentelemetry.instrumentation._semconv import (
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.vertexai.package import _instruments
Expand Down Expand Up @@ -118,9 +122,12 @@ def _instrument(self, **kwargs: Any):
schema_url=Schemas.V1_28_0.value,
event_logger_provider=event_logger_provider,
)
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.GEN_AI,
)

method_wrappers = MethodWrappers(
tracer, event_logger, is_content_enabled()
tracer, event_logger, is_content_enabled(), sem_conv_opt_in_mode
)
for client_class, method_name, wrapper in _methods_to_wrap(
method_wrappers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
)

from opentelemetry._events import EventLogger
from opentelemetry.instrumentation._semconv import (
_StabilityMode,
)
from opentelemetry.instrumentation.vertexai.utils import (
GenerateContentParams,
create_operation_details_event,
get_genai_request_attributes,
get_genai_response_attributes,
get_server_attributes,
Expand Down Expand Up @@ -91,14 +95,72 @@ def _extract_params(

class MethodWrappers:
def __init__(
self, tracer: Tracer, event_logger: EventLogger, capture_content: bool
self,
tracer: Tracer,
event_logger: EventLogger,
capture_content: bool,
sem_conv_opt_in_mode: _StabilityMode,
) -> None:
self.tracer = tracer
self.event_logger = event_logger
self.capture_content = capture_content
self.sem_conv_opt_in_mode = sem_conv_opt_in_mode

# Deprecations:
# - `gen_ai.system.message` event - use `gen_ai.system_instructions` or
# `gen_ai.input.messages` attributes instead.
# - `gen_ai.user.message`, `gen_ai.assistant.message`, `gen_ai.tool.message` events
# (use `gen_ai.input.messages` attribute instead)
# - `gen_ai.choice` event (use `gen_ai.output.messages` attribute instead)

@contextmanager
def _with_new_instrumentation(
self,
instance: client.PredictionServiceClient
| client_v1beta1.PredictionServiceClient,
args: Any,
kwargs: Any,
):
params = _extract_params(*args, **kwargs)
api_endpoint: str = instance.api_endpoint # type: ignore[reportUnknownMemberType]
span_attributes = {
**get_genai_request_attributes(False, params),
**get_server_attributes(api_endpoint),
}

span_name = get_span_name(span_attributes)

with self.tracer.start_as_current_span(
name=span_name,
kind=SpanKind.CLIENT,
attributes=span_attributes,
) as span:

def handle_response(
response: prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse,
) -> None:
if span.is_recording():
# When streaming, this is called multiple times so attributes would be
# overwritten. In practice, it looks the API only returns the interesting
# attributes on the last streamed response. However, I couldn't find
# documentation for this and setting attributes shouldn't be too expensive.
span.set_attributes(
get_genai_response_attributes(response)
)
self.event_logger.emit(
create_operation_details_event(
api_endpoint=api_endpoint,
params=params,
capture_content=self.capture_content,
response=response,
)
)

yield handle_response

@contextmanager
def _with_instrumentation(
def _with_default_instrumentation(
self,
instance: client.PredictionServiceClient
| client_v1beta1.PredictionServiceClient,
Expand All @@ -108,7 +170,7 @@ def _with_instrumentation(
params = _extract_params(*args, **kwargs)
api_endpoint: str = instance.api_endpoint # type: ignore[reportUnknownMemberType]
span_attributes = {
**get_genai_request_attributes(params),
**get_genai_request_attributes(False, params),
**get_server_attributes(api_endpoint),
}

Expand Down Expand Up @@ -162,12 +224,32 @@ def generate_content(
prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse
):
with self._with_instrumentation(
instance, args, kwargs
) as handle_response:
response = wrapped(*args, **kwargs)
handle_response(response)
return response
if self.sem_conv_opt_in_mode == _StabilityMode.DEFAULT:
with self._with_default_instrumentation(
instance, args, kwargs
) as handle_response:
response = wrapped(*args, **kwargs)
handle_response(response)
return response
else:
with self._with_new_instrumentation(
instance, args, kwargs
) as handle_response:
try:
response = wrapped(*args, **kwargs)
except Exception as e:
api_endpoint: str = instance.api_endpoint # type: ignore[reportUnknownMemberType]
self.event_logger.emit(
create_operation_details_event(
params=_extract_params(*args, **kwargs),
response=None,
capture_content=self.capture_content,
api_endpoint=api_endpoint,
)
)
raise e
handle_response(response)
return response

async def agenerate_content(
self,
Expand All @@ -186,9 +268,29 @@ async def agenerate_content(
prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse
):
with self._with_instrumentation(
instance, args, kwargs
) as handle_response:
response = await wrapped(*args, **kwargs)
handle_response(response)
return response
if self.sem_conv_opt_in_mode == _StabilityMode.DEFAULT:
with self._with_default_instrumentation(
instance, args, kwargs
) as handle_response:
response = await wrapped(*args, **kwargs)
handle_response(response)
return response
else:
with self._with_new_instrumentation(
instance, args, kwargs
) as handle_response:
try:
response = await wrapped(*args, **kwargs)
except Exception as e:
api_endpoint: str = instance.api_endpoint # type: ignore[reportUnknownMemberType]
self.event_logger.emit(
create_operation_details_event(
params=_extract_params(*args, **kwargs),
response=None,
capture_content=self.capture_content,
api_endpoint=api_endpoint,
)
)
raise e
handle_response(response)
return response
Loading
Loading