Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pylint==3.0.2
httpretty==1.1.4
pyright==v1.1.396
pyright==v1.1.404
sphinx==7.1.2
sphinx-rtd-theme==2.0.0rc4
sphinx-autodoc-typehints==1.25.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Start making changes to implement the big semantic convention changes made in https://github.com/open-telemetry/semantic-conventions/pull/2179.
  Now only a single event (`gen_ai.client.inference.operation.details`) is used to capture chat history. These changes are opt-in:
  users need to set the environment variable `OTEL_SEMCONV_STABILITY_OPT_IN` to `gen_ai_latest_experimental` to see them ([#3386](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3386)).
- Implement uninstrument for `opentelemetry-instrumentation-vertexai`
([#3328](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3328))
- VertexAI support for async calling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ classifiers = [
]
dependencies = [
"opentelemetry-api ~= 1.28",
"opentelemetry-instrumentation ~= 0.49b0",
"opentelemetry-semantic-conventions ~= 0.49b0",
"opentelemetry-instrumentation ~= 0.58b0",
"opentelemetry-util-genai == 0.1b0.dev",
"opentelemetry-semantic-conventions ~= 0.58b0",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
)

from opentelemetry._events import get_event_logger
from opentelemetry.instrumentation._semconv import (
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_StabilityMode,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.vertexai.package import _instruments
Expand Down Expand Up @@ -104,24 +109,49 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs: Any):
"""Enable VertexAI instrumentation."""
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.GEN_AI,
)
tracer_provider = kwargs.get("tracer_provider")
schema = (
Schemas.V1_28_0.value
if sem_conv_opt_in_mode == _StabilityMode.DEFAULT
else Schemas.V1_36_0.value
)
tracer = get_tracer(
__name__,
"",
tracer_provider,
schema_url=Schemas.V1_28_0.value,
schema_url=schema,
)
event_logger_provider = kwargs.get("event_logger_provider")
event_logger = get_event_logger(
__name__,
"",
schema_url=Schemas.V1_28_0.value,
schema_url=schema,
event_logger_provider=event_logger_provider,
)

method_wrappers = MethodWrappers(
tracer, event_logger, is_content_enabled()
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.GEN_AI,
)
if sem_conv_opt_in_mode == _StabilityMode.DEFAULT:
# Type checker now knows sem_conv_opt_in_mode is a Literal[_StabilityMode.DEFAULT]
method_wrappers = MethodWrappers(
tracer,
event_logger,
is_content_enabled(sem_conv_opt_in_mode),
sem_conv_opt_in_mode,
)
elif sem_conv_opt_in_mode == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL:
# Type checker now knows it's the other literal
method_wrappers = MethodWrappers(
tracer,
event_logger,
is_content_enabled(sem_conv_opt_in_mode),
sem_conv_opt_in_mode,
)
else:
raise RuntimeError(f"{sem_conv_opt_in_mode} mode not supported")
for client_class, method_name, wrapper in _methods_to_wrap(
method_wrappers
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@
Any,
Awaitable,
Callable,
Literal,
MutableSequence,
Union,
cast,
overload,
)

from opentelemetry._events import EventLogger
from opentelemetry.instrumentation._semconv import (
_StabilityMode,
)
from opentelemetry.instrumentation.vertexai.utils import (
GenerateContentParams,
create_operation_details_event,
get_genai_request_attributes,
get_genai_response_attributes,
get_server_attributes,
Expand All @@ -34,6 +42,7 @@
response_to_events,
)
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.util.genai.types import ContentCapturingMode

if TYPE_CHECKING:
from google.cloud.aiplatform_v1.services.prediction_service import client
Expand Down Expand Up @@ -89,17 +98,96 @@ def _extract_params(
)


# For details about GEN_AI_LATEST_EXPERIMENTAL stability mode see
# https://github.com/open-telemetry/semantic-conventions/blob/v1.37.0/docs/gen-ai/gen-ai-agent-spans.md?plain=1#L18-L37
class MethodWrappers:
@overload
def __init__(
self,
tracer: Tracer,
event_logger: EventLogger,
capture_content: ContentCapturingMode,
sem_conv_opt_in_mode: Literal[
_StabilityMode.GEN_AI_LATEST_EXPERIMENTAL
],
) -> None: ...

@overload
def __init__(
self,
tracer: Tracer,
event_logger: EventLogger,
capture_content: bool,
sem_conv_opt_in_mode: Literal[_StabilityMode.DEFAULT],
) -> None: ...

def __init__(
self, tracer: Tracer, event_logger: EventLogger, capture_content: bool
self,
tracer: Tracer,
event_logger: EventLogger,
capture_content: Union[bool, ContentCapturingMode],
sem_conv_opt_in_mode: Union[
Literal[_StabilityMode.DEFAULT],
Literal[_StabilityMode.GEN_AI_LATEST_EXPERIMENTAL],
],
) -> None:
self.tracer = tracer
self.event_logger = event_logger
self.capture_content = capture_content
self.sem_conv_opt_in_mode = sem_conv_opt_in_mode

    @contextmanager
    def _with_new_instrumentation(
        self,
        capture_content: ContentCapturingMode,
        instance: client.PredictionServiceClient
        | client_v1beta1.PredictionServiceClient,
        args: Any,
        kwargs: Any,
    ):
        """Instrument one generate_content call under the
        GEN_AI_LATEST_EXPERIMENTAL semconv opt-in mode.

        Opens a CLIENT span for the request and yields a ``handle_response``
        callback; the caller invokes it with the response (or ``None`` on
        failure) so that a single
        ``gen_ai.client.inference.operation.details`` event is emitted
        capturing the chat history, instead of the per-message events used by
        the DEFAULT mode.

        Args:
            capture_content: How much message content to record on the event.
            instance: The (v1 or v1beta1) PredictionServiceClient being wrapped.
            args: Positional arguments of the wrapped call.
            kwargs: Keyword arguments of the wrapped call.

        Yields:
            A callable accepting the GenerateContentResponse (or ``None``)
            that records response attributes on the span and emits the
            operation-details event.
        """
        params = _extract_params(*args, **kwargs)
        api_endpoint: str = instance.api_endpoint # type: ignore[reportUnknownMemberType]
        span_attributes = {
            **get_genai_request_attributes(False, params),
            **get_server_attributes(api_endpoint),
        }

        span_name = get_span_name(span_attributes)

        with self.tracer.start_as_current_span(
            name=span_name,
            kind=SpanKind.CLIENT,
            attributes=span_attributes,
        ) as span:

            def handle_response(
                response: prediction_service.GenerateContentResponse
                | prediction_service_v1beta1.GenerateContentResponse
                | None,
            ) -> None:
                if span.is_recording() and response:
                    # When streaming, this is called multiple times, so earlier
                    # attributes would be overwritten. In practice, it looks
                    # like the API only returns the interesting attributes on
                    # the last streamed response; however, that is not
                    # documented, and re-setting attributes shouldn't be too
                    # expensive.
                    span.set_attributes(
                        get_genai_response_attributes(response)
                    )
                # Emitted unconditionally: even when the call failed
                # (response is None), the request details are still recorded
                # in the operation-details event.
                self.event_logger.emit(
                    create_operation_details_event(
                        api_endpoint=api_endpoint,
                        params=params,
                        capture_content=capture_content,
                        response=response,
                    )
                )

            # Yield inside the `with` so the span stays current while the
            # caller runs the wrapped API call.
            yield handle_response

@contextmanager
def _with_instrumentation(
def _with_default_instrumentation(
self,
capture_content: bool,
instance: client.PredictionServiceClient
| client_v1beta1.PredictionServiceClient,
args: Any,
Expand All @@ -108,7 +196,7 @@ def _with_instrumentation(
params = _extract_params(*args, **kwargs)
api_endpoint: str = instance.api_endpoint # type: ignore[reportUnknownMemberType]
span_attributes = {
**get_genai_request_attributes(params),
**get_genai_request_attributes(False, params),
**get_server_attributes(api_endpoint),
}

Expand All @@ -120,7 +208,7 @@ def _with_instrumentation(
attributes=span_attributes,
) as span:
for event in request_to_events(
params=params, capture_content=self.capture_content
params=params, capture_content=capture_content
):
self.event_logger.emit(event)

Expand All @@ -141,7 +229,7 @@ def handle_response(
)

for event in response_to_events(
response=response, capture_content=self.capture_content
response=response, capture_content=capture_content
):
self.event_logger.emit(event)

Expand All @@ -162,12 +250,25 @@ def generate_content(
prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse
):
with self._with_instrumentation(
instance, args, kwargs
) as handle_response:
response = wrapped(*args, **kwargs)
handle_response(response)
return response
if self.sem_conv_opt_in_mode == _StabilityMode.DEFAULT:
capture_content_bool = cast(bool, self.capture_content)
with self._with_default_instrumentation(
capture_content_bool, instance, args, kwargs
) as handle_response:
response = wrapped(*args, **kwargs)
handle_response(response)
return response
else:
capture_content = cast(ContentCapturingMode, self.capture_content)
with self._with_new_instrumentation(
capture_content, instance, args, kwargs
) as handle_response:
response = None
try:
response = wrapped(*args, **kwargs)
return response
finally:
handle_response(response)

async def agenerate_content(
self,
Expand All @@ -186,9 +287,22 @@ async def agenerate_content(
prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse
):
with self._with_instrumentation(
instance, args, kwargs
) as handle_response:
response = await wrapped(*args, **kwargs)
handle_response(response)
return response
if self.sem_conv_opt_in_mode == _StabilityMode.DEFAULT:
capture_content_bool = cast(bool, self.capture_content)
with self._with_default_instrumentation(
capture_content_bool, instance, args, kwargs
) as handle_response:
response = await wrapped(*args, **kwargs)
handle_response(response)
return response
else:
capture_content = cast(ContentCapturingMode, self.capture_content)
with self._with_new_instrumentation(
capture_content, instance, args, kwargs
) as handle_response:
response = None
try:
response = await wrapped(*args, **kwargs)
return response
finally:
handle_response(response)
Loading