Skip to content

Commit b8f038f

Browse files
authored
openai: decouple metrics from span recording (elastic#56)
* openai: decouple metrics from span recording. When spans are not recording, their attributes are not accessible, so the metrics code would crash trying to read them. Decouple the metrics code from the span by passing the plain attribute dictionaries instead. While at it, remove the "span" prefix from the helpers that extract attributes.
1 parent c573aab commit b8f038f

File tree

3 files changed

+103
-78
lines changed

3 files changed

+103
-78
lines changed

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@
2525
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT,
2626
)
2727
from opentelemetry.instrumentation.openai.helpers import (
28-
_get_embeddings_span_attributes_from_wrapper,
28+
_get_attributes_from_response,
29+
_get_attributes_from_wrapper,
30+
_get_embeddings_attributes_from_response,
31+
_get_embeddings_attributes_from_wrapper,
2932
_get_event_attributes,
30-
_get_span_attributes_from_wrapper,
3133
_record_operation_duration_metric,
3234
_record_token_usage_metrics,
3335
_send_log_events_from_choices,
3436
_send_log_events_from_messages,
35-
_set_embeddings_span_attributes_from_response,
36-
_set_span_attributes_from_response,
37-
_span_name_from_span_attributes,
37+
_span_name_from_attributes,
3838
)
3939
from opentelemetry.instrumentation.openai.package import _instruments
4040
from opentelemetry.instrumentation.openai.version import __version__
@@ -134,10 +134,10 @@ def _uninstrument(self, **kwargs):
134134
def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
135135
logger.debug(f"openai.resources.chat.completions.Completions.create kwargs: {kwargs}")
136136

137-
span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
137+
span_attributes = _get_attributes_from_wrapper(instance, kwargs)
138138
event_attributes = _get_event_attributes()
139139

