@@ -25,15 +25,15 @@
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT,
)
from opentelemetry.instrumentation.openai.helpers import (
_get_embeddings_span_attributes_from_response,
_get_embeddings_span_attributes_from_wrapper,
_get_event_attributes,
_get_span_attributes_from_response,
_get_span_attributes_from_wrapper,
_record_operation_duration_metric,
_record_token_usage_metrics,
_send_log_events_from_choices,
_send_log_events_from_messages,
_set_embeddings_span_attributes_from_response,
_set_span_attributes_from_response,
_span_name_from_span_attributes,
)
from opentelemetry.instrumentation.openai.package import _instruments
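
The shape of the change, repeated across all four wrappers: helpers that used to mutate the span (`_set_*`) become pure `_get_*` functions returning an `Attributes` dict; the call site copies that dict onto the span only when `span.is_recording()`, and merges it with the request-time `span_attributes` when recording metrics. A plausible motivation is that a non-recording (unsampled) span stores no attributes, so the old span-reading metric helpers would emit metrics with empty attribute sets. A minimal, self-contained sketch of the pattern; `FakeSpan`, `FakeHistogram`, and the helper below are illustrative stand-ins, not the instrumented code:

```python
# Sketch of the pattern this commit moves to (all names here are stand-ins).
from timeit import default_timer


def get_response_attributes(response_id: str, model: str) -> dict:
    # Pure helper: returns data instead of mutating the span.
    return {"gen_ai.response.id": response_id, "gen_ai.response.model": model}


class FakeSpan:
    def is_recording(self) -> bool:
        return False  # e.g. the span was not sampled

    def set_attribute(self, key, value) -> None:
        pass


class FakeHistogram:
    def record(self, value, attributes) -> None:
        print(f"recorded {value:.4f} with {attributes}")


span, histogram = FakeSpan(), FakeHistogram()
start = default_timer()
span_attributes = {"gen_ai.operation.name": "chat"}  # known at request time
response_attributes = get_response_attributes("chatcmpl-1", "gpt-4o-mini")

if span.is_recording():  # skipped here: the span is unsampled
    for k, v in response_attributes.items():
        span.set_attribute(k, v)

# Metrics still carry full attributes even though the span recorded nothing.
histogram.record(default_timer() - start, {**span_attributes, **response_attributes})
```
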
@@ -160,13 +160,15 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
span.set_status(StatusCode.ERROR, str(exc))
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
span.end()
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
error_attributes = {**span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
raise

if kwargs.get("stream"):
return StreamWrapper(
stream=result,
span=span,
span_attributes=span_attributes,
capture_message_content=self.capture_message_content,
event_attributes=event_attributes,
event_logger=self.event_logger,
@@ -177,13 +179,16 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):

logger.debug(f"openai.resources.chat.completions.Completions.create result: {result}")

response_attributes = _get_span_attributes_from_response(
result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
)
if span.is_recording():
_set_span_attributes_from_response(
span, result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
)
for k, v in response_attributes.items():
span.set_attribute(k, v)

_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
metrics_attributes = {**span_attributes, **response_attributes}
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)

_send_log_events_from_choices(
self.event_logger,
@@ -225,13 +230,15 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
span.set_status(StatusCode.ERROR, str(exc))
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
span.end()
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
error_attributes = {**span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
raise

if kwargs.get("stream"):
return StreamWrapper(
stream=result,
span=span,
span_attributes=span_attributes,
capture_message_content=self.capture_message_content,
event_attributes=event_attributes,
event_logger=self.event_logger,
@@ -242,13 +249,16 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):

logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create result: {result}")

response_attributes = _get_span_attributes_from_response(
result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
)
if span.is_recording():
_set_span_attributes_from_response(
span, result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
)
for k, v in response_attributes.items():
span.set_attribute(k, v)

_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
metrics_attributes = {**span_attributes, **response_attributes}
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)

_send_log_events_from_choices(
self.event_logger,
@@ -279,14 +289,18 @@ def _embeddings_wrapper(self, wrapped, instance, args, kwargs):
span.set_status(StatusCode.ERROR, str(exc))
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
span.end()
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
error_attributes = {**span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
raise

response_attributes = _get_embeddings_span_attributes_from_response(result.model, result.usage)
if span.is_recording():
_set_embeddings_span_attributes_from_response(span, result.model, result.usage)
for k, v in response_attributes.items():
span.set_attribute(k, v)

_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
metrics_attributes = {**span_attributes, **response_attributes}
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)

span.end()

@@ -310,14 +324,18 @@ async def _async_embeddings_wrapper(self, wrapped, instance, args, kwargs):
span.set_status(StatusCode.ERROR, str(exc))
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
span.end()
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
error_attributes = {**span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
raise

response_attributes = _get_embeddings_span_attributes_from_response(result.model, result.usage)
if span.is_recording():
_set_embeddings_span_attributes_from_response(span, result.model, result.usage)
for k, v in response_attributes.items():
span.set_attribute(k, v)

_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
metrics_attributes = {**span_attributes, **response_attributes}
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)

span.end()

@@ -68,31 +68,36 @@
CompletionUsage = None


def _set_span_attributes_from_response(
span: Span,
def _get_span_attributes_from_response(
response_id: str,
model: str,
choices,
usage: CompletionUsage,
service_tier: Optional[str],
) -> None:
span.set_attribute(GEN_AI_RESPONSE_ID, response_id)
span.set_attribute(GEN_AI_RESPONSE_MODEL, model)
) -> Attributes:
# when streaming finish_reason is None for every chunk that is not the last
finish_reasons = [choice.finish_reason for choice in choices if choice.finish_reason]
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons or ["error"])

attributes = {
GEN_AI_RESPONSE_ID: response_id,
GEN_AI_RESPONSE_MODEL: model,
GEN_AI_RESPONSE_FINISH_REASONS: finish_reasons or ["error"],
}
# without `include_usage` in `stream_options` we won't get this
if usage:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage.prompt_tokens)
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage.completion_tokens)
attributes[GEN_AI_USAGE_INPUT_TOKENS] = usage.prompt_tokens
attributes[GEN_AI_USAGE_OUTPUT_TOKENS] = usage.completion_tokens
# this is available only if requested
if service_tier:
span.set_attribute(GEN_AI_OPENAI_RESPONSE_SERVICE_TIER, service_tier)
attributes[GEN_AI_OPENAI_RESPONSE_SERVICE_TIER] = service_tier
return attributes
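
A hedged usage sketch for the new helper; `SimpleNamespace` stands in for the OpenAI `ChatCompletion` choice and usage objects, and the keys shown assume the usual semconv values for the `GEN_AI_*` constants:

```python
from types import SimpleNamespace

attrs = _get_span_attributes_from_response(
    response_id="chatcmpl-123",
    model="gpt-4o-mini-2024-07-18",
    choices=[SimpleNamespace(finish_reason="stop")],
    usage=SimpleNamespace(prompt_tokens=12, completion_tokens=34),
    service_tier=None,  # omitted from the result when falsy
)
# Assuming the usual semconv values for the GEN_AI_* constants:
# {'gen_ai.response.id': 'chatcmpl-123',
#  'gen_ai.response.model': 'gpt-4o-mini-2024-07-18',
#  'gen_ai.response.finish_reasons': ['stop'],
#  'gen_ai.usage.input_tokens': 12,
#  'gen_ai.usage.output_tokens': 34}
```
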


def _set_embeddings_span_attributes_from_response(span: Span, model: str, usage: CompletionUsage) -> None:
span.set_attribute(GEN_AI_RESPONSE_MODEL, model)
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage.prompt_tokens)
def _get_embeddings_span_attributes_from_response(model: str, usage: CompletionUsage) -> Attributes:
return {
GEN_AI_RESPONSE_MODEL: model,
GEN_AI_USAGE_INPUT_TOKENS: usage.prompt_tokens,
}
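
And the embeddings counterpart, under the same assumptions:

```python
from types import SimpleNamespace

attrs = _get_embeddings_span_attributes_from_response(
    model="text-embedding-3-small",
    usage=SimpleNamespace(prompt_tokens=8),  # embeddings usage has no completion_tokens
)
# {'gen_ai.response.model': 'text-embedding-3-small', 'gen_ai.usage.input_tokens': 8}
```
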


def _attributes_from_client(client) -> Attributes:
@@ -190,37 +195,33 @@ def _get_attributes_if_set(span: Span, names: Iterable) -> Attributes:
return {name: attributes[name] for name in names if name in attributes}


def _record_token_usage_metrics(metric: Histogram, span: Span, usage: CompletionUsage):
token_usage_metric_attrs = _get_attributes_if_set(
span,
(
GEN_AI_OPERATION_NAME,
GEN_AI_REQUEST_MODEL,
GEN_AI_RESPONSE_MODEL,
GEN_AI_SYSTEM,
SERVER_ADDRESS,
SERVER_PORT,
),
def _record_token_usage_metrics(metric: Histogram, attributes: Attributes, usage: CompletionUsage):
attribute_names = (
GEN_AI_OPERATION_NAME,
GEN_AI_REQUEST_MODEL,
GEN_AI_RESPONSE_MODEL,
GEN_AI_SYSTEM,
SERVER_ADDRESS,
SERVER_PORT,
)
token_usage_metric_attrs = {k: v for k, v in attributes.items() if k in attribute_names}
metric.record(usage.prompt_tokens, {**token_usage_metric_attrs, GEN_AI_TOKEN_TYPE: "input"})
# embeddings responses only have input tokens
if hasattr(usage, "completion_tokens"):
metric.record(usage.completion_tokens, {**token_usage_metric_attrs, GEN_AI_TOKEN_TYPE: "output"})
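
Worth noting: the allow-list filter now runs over the passed-in dict rather than over `span.attributes`, and the `hasattr` guard keeps embeddings (input-only) responses working. A quick illustrative run with a printing stub, again assuming the usual semconv constant values:

```python
from types import SimpleNamespace


class PrintingHistogram:
    def record(self, value, attributes):
        print(value, attributes)


_record_token_usage_metrics(
    PrintingHistogram(),
    {"gen_ai.operation.name": "embeddings", "custom.key": "dropped by the filter"},
    SimpleNamespace(prompt_tokens=8),  # no completion_tokens -> no "output" record
)
# 8 {'gen_ai.operation.name': 'embeddings', 'gen_ai.token.type': 'input'}
```
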


def _record_operation_duration_metric(metric: Histogram, span: Span, start: float):
operation_duration_metric_attrs = _get_attributes_if_set(
span,
(
GEN_AI_OPERATION_NAME,
GEN_AI_REQUEST_MODEL,
GEN_AI_RESPONSE_MODEL,
GEN_AI_SYSTEM,
ERROR_TYPE,
SERVER_ADDRESS,
SERVER_PORT,
),
def _record_operation_duration_metric(metric: Histogram, attributes: Attributes, start: float):
attribute_names = (
GEN_AI_OPERATION_NAME,
GEN_AI_REQUEST_MODEL,
GEN_AI_RESPONSE_MODEL,
GEN_AI_SYSTEM,
ERROR_TYPE,
SERVER_ADDRESS,
SERVER_PORT,
)
operation_duration_metric_attrs = {k: v for k, v in attributes.items() if k in attribute_names}
duration_s = default_timer() - start
metric.record(duration_s, operation_duration_metric_attrs)
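
For reference, `start` is expected to come from the same `timeit.default_timer()` clock (an alias for `time.perf_counter` on Python 3) that the wrappers sample before calling `wrapped`:

```python
from timeit import default_timer

start = default_timer()               # taken in the wrapper before the OpenAI call
# ... the OpenAI call runs ...
duration_s = default_timer() - start  # monotonic seconds, immune to wall-clock jumps
```
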

@@ -18,10 +18,10 @@

from opentelemetry._events import EventLogger
from opentelemetry.instrumentation.openai.helpers import (
_get_span_attributes_from_response,
_record_operation_duration_metric,
_record_token_usage_metrics,
_send_log_events_from_stream_choices,
_set_span_attributes_from_response,
)
from opentelemetry.metrics import Histogram
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
@@ -39,6 +39,7 @@ def __init__(
self,
stream,
span: Span,
span_attributes: Attributes,
capture_message_content: bool,
event_attributes: Attributes,
event_logger: EventLogger,
@@ -48,6 +49,7 @@
):
self.stream = stream
self.span = span
self.span_attributes = span_attributes
self.capture_message_content = capture_message_content
self.event_attributes = event_attributes
self.event_logger = event_logger
@@ -67,17 +69,21 @@ def end(self, exc=None):
self.span.set_status(StatusCode.ERROR, str(exc))
self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
self.span.end()
_record_operation_duration_metric(self.operation_duration_metric, self.span, self.start_time)
error_attributes = {**self.span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, self.start_time)
return

response_attributes = _get_span_attributes_from_response(
self.response_id, self.model, self.choices, self.usage, self.service_tier
)
if self.span.is_recording():
_set_span_attributes_from_response(
self.span, self.response_id, self.model, self.choices, self.usage, self.service_tier
)
for k, v in response_attributes.items():
self.span.set_attribute(k, v)

_record_operation_duration_metric(self.operation_duration_metric, self.span, self.start_time)
metrics_attributes = {**self.span_attributes, **response_attributes}
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, self.start_time)
if self.usage:
_record_token_usage_metrics(self.token_usage_metric, self.span, self.usage)
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, self.usage)

_send_log_events_from_stream_choices(
self.event_logger,
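
For streams, `end()` can run long after the request was issued and the span may be non-recording by then, which is presumably why the request-time `span_attributes` are now threaded through `StreamWrapper` instead of being read back from the span. One caller-side detail, echoed by the `include_usage` comment in the helpers: usage only arrives on the final chunk if the caller opts in. A hedged client-side sketch (`stream_options` is a real OpenAI API parameter, but verify it against your client version):

```python
from openai import OpenAI  # assumes the openai package and an OPENAI_API_KEY

client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "hello"}],
    stream=True,
    stream_options={"include_usage": True},  # without this, usage stays None
)
for chunk in stream:
    # The instrumented StreamWrapper re-yields chunks while accumulating
    # response_id, model, choices and (on the final chunk) usage; once the
    # stream is exhausted, end() records the duration histogram and, if
    # usage arrived, the token usage histogram.
    pass
```
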