@@ -67,8 +67,13 @@ def __init__(
        self.choices = []
        self.usage = None
        self.service_tier = None
        self.ended = False

    def end(self, exc=None):
        if self.ended:
            return

        self.ended = True
        if exc is not None:
            self.span.set_status(StatusCode.ERROR, str(exc))
            self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
@@ -111,6 +116,12 @@ def process_chunk(self, chunk):
        if hasattr(chunk, "service_tier"):
            self.service_tier = chunk.service_tier

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end(exc_value)

    def __iter__(self):
        stream = self.__wrapped__
        try:
@@ -124,6 +135,13 @@ def __iter__(self):
            raise
        self.end()
Contributor
@anuraaga anuraaga Apr 21, 2025
To confirm, without the changes this doesn't get called in the new test cases? It's unexpected to me, since they seem to consume the stream in a for loop, so I'm wondering why it would be a problem.

A case that does intuitively seem to require this change is one that breaks during iteration:

with response:
  for chunk in response:
    first = chunk
    break

Contributor
Also if that is a fail case, we may need to consume the stream in the exit methods to get the stop / token info.

Contributor Author
@SrdjanLL SrdjanLL Apr 22, 2025

Thanks for probing into this!

TL;DR: While digging further I think I found the most likely root cause of the missing spans in our chatbot-rag-app, and it's not multiple attempts to end a span, but the default ObjectProxy behaviour when the proxied object gets used as a context manager. The fix inspired by the initial patch and refactor still applies, but it sort of solved the problem by chance.

Sorry for the verbose root-causing below, but I had to leave this somewhere (led by Riccardo's example 😄)


All right, so I dug a bit further into it and found that in the unit tests this does get called (when not guarded): the unit tests catch the regression in terms of the span count, and we also get the warning for trying to close an already ended span. While that's a good regression to catch, it's not what I set out to do with this change, so thanks for drilling into this.

> Also if that is a fail case, we may need to consume the stream in the exit methods to get the stop / token info.

That's an interesting one and raises the question of whether we'd want to capture the tokens consumed before the exit, the token info the application chooses to deal with, or the token information from the full stream response. That seems like a weird edge case though, and may not be the right thing to resolve as part of this PR.


Back to the original issue. Thanks to your comment I got to the point in debugging where I found that __iter__ doesn't get invoked unless the __enter__ and __exit__ methods are added to StreamWrapper. Counterintuitively. There are no obvious clues on our end why these two different concepts have a dependency.

My assumption is that somewhere downstream (langchain/openai) the proxied object is used as a context manager, and without enter/exit overrides the StreamWrapper proxy falls back to the proxied object's behaviour, so the generator on the proxy never gets invoked. The closest thing I was able to find related to this for wrapt.ObjectProxy can be found here; it concerns proxying aiohttp.request, which uses an (async) context manager and seems to be used by openai/langchain as well. I found relevant code in BaseChatOpenAI that seems the most likely cause in our use case. Implementing the context manager enter/exit on our StreamWrapper doesn't affect the behavior of the instrumented code, but it solves the problem of generator invocation.
I then added a check on the span count that fails with 0 spans when the context manager is not implemented, which should catch regressions on this in the future.
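
To make that assumption concrete, here is a minimal sketch against wrapt directly (illustrative names, not the instrumentation code; it assumes ObjectProxy's default __enter__ delegates to the wrapped object and therefore binds the `as` target to the wrapped object rather than the proxy):

import wrapt


class RawStream:
    """Stand-in for the underlying OpenAI stream."""

    def __enter__(self):
        return self  # the raw stream hands back itself

    def __exit__(self, exc_type, exc_value, traceback):
        return False

    def __iter__(self):
        yield from ("chunk-1", "chunk-2")


class BareProxy(wrapt.ObjectProxy):
    def __iter__(self):
        print("proxy __iter__ called")
        yield from self.__wrapped__


class ContextManagerProxy(BareProxy):
    def __enter__(self):
        return self  # keep the proxy itself as the `as` target

    def __exit__(self, exc_type, exc_value, traceback):
        return False


# Without the override, `as s` is (assumed to be) the raw stream, so the proxy's
# __iter__ -- and any span bookkeeping in it -- is bypassed:
with BareProxy(RawStream()) as s:
    list(s)  # nothing printed

# With the override, iteration goes through the proxy:
with ContextManagerProxy(RawStream()) as s:
    list(s)  # prints "proxy __iter__ called"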

Seems like the initial patch from @xrmx fixed the span closure warnings and also resolved this issue, but it's good to know that what I explained above was causing the missing traces, not re-attempting the span closure.

Either way, I will proceed with your suggestion to add an ended flag and guard span ending in a single place instead, as that seems to cover both the warning about attempting to end an already ended span and this weird regression.
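
For illustration, a condensed sketch (simplified names, not the exact instrumentation code) of how the guarded end() together with the context-manager hooks covers both the early-break case mentioned above and the double-close warning:

from opentelemetry import trace


class SketchStreamWrapper:
    """Simplified stand-in for StreamWrapper: span handling only, no chunk processing."""

    def __init__(self, stream, span):
        self.stream = stream
        self.span = span
        self.ended = False

    def end(self, exc=None):
        if self.ended:  # guard: a second call (e.g. __exit__ after full iteration) is a no-op
            return
        self.ended = True
        if exc is not None:
            self.span.record_exception(exc)
        self.span.end()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Ends the span even if iteration broke early and never reached end() below.
        self.end(exc_value)

    def __iter__(self):
        try:
            yield from self.stream
        except Exception as exc:
            self.end(exc)
            raise
        self.end()


tracer = trace.get_tracer(__name__)
with SketchStreamWrapper(iter(["a", "b"]), tracer.start_span("chat")) as response:
    for chunk in response:
        break  # generator never finishes; __exit__ still ends the span exactly once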


    async def __aenter__(self):
        # No difference in behavior between sync and async context manager
        return self.__enter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.__exit__(exc_type, exc_value, traceback)

    async def __aiter__(self):
        stream = self.__wrapped__
        try:
@@ -0,0 +1,114 @@
interactions:
- request:
    body: |-
      {
        "messages": [
          {
            "role": "user",
            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
          }
        ],
        "model": "gpt-4o-mini",
        "stream": true
      }
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate
      authorization:
      - Bearer test_openai_api_key
      connection:
      - keep-alive
      content-length:
      - '147'
      content-type:
      - application/json
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.66.5
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.66.5
      x-stainless-read-timeout:
      - '600'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.6
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: |+
        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":"South"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":" Atlantic"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}

        data: [DONE]

    headers:
      CF-RAY:
      - 933c86cb9ae5773e-LHR
      Connection:
      - keep-alive
      Content-Type:
      - text/event-stream; charset=utf-8
      Date:
      - Mon, 21 Apr 2025 11:26:28 GMT
      Server:
      - cloudflare
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization: test_openai_org_id
      openai-processing-ms:
      - '460'
      openai-version:
      - '2020-10-01'
      strict-transport-security:
      - max-age=31536000; includeSubDomains; preload
      x-ratelimit-limit-requests:
      - '200'
      x-ratelimit-limit-tokens:
      - '100000'
      x-ratelimit-remaining-requests:
      - '199'
      x-ratelimit-remaining-tokens:
      - '88447'
      x-ratelimit-reset-requests:
      - 7m12s
      x-ratelimit-reset-tokens:
      - 83h10m39.057s
      x-request-id:
      - req_a39b652ec0ccb45a968e10112e569bf4
    status:
      code: 200
      message: OK
version: 1
@@ -1103,6 +1103,62 @@ def test_chat_stream(default_openai_env, trace_exporter, metrics_reader, logs_ex
    )


@pytest.mark.vcr()
def test_chat_stream_with_context_manager(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
    client = openai.OpenAI()

    messages = [
        {
            "role": "user",
            "content": TEST_CHAT_INPUT,
        }
    ]

    # Use a context manager for the streaming response
    with client.chat.completions.create(model=TEST_CHAT_MODEL, messages=messages, stream=True) as chat_completion:
        chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
        assert "".join(chunks) == "South Atlantic Ocean."

    spans = trace_exporter.get_finished_spans()
    assert len(spans) == 1

    span = spans[0]
    assert span.name == f"chat {TEST_CHAT_MODEL}"
    assert span.kind == SpanKind.CLIENT
    assert span.status.status_code == StatusCode.UNSET

    address, port = address_and_port(client)
    assert dict(span.attributes) == {
        GEN_AI_OPERATION_NAME: "chat",
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_SYSTEM: "openai",
        GEN_AI_RESPONSE_ID: "chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL",
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
        SERVER_ADDRESS: address,
        SERVER_PORT: port,
        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
    }

    logs = logs_exporter.get_finished_logs()
    assert len(logs) == 2
    log_records = logrecords_from_logs(logs)
    user_message, choice = log_records
    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
    assert dict(user_message.body) == {}

    assert_stop_log_record(choice)

    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
    attributes = {
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
    }
    assert_operation_duration_metric(
        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
    )


@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
@pytest.mark.vcr()
def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
@@ -2096,6 +2152,67 @@ async def test_chat_async_stream(default_openai_env, trace_exporter, metrics_rea
    )


@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_chat_async_stream_with_context_manager(
    default_openai_env, trace_exporter, metrics_reader, logs_exporter
):
    client = openai.AsyncOpenAI()

    messages = [
        {
            "role": "user",
            "content": TEST_CHAT_INPUT,
        }
    ]

    # Use a context manager for the asynchronous streaming response
    async with await client.chat.completions.create(
        model=TEST_CHAT_MODEL, messages=messages, stream=True
    ) as chat_completion:
        chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices]
        assert "".join(chunks) == "South Atlantic Ocean."

    spans = trace_exporter.get_finished_spans()
    assert len(spans) == 1

    span = spans[0]
    assert span.name == f"chat {TEST_CHAT_MODEL}"
    assert span.kind == SpanKind.CLIENT
    assert span.status.status_code == StatusCode.UNSET

    address, port = address_and_port(client)
    assert dict(span.attributes) == {
        GEN_AI_OPERATION_NAME: "chat",
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_SYSTEM: "openai",
        GEN_AI_RESPONSE_ID: "chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL",
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
        SERVER_ADDRESS: address,
        SERVER_PORT: port,
        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
    }

    logs = logs_exporter.get_finished_logs()
    assert len(logs) == 2
    log_records = logrecords_from_logs(logs)
    user_message, choice = log_records
    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
    assert dict(user_message.body) == {}

    assert_stop_log_record(choice)

    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
    attributes = {
        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
    }
    assert_operation_duration_metric(
        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
    )


@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
@pytest.mark.vcr()
@pytest.mark.asyncio