
Commit 29cc440

openai: fix instrumentation of with_raw_response (elastic#73)
1 parent d559b0c commit 29cc440

File tree

6 files changed: +543, -23 lines


instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py

Lines changed: 11 additions & 0 deletions
@@ -30,6 +30,7 @@
     _get_embeddings_attributes_from_response,
     _get_embeddings_attributes_from_wrapper,
     _get_event_attributes,
+    _is_raw_response,
     _record_operation_duration_metric,
     _record_token_usage_metrics,
     _send_log_events_from_choices,
@@ -195,6 +196,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
             raise

+        is_raw_response = _is_raw_response(result)
         if kwargs.get("stream"):
             return StreamWrapper(
                 stream=result,
@@ -206,10 +208,14 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
                 start_time=start_time,
                 token_usage_metric=self.token_usage_metric,
                 operation_duration_metric=self.operation_duration_metric,
+                is_raw_response=is_raw_response,
             )

         logger.debug(f"openai.resources.chat.completions.Completions.create result: {result}")

+        # if the caller is using with_raw_response we need to parse the output to get the response class we expect
+        if is_raw_response:
+            result = result.parse()
         response_attributes = _get_attributes_from_response(
             result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
         )
@@ -265,6 +271,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
             raise

+        is_raw_response = _is_raw_response(result)
         if kwargs.get("stream"):
             return StreamWrapper(
                 stream=result,
@@ -276,10 +283,14 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
                 start_time=start_time,
                 token_usage_metric=self.token_usage_metric,
                 operation_duration_metric=self.operation_duration_metric,
+                is_raw_response=is_raw_response,
             )

         logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create result: {result}")

+        # if the caller is using with_raw_response we need to parse the output to get the response class we expect
+        if is_raw_response:
+            result = result.parse()
         response_attributes = _get_attributes_from_response(
             result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
         )
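For context, a minimal sketch of the caller-side usage this hunk accounts for (the model and prompt are illustrative). with_raw_response makes the client return a LegacyAPIResponse that exposes HTTP-level details, and parse() recovers the ChatCompletion object whose id, model, choices and usage the attribute extraction above reads:

from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hi"}],
)
# raw is a LegacyAPIResponse: HTTP-level details live here ...
print(raw.headers.get("x-request-id"))
# ... while parse() returns the ChatCompletion the instrumentation
# extracts response attributes from
completion = raw.parse()
print(completion.choices[0].message.content)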

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py

Lines changed: 10 additions & 0 deletions
@@ -373,3 +373,13 @@ def _send_log_events_from_stream_choices(
             trace_flags=ctx.trace_flags,
         )
         event_logger.emit(event)
+
+
+def _is_raw_response(response):
+    try:
+        # available since 1.8.0
+        from openai._legacy_response import LegacyAPIResponse
+    except ImportError:
+        return False
+
+    return isinstance(response, LegacyAPIResponse)
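A hedged sketch of how the helper behaves, reusing the client from the sketch above: the check is true only for the raw wrapper, not for the parsed model, and on openai < 1.8.0 the guarded import makes it return False so the wrappers keep their previous behavior.

raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hi"}],
)
assert _is_raw_response(raw)              # LegacyAPIResponse
assert not _is_raw_response(raw.parse())  # plain ChatCompletion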

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py

Lines changed: 23 additions & 23 deletions
@@ -28,13 +28,14 @@
 from opentelemetry.trace import Span
 from opentelemetry.trace.status import StatusCode
 from opentelemetry.util.types import Attributes
+from wrapt import ObjectProxy

 EVENT_GEN_AI_CONTENT_COMPLETION = "gen_ai.content.completion"

 logger = logging.getLogger(__name__)


-class StreamWrapper:
+class StreamWrapper(ObjectProxy):
     def __init__(
         self,
         stream,
@@ -46,8 +47,11 @@ def __init__(
         start_time: float,
         token_usage_metric: Histogram,
         operation_duration_metric: Histogram,
+        is_raw_response: bool,
     ):
-        self.stream = stream
+        # we need to wrap the original response even in case of raw_responses
+        super().__init__(stream)
+
         self.span = span
         self.span_attributes = span_attributes
         self.capture_message_content = capture_message_content
@@ -56,6 +60,7 @@ def __init__(
         self.token_usage_metric = token_usage_metric
         self.operation_duration_metric = operation_duration_metric
         self.start_time = start_time
+        self.is_raw_response = is_raw_response

         self.response_id = None
         self.model = None
@@ -64,8 +69,7 @@ def __init__(
         self.service_tier = None

     def end(self, exc=None):
-        # StopIteration is not an error, it signals that we have consumed all the stream
-        if exc is not None and not isinstance(exc, (StopIteration, StopAsyncIteration)):
+        if exc is not None:
             self.span.set_status(StatusCode.ERROR, str(exc))
             self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
         self.span.end()
@@ -107,32 +111,28 @@ def process_chunk(self, chunk):
         if hasattr(chunk, "service_tier"):
             self.service_tier = chunk.service_tier

-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.end(exc_value)
-
     def __iter__(self):
-        return self
-
-    def __aiter__(self):
-        return self
-
-    def __next__(self):
+        stream = self.__wrapped__
         try:
-            chunk = next(self.stream)
-            self.process_chunk(chunk)
-            return chunk
+            if self.is_raw_response:
+                stream = stream.parse()
+            for chunk in stream:
+                self.process_chunk(chunk)
+                yield chunk
         except Exception as exc:
             self.end(exc)
             raise
+        self.end()

-    async def __anext__(self):
+    async def __aiter__(self):
+        stream = self.__wrapped__
         try:
-            chunk = await self.stream.__anext__()
-            self.process_chunk(chunk)
-            return chunk
+            if self.is_raw_response:
+                stream = stream.parse()
+            async for chunk in stream:
+                self.process_chunk(chunk)
+                yield chunk
         except Exception as exc:
             self.end(exc)
             raise
+        self.end()
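Two details worth noting. Replacing __next__/__anext__ with generator-based __iter__/__aiter__ means exhaustion no longer surfaces as StopIteration, so end() no longer special-cases it; the generator simply calls end() after the loop finishes. And because wrapt.ObjectProxy forwards everything else (attribute access, the context-manager protocol) to the wrapped stream, the explicit __enter__/__exit__ methods could be dropped. A minimal, self-contained sketch of the same proxy pattern (class and callback names are illustrative, not part of this package):

from wrapt import ObjectProxy

class ObservedStream(ObjectProxy):  # hypothetical name, for illustration
    def __init__(self, stream, on_chunk):
        super().__init__(stream)
        # wrapt forwards plain attribute writes to the wrapped object;
        # the _self_ prefix keeps this state on the proxy itself
        self._self_on_chunk = on_chunk

    def __iter__(self):
        # iterate the wrapped stream, observing each chunk before
        # handing it to the caller unchanged
        for chunk in self.__wrapped__:
            self._self_on_chunk(chunk)
            yield chunk

Anything else the caller does with the stream (for example reading an attribute of the underlying response object) still resolves on the wrapped object, so the proxy stays effectively invisible.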
Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+interactions:
+- request:
+    body: |-
+      {
+        "messages": [
+          {
+            "role": "user",
+            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
+          }
+        ],
+        "model": "gpt-4o-mini",
+        "stream": true
+      }
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate
+      authorization:
+      - Bearer test_openai_api_key
+      connection:
+      - keep-alive
+      content-length:
+      - '147'
+      content-type:
+      - application/json
+      host:
+      - api.openai.com
+      user-agent:
+      - OpenAI/Python 1.54.4
+      x-stainless-arch:
+      - x64
+      x-stainless-async:
+      - 'false'
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - Linux
+      x-stainless-package-version:
+      - 1.54.4
+      x-stainless-raw-response:
+      - 'true'
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.12.9
+    method: POST
+    uri: https://api.openai.com/v1/chat/completions
+  response:
+    body:
+      string: |+
+        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"content":"Atlantic"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
+
+        data: [DONE]
+
+    headers:
+      CF-RAY:
+      - 9236db6a7e55ed43-MXP
+      Connection:
+      - keep-alive
+      Content-Type:
+      - text/event-stream; charset=utf-8
+      Date:
+      - Thu, 20 Mar 2025 17:16:24 GMT
+      Server:
+      - cloudflare
+      Set-Cookie: test_set_cookie
+      Transfer-Encoding:
+      - chunked
+      X-Content-Type-Options:
+      - nosniff
+      access-control-expose-headers:
+      - X-Request-ID
+      alt-svc:
+      - h3=":443"; ma=86400
+      cf-cache-status:
+      - DYNAMIC
+      openai-organization: test_openai_org_id
+      openai-processing-ms:
+      - '284'
+      openai-version:
+      - '2020-10-01'
+      strict-transport-security:
+      - max-age=31536000; includeSubDomains; preload
+      x-ratelimit-limit-requests:
+      - '10000'
+      x-ratelimit-limit-tokens:
+      - '200000'
+      x-ratelimit-remaining-requests:
+      - '9998'
+      x-ratelimit-remaining-tokens:
+      - '199983'
+      x-ratelimit-reset-requests:
+      - 16.088s
+      x-ratelimit-reset-tokens:
+      - 5ms
+      x-request-id:
+      - req_d6f6a5d19533f6596e408dd665f07ec5
+    status:
+      code: 200
+      message: OK
+version: 1
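The cassette above was recorded with x-stainless-raw-response: 'true' and stream: true, i.e. exactly the code path this commit fixes. A sketch of a test that could replay it under a pytest-vcr style setup (the decorator, the api_key wiring and the test name are assumptions, not taken from this diff):

import openai
import pytest


@pytest.mark.vcr()
def test_chat_stream_with_raw_response():
    client = openai.OpenAI(api_key="test_openai_api_key")
    raw = client.chat.completions.with_raw_response.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "user",
                "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?",
            }
        ],
        stream=True,
    )
    # parse() on the raw response yields the usual Stream of
    # ChatCompletionChunk objects
    content = "".join(
        chunk.choices[0].delta.content or ""
        for chunk in raw.parse()
        if chunk.choices
    )
    assert content == "Atlantic Ocean"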
