Skip to content

Commit 2cbf2ae

Browse files
committed
openai: Fix missing parse attribute on StreamWrapper
1 parent 9b7281e commit 2cbf2ae

File tree

3 files changed

+269
-0
lines changed

3 files changed

+269
-0
lines changed

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,29 @@ async def __aiter__(self):
154154
self.end(exc)
155155
raise
156156
self.end()
157+
158+
def parse(self):
    """Intercept ``parse()`` calls on a wrapped raw response.

    Callers of ``with_raw_response`` may invoke ``parse()`` to obtain the
    parsed stream; without this override the parsed iterator would escape
    instrumentation. Re-wraps the parsed iterator in a fresh StreamWrapper
    carrying the same telemetry state (span, events, metrics) and returns
    an iterator of the matching sync/async flavor.
    """
    inner = self.__wrapped__.parse()

    # Carry all telemetry state over to the new wrapper. The parsed
    # result is no longer a raw response, hence is_raw_response=False.
    wrapper = StreamWrapper(
        stream=inner,
        span=self.span,
        span_attributes=self.span_attributes,
        capture_message_content=self.capture_message_content,
        event_attributes=self.event_attributes,
        event_logger=self.event_logger,
        start_time=self.start_time,
        token_usage_metric=self.token_usage_metric,
        operation_duration_metric=self.operation_duration_metric,
        is_raw_response=False,
    )

    # Hand back an async iterator when the parsed object is async,
    # otherwise a sync one, so callers can iterate it directly.
    if hasattr(inner, "__aiter__"):
        return wrapper.__aiter__()
    return wrapper.__iter__()
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
interactions:
2+
- request:
3+
body: |-
4+
{
5+
"messages": [
6+
{
7+
"role": "user",
8+
"content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
9+
}
10+
],
11+
"model": "gpt-4o-mini",
12+
"stream": true
13+
}
14+
headers:
15+
accept:
16+
- application/json
17+
accept-encoding:
18+
- gzip, deflate, zstd
19+
authorization:
20+
- Bearer test_openai_api_key
21+
connection:
22+
- keep-alive
23+
content-length:
24+
- '147'
25+
content-type:
26+
- application/json
27+
host:
28+
- api.openai.com
29+
user-agent:
30+
- OpenAI/Python 1.76.0
31+
x-stainless-arch:
32+
- arm64
33+
x-stainless-async:
34+
- 'false'
35+
x-stainless-lang:
36+
- python
37+
x-stainless-os:
38+
- MacOS
39+
x-stainless-package-version:
40+
- 1.76.0
41+
x-stainless-raw-response:
42+
- 'true'
43+
x-stainless-read-timeout:
44+
- '600'
45+
x-stainless-retry-count:
46+
- '0'
47+
x-stainless-runtime:
48+
- CPython
49+
x-stainless-runtime-version:
50+
- 3.12.6
51+
method: POST
52+
uri: https://api.openai.com/v1/chat/completions
53+
response:
54+
body:
55+
string: |+
56+
data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}
57+
58+
data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"South"},"logprobs":null,"finish_reason":null}]}
59+
60+
data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Atlantic"},"logprobs":null,"finish_reason":null}]}
61+
62+
data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}
63+
64+
data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}
65+
66+
data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
67+
68+
data: [DONE]
69+
70+
headers:
71+
CF-RAY:
72+
- 938698a5dfdcf41c-LHR
73+
Connection:
74+
- keep-alive
75+
Content-Type:
76+
- text/event-stream; charset=utf-8
77+
Date:
78+
- Wed, 30 Apr 2025 11:11:05 GMT
79+
Server:
80+
- cloudflare
81+
Set-Cookie: test_set_cookie
82+
Transfer-Encoding:
83+
- chunked
84+
X-Content-Type-Options:
85+
- nosniff
86+
access-control-expose-headers:
87+
- X-Request-ID
88+
alt-svc:
89+
- h3=":443"; ma=86400
90+
cf-cache-status:
91+
- DYNAMIC
92+
openai-organization: test_openai_org_id
93+
openai-processing-ms:
94+
- '112'
95+
openai-version:
96+
- '2020-10-01'
97+
strict-transport-security:
98+
- max-age=31536000; includeSubDomains; preload
99+
x-ratelimit-limit-requests:
100+
- '200'
101+
x-ratelimit-limit-tokens:
102+
- '100000'
103+
x-ratelimit-remaining-requests:
104+
- '199'
105+
x-ratelimit-remaining-tokens:
106+
- '99962'
107+
x-ratelimit-reset-requests:
108+
- 7m12s
109+
x-ratelimit-reset-tokens:
110+
- 16m11.453s
111+
x-request-id:
112+
- req_859b175255c1479e917cced0197dfe32
113+
status:
114+
code: 200
115+
message: OK
116+
version: 1

instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,6 +1218,68 @@ def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metri
12181218
)
12191219

12201220

1221+
@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse not available")
@pytest.mark.vcr()
def test_chat_stream_with_raw_response_parsed(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
    """A streamed with_raw_response followed by an explicit parse() must keep
    the span, log events, and duration metric attached to the stream the
    caller actually consumes (regression test for StreamWrapper.parse)."""
    client = openai.OpenAI()

    messages = [
        {
            "role": "user",
            "content": TEST_CHAT_INPUT,
        }
    ]

    raw_response = client.chat.completions.with_raw_response.create(
        model=TEST_CHAT_MODEL, messages=messages, stream=True
    )

    # Explicit parse of the raw response; instrumentation must follow the
    # parsed iterator, not just the raw wrapper.
    chat_completion = raw_response.parse()

    chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
    assert "".join(chunks) == "South Atlantic Ocean."

    # The span must be ended exactly once when the parsed stream is exhausted.
    spans = trace_exporter.get_finished_spans()
    assert len(spans) == 1

    span = spans[0]
    assert span.name == f"chat {TEST_CHAT_MODEL}"
    assert span.kind == SpanKind.CLIENT
    assert span.status.status_code == StatusCode.UNSET

    address, port = address_and_port(client)
    assert dict(span.attributes) == {
        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
        GEN_AI_OPERATION_NAME: "chat",
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_SYSTEM: "openai",
        GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC",
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
        SERVER_ADDRESS: address,
        SERVER_PORT: port,
    }

    # One user-message event plus one choice event.
    logs = logs_exporter.get_finished_logs()
    assert len(logs) == 2
    log_records = logrecords_from_logs(logs)
    user_message, choice = log_records
    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
    assert dict(user_message.body) == {}

    assert_stop_log_record(choice)

    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
    attributes = {
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
    }
    assert_operation_duration_metric(
        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
    )
1282+
12211283
@pytest.mark.skipif(OPENAI_VERSION < (1, 35, 0), reason="service tier added in 1.35.0")
12221284
@pytest.mark.vcr()
12231285
def test_chat_stream_all_the_client_options(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
@@ -2273,6 +2335,71 @@ async def test_chat_async_stream_with_raw_response(default_openai_env, trace_exp
22732335
)
22742336

22752337

2338+
@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse not available")
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_chat_async_stream_with_raw_response_parsed(
    default_openai_env, trace_exporter, metrics_reader, logs_exporter
):
    """Async variant: a streamed with_raw_response followed by an explicit
    parse() must keep span, log events, and duration metric attached to the
    parsed async stream (regression test for StreamWrapper.parse)."""
    client = openai.AsyncOpenAI()

    messages = [
        {
            "role": "user",
            "content": TEST_CHAT_INPUT,
        }
    ]

    raw_response = await client.chat.completions.with_raw_response.create(
        model=TEST_CHAT_MODEL, messages=messages, stream=True
    )

    # Explicit parse of the raw response; instrumentation must follow the
    # parsed async iterator, not just the raw wrapper.
    chat_completion = raw_response.parse()

    chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices]
    assert "".join(chunks) == "South Atlantic Ocean."

    # The span must be ended exactly once when the parsed stream is exhausted.
    spans = trace_exporter.get_finished_spans()
    assert len(spans) == 1

    span = spans[0]
    assert span.name == f"chat {TEST_CHAT_MODEL}"
    assert span.kind == SpanKind.CLIENT
    assert span.status.status_code == StatusCode.UNSET

    address, port = address_and_port(client)
    assert dict(span.attributes) == {
        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
        GEN_AI_OPERATION_NAME: "chat",
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_SYSTEM: "openai",
        GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC",
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
        SERVER_ADDRESS: address,
        SERVER_PORT: port,
    }

    # One user-message event plus one choice event.
    logs = logs_exporter.get_finished_logs()
    assert len(logs) == 2
    log_records = logrecords_from_logs(logs)
    user_message, choice = log_records
    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
    assert dict(user_message.body) == {}

    assert_stop_log_record(choice)

    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
    attributes = {
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
    }
    assert_operation_duration_metric(
        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
    )
2401+
2402+
22762403
@pytest.mark.vcr()
22772404
@pytest.mark.asyncio
22782405
async def test_chat_async_stream_with_capture_message_content(

0 commit comments

Comments
 (0)