140-
span_name = _span_name_from_span_attributes(span_attributes)
140+
span_name = _span_name_from_attributes(span_attributes)
141141
with self.tracer.start_as_current_span(
142142
name=span_name,
143143
kind=SpanKind.CLIENT,
@@ -160,13 +160,15 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
160160
span.set_status(StatusCode.ERROR, str(exc))
161161
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
162162
span.end()
163-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
163+
error_attributes = {**span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
164+
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
164165
raise
165166

166167
if kwargs.get("stream"):
167168
return StreamWrapper(
168169
stream=result,
169170
span=span,
171+
span_attributes=span_attributes,
170172
capture_message_content=self.capture_message_content,
171173
event_attributes=event_attributes,
172174
event_logger=self.event_logger,
@@ -177,13 +179,16 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
177179

178180
logger.debug(f"openai.resources.chat.completions.Completions.create result: {result}")
179181

182+
response_attributes = _get_attributes_from_response(
183+
result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
184+
)
180185
if span.is_recording():
181-
_set_span_attributes_from_response(
182-
span, result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
183-
)
186+
for k, v in response_attributes.items():
187+
span.set_attribute(k, v)
184188

185-
_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
186-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
189+
metrics_attributes = {**span_attributes, **response_attributes}
190+
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
191+
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)
187192

188193
_send_log_events_from_choices(
189194
self.event_logger,
@@ -199,10 +204,10 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
199204
async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
200205
logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create kwargs: {kwargs}")
201206

202-
span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
207+
span_attributes = _get_attributes_from_wrapper(instance, kwargs)
203208
event_attributes = _get_event_attributes()
204209

205-
span_name = _span_name_from_span_attributes(span_attributes)
210+
span_name = _span_name_from_attributes(span_attributes)
206211
with self.tracer.start_as_current_span(
207212
name=span_name,
208213
kind=SpanKind.CLIENT,
@@ -225,13 +230,15 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
225230
span.set_status(StatusCode.ERROR, str(exc))
226231
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
227232
span.end()
228-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
233+
error_attributes = {ERROR_TYPE: exc.__class__.__qualname__}
234+
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
229235
raise
230236

231237
if kwargs.get("stream"):
232238
return StreamWrapper(
233239
stream=result,
234240
span=span,
241+
span_attributes=span_attributes,
235242
capture_message_content=self.capture_message_content,
236243
event_attributes=event_attributes,
237244
event_logger=self.event_logger,
@@ -242,13 +249,16 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
242249

243250
logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create result: {result}")
244251

252+
response_attributes = _get_attributes_from_response(
253+
result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
254+
)
245255
if span.is_recording():
246-
_set_span_attributes_from_response(
247-
span, result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
248-
)
256+
for k, v in response_attributes.items():
257+
span.set_attribute(k, v)
249258

250-
_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
251-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
259+
metrics_attributes = {**span_attributes, **response_attributes}
260+
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
261+
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)
252262

253263
_send_log_events_from_choices(
254264
self.event_logger,
@@ -262,9 +272,9 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
262272
return result
263273

264274
def _embeddings_wrapper(self, wrapped, instance, args, kwargs):
265-
span_attributes = _get_embeddings_span_attributes_from_wrapper(instance, kwargs)
275+
span_attributes = _get_embeddings_attributes_from_wrapper(instance, kwargs)
266276

267-
span_name = _span_name_from_span_attributes(span_attributes)
277+
span_name = _span_name_from_attributes(span_attributes)
268278
with self.tracer.start_as_current_span(
269279
name=span_name,
270280
kind=SpanKind.CLIENT,
@@ -279,23 +289,27 @@ def _embeddings_wrapper(self, wrapped, instance, args, kwargs):
279289
span.set_status(StatusCode.ERROR, str(exc))
280290
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
281291
span.end()
282-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
292+
error_attributes = {**span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
293+
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
283294
raise
284295

296+
response_attributes = _get_embeddings_attributes_from_response(result.model, result.usage)
285297
if span.is_recording():
286-
_set_embeddings_span_attributes_from_response(span, result.model, result.usage)
298+
for k, v in response_attributes.items():
299+
span.set_attribute(k, v)
287300

288-
_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
289-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
301+
metrics_attributes = {**span_attributes, **response_attributes}
302+
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
303+
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)
290304

291305
span.end()
292306

293307
return result
294308

295309
async def _async_embeddings_wrapper(self, wrapped, instance, args, kwargs):
296-
span_attributes = _get_embeddings_span_attributes_from_wrapper(instance, kwargs)
310+
span_attributes = _get_embeddings_attributes_from_wrapper(instance, kwargs)
297311

298-
span_name = _span_name_from_span_attributes(span_attributes)
312+
span_name = _span_name_from_attributes(span_attributes)
299313
with self.tracer.start_as_current_span(
300314
name=span_name,
301315
kind=SpanKind.CLIENT,
@@ -310,14 +324,18 @@ async def _async_embeddings_wrapper(self, wrapped, instance, args, kwargs):
310324
span.set_status(StatusCode.ERROR, str(exc))
311325
span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
312326
span.end()
313-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
327+
error_attributes = {**span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
328+
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
314329
raise
315330

331+
response_attributes = _get_embeddings_attributes_from_response(result.model, result.usage)
316332
if span.is_recording():
317-
_set_embeddings_span_attributes_from_response(span, result.model, result.usage)
333+
for k, v in response_attributes.items():
334+
span.set_attribute(k, v)
318335

319-
_record_token_usage_metrics(self.token_usage_metric, span, result.usage)
320-
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
336+
metrics_attributes = {**span_attributes, **response_attributes}
337+
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, result.usage)
338+
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, start_time)
321339

322340
span.end()
323341

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -68,31 +68,36 @@
6868
CompletionUsage = None
6969

7070

71-
def _set_span_attributes_from_response(
72-
span: Span,
71+
def _get_attributes_from_response(
7372
response_id: str,
7473
model: str,
7574
choices,
7675
usage: CompletionUsage,
7776
service_tier: Optional[str],
78-
) -> None:
79-
span.set_attribute(GEN_AI_RESPONSE_ID, response_id)
80-
span.set_attribute(GEN_AI_RESPONSE_MODEL, model)
77+
) -> Attributes:
8178
# when streaming finish_reason is None for every chunk that is not the last
8279
finish_reasons = [choice.finish_reason for choice in choices if choice.finish_reason]
83-
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons or ["error"])
80+
81+
attributes = {
82+
GEN_AI_RESPONSE_ID: response_id,
83+
GEN_AI_RESPONSE_MODEL: model,
84+
GEN_AI_RESPONSE_FINISH_REASONS: finish_reasons or ["error"],
85+
}
8486
# without `include_usage` in `stream_options` we won't get this
8587
if usage:
86-
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage.prompt_tokens)
87-
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage.completion_tokens)
88+
attributes[GEN_AI_USAGE_INPUT_TOKENS] = usage.prompt_tokens
89+
attributes[GEN_AI_USAGE_OUTPUT_TOKENS] = usage.completion_tokens
8890
# this is available only if requested
8991
if service_tier:
90-
span.set_attribute(GEN_AI_OPENAI_RESPONSE_SERVICE_TIER, service_tier)
92+
attributes[GEN_AI_OPENAI_RESPONSE_SERVICE_TIER] = service_tier
93+
return attributes
9194

9295

93-
def _set_embeddings_span_attributes_from_response(span: Span, model: str, usage: CompletionUsage) -> None:
94-
span.set_attribute(GEN_AI_RESPONSE_MODEL, model)
95-
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage.prompt_tokens)
96+
def _get_embeddings_attributes_from_response(model: str, usage: CompletionUsage) -> Attributes:
97+
return {
98+
GEN_AI_RESPONSE_MODEL: model,
99+
GEN_AI_USAGE_INPUT_TOKENS: usage.prompt_tokens,
100+
}
96101

97102

98103
def _attributes_from_client(client) -> Attributes:
@@ -112,7 +117,7 @@ def _attributes_from_client(client) -> Attributes:
112117
return span_attributes
113118

114119

115-
def _get_span_attributes_from_wrapper(instance, kwargs) -> Attributes:
120+
def _get_attributes_from_wrapper(instance, kwargs) -> Attributes:
116121
span_attributes = {
117122
GEN_AI_OPERATION_NAME: "chat",
118123
GEN_AI_SYSTEM: "openai",
@@ -153,7 +158,7 @@ def _get_span_attributes_from_wrapper(instance, kwargs) -> Attributes:
153158
return span_attributes
154159

155160

156-
def _span_name_from_span_attributes(attributes: Attributes) -> str:
161+
def _span_name_from_attributes(attributes: Attributes) -> str:
157162
request_model = attributes.get(GEN_AI_REQUEST_MODEL)
158163
return (
159164
f"{attributes[GEN_AI_OPERATION_NAME]} {request_model}"
@@ -162,7 +167,7 @@ def _span_name_from_span_attributes(attributes: Attributes) -> str:
162167
)
163168

164169

165-
def _get_embeddings_span_attributes_from_wrapper(instance, kwargs) -> Attributes:
170+
def _get_embeddings_attributes_from_wrapper(instance, kwargs) -> Attributes:
166171
span_attributes = {
167172
GEN_AI_OPERATION_NAME: "embeddings",
168173
GEN_AI_SYSTEM: "openai",
@@ -190,37 +195,33 @@ def _get_attributes_if_set(span: Span, names: Iterable) -> Attributes:
190195
return {name: attributes[name] for name in names if name in attributes}
191196

192197

193-
def _record_token_usage_metrics(metric: Histogram, span: Span, usage: CompletionUsage):
194-
token_usage_metric_attrs = _get_attributes_if_set(
195-
span,
196-
(
197-
GEN_AI_OPERATION_NAME,
198-
GEN_AI_REQUEST_MODEL,
199-
GEN_AI_RESPONSE_MODEL,
200-
GEN_AI_SYSTEM,
201-
SERVER_ADDRESS,
202-
SERVER_PORT,
203-
),
198+
def _record_token_usage_metrics(metric: Histogram, attributes: Attributes, usage: CompletionUsage):
199+
attribute_names = (
200+
GEN_AI_OPERATION_NAME,
201+
GEN_AI_REQUEST_MODEL,
202+
GEN_AI_RESPONSE_MODEL,
203+
GEN_AI_SYSTEM,
204+
SERVER_ADDRESS,
205+
SERVER_PORT,
204206
)
207+
token_usage_metric_attrs = {k: v for k, v in attributes.items() if k in attribute_names}
205208
metric.record(usage.prompt_tokens, {**token_usage_metric_attrs, GEN_AI_TOKEN_TYPE: "input"})
206209
# embeddings responses only have input tokens
207210
if hasattr(usage, "completion_tokens"):
208211
metric.record(usage.completion_tokens, {**token_usage_metric_attrs, GEN_AI_TOKEN_TYPE: "output"})
209212

210213

211-
def _record_operation_duration_metric(metric: Histogram, span: Span, start: float):
212-
operation_duration_metric_attrs = _get_attributes_if_set(
213-
span,
214-
(
215-
GEN_AI_OPERATION_NAME,
216-
GEN_AI_REQUEST_MODEL,
217-
GEN_AI_RESPONSE_MODEL,
218-
GEN_AI_SYSTEM,
219-
ERROR_TYPE,
220-
SERVER_ADDRESS,
221-
SERVER_PORT,
222-
),
214+
def _record_operation_duration_metric(metric: Histogram, attributes: Attributes, start: float):
215+
attribute_names = (
216+
GEN_AI_OPERATION_NAME,
217+
GEN_AI_REQUEST_MODEL,
218+
GEN_AI_RESPONSE_MODEL,
219+
GEN_AI_SYSTEM,
220+
ERROR_TYPE,
221+
SERVER_ADDRESS,
222+
SERVER_PORT,
223223
)
224+
operation_duration_metric_attrs = {k: v for k, v in attributes.items() if k in attribute_names}
224225
duration_s = default_timer() - start
225226
metric.record(duration_s, operation_duration_metric_attrs)
226227

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
from opentelemetry._events import EventLogger
2020
from opentelemetry.instrumentation.openai.helpers import (
21+
_get_attributes_from_response,
2122
_record_operation_duration_metric,
2223
_record_token_usage_metrics,
2324
_send_log_events_from_stream_choices,
24-
_set_span_attributes_from_response,
2525
)
2626
from opentelemetry.metrics import Histogram
2727
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
@@ -39,6 +39,7 @@ def __init__(
3939
self,
4040
stream,
4141
span: Span,
42+
span_attributes: Attributes,
4243
capture_message_content: bool,
4344
event_attributes: Attributes,
4445
event_logger: EventLogger,
@@ -48,6 +49,7 @@ def __init__(
4849
):
4950
self.stream = stream
5051
self.span = span
52+
self.span_attributes = span_attributes
5153
self.capture_message_content = capture_message_content
5254
self.event_attributes = event_attributes
5355
self.event_logger = event_logger
@@ -67,17 +69,21 @@ def end(self, exc=None):
6769
self.span.set_status(StatusCode.ERROR, str(exc))
6870
self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
6971
self.span.end()
70-
_record_operation_duration_metric(self.operation_duration_metric, self.span, self.start_time)
72+
error_attributes = {**self.span_attributes, ERROR_TYPE: exc.__class__.__qualname__}
73+
_record_operation_duration_metric(self.operation_duration_metric, error_attributes, self.start_time)
7174
return
7275

76+
response_attributes = _get_attributes_from_response(
77+
self.response_id, self.model, self.choices, self.usage, self.service_tier
78+
)
7379
if self.span.is_recording():
74-
_set_span_attributes_from_response(
75-
self.span, self.response_id, self.model, self.choices, self.usage, self.service_tier
76-
)
80+
for k, v in response_attributes.items():
81+
self.span.set_attribute(k, v)
7782

78-
_record_operation_duration_metric(self.operation_duration_metric, self.span, self.start_time)
83+
metrics_attributes = {**self.span_attributes, **response_attributes}
84+
_record_operation_duration_metric(self.operation_duration_metric, metrics_attributes, self.start_time)
7985
if self.usage:
80-
_record_token_usage_metrics(self.token_usage_metric, self.span, self.usage)
86+
_record_token_usage_metrics(self.token_usage_metric, metrics_attributes, self.usage)
8187

8288
_send_log_events_from_stream_choices(
8389
self.event_logger,

0 commit comments

Comments
 (0)