Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ chat_completion = client.chat.completions.create(model="gpt-4o-mini", messages=m
- `ELASTIC_OTEL_GENAI_CAPTURE_CONTENT` (default: `false`): when set to `true`, collect more
information about prompts and responses by enabling content capture

### Elastic specific semantic conventions

- New `embeddings` value for `gen_ai.operation.name`
- New `gen_ai.request.encoding_format` attribute with openai specific values `[float, base64]`

## Development

We use [pytest](https://docs.pytest.org/en/stable/) to execute tests written with the standard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
ELASTIC_OTEL_GENAI_CAPTURE_CONTENT,
)
from opentelemetry.instrumentation.openai.helpers import (
_get_embeddings_span_attributes_from_wrapper,
_get_span_attributes_from_wrapper,
_message_from_choice,
_record_token_usage_metrics,
_record_operation_duration_metric,
_set_span_attributes_from_response,
_set_embeddings_span_attributes_from_response,
)
from opentelemetry.instrumentation.openai.package import _instruments
from opentelemetry.instrumentation.openai.version import __version__
Expand Down Expand Up @@ -95,6 +97,16 @@ def _patch(self, _module):
"AsyncCompletions.create",
self._async_chat_completion_wrapper,
)
wrap_function_wrapper(
"openai.resources.embeddings",
"Embeddings.create",
self._embeddings_wrapper,
)
wrap_function_wrapper(
"openai.resources.embeddings",
"AsyncEmbeddings.create",
self._async_embeddings_wrapper,
)

def _uninstrument(self, **kwargs):
# unwrap only supports uninstrementing real module references so we
Expand All @@ -103,6 +115,8 @@ def _uninstrument(self, **kwargs):

unwrap(openai.resources.chat.completions.Completions, "create")
unwrap(openai.resources.chat.completions.AsyncCompletions, "create")
unwrap(openai.resources.embeddings.Embeddings, "create")
unwrap(openai.resources.embeddings.AsyncEmbeddings, "create")

def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
logger.debug(f"openai.resources.chat.completions.Completions.create kwargs: {kwargs}")
Expand Down Expand Up @@ -226,3 +240,63 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
span.end()

return result

def _embeddings_wrapper(self, wrapped, instance, args, kwargs):
    """Wrap a synchronous ``Embeddings.create`` call in a CLIENT span.

    Records operation-duration and token-usage metrics and re-raises any
    exception from the wrapped call after tagging the span as failed.
    """
    attributes = _get_embeddings_span_attributes_from_wrapper(instance, kwargs)
    operation = attributes[GEN_AI_OPERATION_NAME]
    request_model = attributes[GEN_AI_REQUEST_MODEL]

    # The span is ended manually (end_on_exit=False) so that metrics can be
    # recorded against it on both the success and the error path before it
    # is closed.
    with self.tracer.start_as_current_span(
        name=f"{operation} {request_model}",
        kind=SpanKind.CLIENT,
        attributes=attributes,
        end_on_exit=False,
    ) as span:
        start_time = default_timer()
        try:
            response = wrapped(*args, **kwargs)
        except Exception as exc:
            # Mark the span as failed, close it, and still emit the
            # duration metric before propagating the error.
            span.set_status(StatusCode.ERROR, str(exc))
            span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
            span.end()
            _record_operation_duration_metric(self.operation_duration_metric, span, start_time)
            raise

        _set_embeddings_span_attributes_from_response(span, response.model, response.usage)
        _record_token_usage_metrics(self.token_usage_metric, span, response.usage)
        _record_operation_duration_metric(self.operation_duration_metric, span, start_time)
        span.end()

    return response

async def _async_embeddings_wrapper(self, wrapped, instance, args, kwargs):
    """Wrap an asynchronous ``AsyncEmbeddings.create`` call in a CLIENT span.

    Async twin of ``_embeddings_wrapper``: records operation-duration and
    token-usage metrics and re-raises any exception from the wrapped call
    after tagging the span as failed.
    """
    attributes = _get_embeddings_span_attributes_from_wrapper(instance, kwargs)
    operation = attributes[GEN_AI_OPERATION_NAME]
    request_model = attributes[GEN_AI_REQUEST_MODEL]

    # The span is ended manually (end_on_exit=False) so that metrics can be
    # recorded against it on both the success and the error path before it
    # is closed.
    with self.tracer.start_as_current_span(
        name=f"{operation} {request_model}",
        kind=SpanKind.CLIENT,
        attributes=attributes,
        end_on_exit=False,
    ) as span:
        start_time = default_timer()
        try:
            response = await wrapped(*args, **kwargs)
        except Exception as exc:
            # Mark the span as failed, close it, and still emit the
            # duration metric before propagating the error.
            span.set_status(StatusCode.ERROR, str(exc))
            span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
            span.end()
            _record_operation_duration_metric(self.operation_duration_metric, span, start_time)
            raise

        _set_embeddings_span_attributes_from_response(span, response.model, response.usage)
        _record_token_usage_metrics(self.token_usage_metric, span, response.usage)
        _record_operation_duration_metric(self.operation_duration_metric, span, start_time)
        span.end()

    return response
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
else:
CompletionUsage = None

