@@ -154,3 +154,29 @@ async def __aiter__(self):
            self.end(exc)
            raise
        self.end()

    def parse(self):
        """
        Handles a direct parse() call on the client so that instrumentation
        is maintained on the parsed iterator.
        """
        parsed_iterator = self.__wrapped__.parse()

        parsed_wrapper = StreamWrapper(
            stream=parsed_iterator,
            span=self.span,
            span_attributes=self.span_attributes,
            capture_message_content=self.capture_message_content,
            event_attributes=self.event_attributes,
            event_logger=self.event_logger,
            start_time=self.start_time,
            token_usage_metric=self.token_usage_metric,
            operation_duration_metric=self.operation_duration_metric,
            # Crucially, mark the new wrapper as NOT raw after parsing
            is_raw_response=False,
        )

        # Return the iterator matching the original stream: async if the
        # parsed iterator is an async iterable, sync otherwise
        if hasattr(parsed_iterator, "__aiter__"):
            return parsed_wrapper.__aiter__()

        return parsed_wrapper.__iter__()
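
For reference, a minimal sketch of the call path this method keeps instrumented (the model name and prompt are illustrative; the pattern mirrors the tests added below):

import openai

client = openai.OpenAI()

# with_raw_response defers parsing; the instrumentation wraps the raw stream
raw_response = client.chat.completions.with_raw_response.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"}],
    stream=True,
)

# parse() above re-wraps the parsed iterator, so consuming it here still
# records the span, log events, and duration metric
for chunk in raw_response.parse():
    if chunk.choices:
        print(chunk.choices[0].delta.content or "", end="")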
@@ -0,0 +1,116 @@
interactions:
- request:
    body: |-
      {
        "messages": [
          {
            "role": "user",
            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
          }
        ],
        "model": "gpt-4o-mini",
        "stream": true
      }
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate, zstd
      authorization:
      - Bearer test_openai_api_key
      connection:
      - keep-alive
      content-length:
      - '147'
      content-type:
      - application/json
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.76.0
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.76.0
      x-stainless-raw-response:
      - 'true'
      x-stainless-read-timeout:
      - '600'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.6
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: |+
        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"South"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Atlantic"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}

        data: [DONE]

    headers:
      CF-RAY:
      - 938698a5dfdcf41c-LHR
      Connection:
      - keep-alive
      Content-Type:
      - text/event-stream; charset=utf-8
      Date:
      - Wed, 30 Apr 2025 11:11:05 GMT
      Server:
      - cloudflare
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization: test_openai_org_id
      openai-processing-ms:
      - '112'
      openai-version:
      - '2020-10-01'
      strict-transport-security:
      - max-age=31536000; includeSubDomains; preload
      x-ratelimit-limit-requests:
      - '200'
      x-ratelimit-limit-tokens:
      - '100000'
      x-ratelimit-remaining-requests:
      - '199'
      x-ratelimit-remaining-tokens:
      - '99962'
      x-ratelimit-reset-requests:
      - 7m12s
      x-ratelimit-reset-tokens:
      - 16m11.453s
      x-request-id:
      - req_859b175255c1479e917cced0197dfe32
    status:
      code: 200
      message: OK
version: 1
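
The streamed deltas in this cassette reassemble into the content the tests below assert; a minimal illustration of that reassembly (values copied from the chunks above):

# delta.content values from the cassette, in order; the final chunk carries
# finish_reason "stop" and no content
deltas = ["", "South", " Atlantic", " Ocean", "."]
assert "".join(deltas) == "South Atlantic Ocean."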
@@ -1218,6 +1218,68 @@ def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metri
)


@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse not available")
@pytest.mark.vcr()
def test_chat_stream_with_raw_response_parsed(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
    client = openai.OpenAI()

    messages = [
        {
            "role": "user",
            "content": TEST_CHAT_INPUT,
        }
    ]

    raw_response = client.chat.completions.with_raw_response.create(
        model=TEST_CHAT_MODEL, messages=messages, stream=True
    )

    # Explicitly parse the raw response
    chat_completion = raw_response.parse()

    chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
    assert "".join(chunks) == "South Atlantic Ocean."

    spans = trace_exporter.get_finished_spans()
    assert len(spans) == 1

    span = spans[0]
    assert span.name == f"chat {TEST_CHAT_MODEL}"
    assert span.kind == SpanKind.CLIENT
    assert span.status.status_code == StatusCode.UNSET

    address, port = address_and_port(client)
    assert dict(span.attributes) == {
        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
        GEN_AI_OPERATION_NAME: "chat",
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_SYSTEM: "openai",
        GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC",
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
        SERVER_ADDRESS: address,
        SERVER_PORT: port,
    }

    logs = logs_exporter.get_finished_logs()
    assert len(logs) == 2
    log_records = logrecords_from_logs(logs)
    user_message, choice = log_records
    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
    assert dict(user_message.body) == {}

    assert_stop_log_record(choice)

    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
    attributes = {
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
    }
    assert_operation_duration_metric(
        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
    )


@pytest.mark.skipif(OPENAI_VERSION < (1, 35, 0), reason="service tier added in 1.35.0")
@pytest.mark.vcr()
def test_chat_stream_all_the_client_options(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
@@ -2273,6 +2335,71 @@ async def test_chat_async_stream_with_raw_response(default_openai_env, trace_exp
)


@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse not available")
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_chat_async_stream_with_raw_response_parsed(
    default_openai_env, trace_exporter, metrics_reader, logs_exporter
):
    client = openai.AsyncOpenAI()

    messages = [
        {
            "role": "user",
            "content": TEST_CHAT_INPUT,
        }
    ]

    raw_response = await client.chat.completions.with_raw_response.create(
        model=TEST_CHAT_MODEL, messages=messages, stream=True
    )

    # Explicitly parse the raw response
    chat_completion = raw_response.parse()

    chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices]
    assert "".join(chunks) == "South Atlantic Ocean."

    spans = trace_exporter.get_finished_spans()
    assert len(spans) == 1

    span = spans[0]
    assert span.name == f"chat {TEST_CHAT_MODEL}"
    assert span.kind == SpanKind.CLIENT
    assert span.status.status_code == StatusCode.UNSET

    address, port = address_and_port(client)
    assert dict(span.attributes) == {
        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
        GEN_AI_OPERATION_NAME: "chat",
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_SYSTEM: "openai",
        GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC",
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
        SERVER_ADDRESS: address,
        SERVER_PORT: port,
    }

    logs = logs_exporter.get_finished_logs()
    assert len(logs) == 2
    log_records = logrecords_from_logs(logs)
    user_message, choice = log_records
    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
    assert dict(user_message.body) == {}

    assert_stop_log_record(choice)

    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
    attributes = {
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
    }
    assert_operation_duration_metric(
        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
    )


@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_chat_async_stream_with_capture_message_content(