# Elastic-specific attribute (not part of the upstream GenAI semconv):
# carries the openai `encoding_format` request option, e.g. `float` or `base64`.
GEN_AI_REQUEST_ENCODING_FORMAT = "gen_ai.request.encoding_format"


def _set_span_attributes_from_response(
span: Span, response_id: str, model: str, choices, usage: CompletionUsage
Expand All @@ -61,6 +63,11 @@ def _set_span_attributes_from_response(
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage.completion_tokens)


def _set_embeddings_span_attributes_from_response(span: Span, model: str, usage: CompletionUsage) -> None:
    """Copy response model and input-token usage onto an embeddings span.

    Embeddings responses carry only prompt (input) tokens, so no output
    token attribute is set here.
    """
    response_attributes = {
        GEN_AI_RESPONSE_MODEL: model,
        GEN_AI_USAGE_INPUT_TOKENS: usage.prompt_tokens,
    }
    for key, value in response_attributes.items():
        span.set_attribute(key, value)


def _decode_function_arguments(arguments: str):
try:
return json.loads(arguments)
Expand Down Expand Up @@ -98,6 +105,23 @@ def _message_from_stream_choices(choices):
return message


def _attributes_from_client(client):
    """Derive ``server.address``/``server.port`` attributes from an openai client.

    Reads the client's private ``_base_url``; when no explicit port is
    present, falls back to the default port implied by the URL scheme.
    Returns an (possibly empty) attributes dict.
    """
    attributes = {}

    base_url = getattr(client, "_base_url", None)
    if not base_url:
        return attributes

    host = getattr(base_url, "host", None)
    if host:
        attributes[SERVER_ADDRESS] = host

    port = getattr(base_url, "port", None)
    if port:
        attributes[SERVER_PORT] = port
    else:
        # No explicit port in the URL: use the scheme's well-known default.
        scheme = getattr(base_url, "scheme", None)
        if scheme == "http":
            attributes[SERVER_PORT] = 80
        elif scheme == "https":
            attributes[SERVER_PORT] = 443

    return attributes


def _get_span_attributes_from_wrapper(instance, kwargs):
span_attributes = {
GEN_AI_OPERATION_NAME: "chat",
Expand All @@ -106,16 +130,7 @@ def _get_span_attributes_from_wrapper(instance, kwargs):
}

if client := getattr(instance, "_client", None):
if base_url := getattr(client, "_base_url", None):
if host := getattr(base_url, "host", None):
span_attributes[SERVER_ADDRESS] = host
if port := getattr(base_url, "port", None):
span_attributes[SERVER_PORT] = port
elif scheme := getattr(base_url, "scheme", None):
if scheme == "http":
span_attributes[SERVER_PORT] = 80
elif scheme == "https":
span_attributes[SERVER_PORT] = 443
span_attributes.update(_attributes_from_client(client))

if (frequency_penalty := kwargs.get("frequency_penalty")) is not None:
span_attributes[GEN_AI_REQUEST_FREQUENCY_PENALTY] = frequency_penalty
Expand All @@ -135,6 +150,22 @@ def _get_span_attributes_from_wrapper(instance, kwargs):
return span_attributes


def _get_embeddings_span_attributes_from_wrapper(instance, kwargs):
    """Build the initial span attributes for an openai embeddings request.

    Always sets operation name, request model (required kwarg), and system;
    adds server address/port from the underlying client and the optional
    ``encoding_format`` request option when present.
    """
    attributes = {
        GEN_AI_OPERATION_NAME: "embeddings",
        GEN_AI_REQUEST_MODEL: kwargs["model"],
        GEN_AI_SYSTEM: "openai",
    }

    client = getattr(instance, "_client", None)
    if client:
        attributes.update(_attributes_from_client(client))

    encoding_format = kwargs.get("encoding_format")
    if encoding_format is not None:
        attributes[GEN_AI_REQUEST_ENCODING_FORMAT] = encoding_format

    return attributes


def _get_attributes_if_set(span: Span, names: Iterable) -> dict:
"""Returns a dict with any attribute found in the span attributes"""
attributes = span.attributes
Expand All @@ -154,7 +185,9 @@ def _record_token_usage_metrics(metric: Histogram, span: Span, usage: Completion
),
)
metric.record(usage.prompt_tokens, {**token_usage_metric_attrs, GEN_AI_TOKEN_TYPE: "input"})
metric.record(usage.completion_tokens, {**token_usage_metric_attrs, GEN_AI_TOKEN_TYPE: "output"})
# embeddings responses only have input tokens
if hasattr(usage, "completion_tokens"):
metric.record(usage.completion_tokens, {**token_usage_metric_attrs, GEN_AI_TOKEN_TYPE: "output"})


def _record_operation_duration_metric(metric: Histogram, span: Span, start: float):
Expand Down
Loading