Merged (changes from 4 commits)
@@ -163,6 +163,8 @@ def _uninstrument(self, **kwargs):
         unwrap(openai.resources.embeddings.AsyncEmbeddings, "create")
 
     def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
+        from openai._legacy_response import LegacyAPIResponse
+
         logger.debug(f"{wrapped} kwargs: {kwargs}")
 
         span_attributes = _get_attributes_from_wrapper(instance, kwargs)
@@ -195,6 +197,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
             raise
 
+        is_raw_response = isinstance(result, LegacyAPIResponse)
         if kwargs.get("stream"):
             return StreamWrapper(
                 stream=result,
@@ -206,10 +209,14 @@
                 start_time=start_time,
                 token_usage_metric=self.token_usage_metric,
                 operation_duration_metric=self.operation_duration_metric,
+                is_raw_response=is_raw_response,
             )
 
         logger.debug(f"openai.resources.chat.completions.Completions.create result: {result}")
 
+        # if the caller is using with_raw_response we need to parse the output to get the response class we expect
+        if is_raw_response:
+            result = result.parse()
         response_attributes = _get_attributes_from_response(
             result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
         )
@@ -233,6 +240,8 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
         return result
 
     async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
+        from openai._legacy_response import LegacyAPIResponse
+
         logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create kwargs: {kwargs}")
 
         span_attributes = _get_attributes_from_wrapper(instance, kwargs)
@@ -265,6 +274,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time)
             raise
 
+        is_raw_response = isinstance(result, LegacyAPIResponse)
         if kwargs.get("stream"):
             return StreamWrapper(
                 stream=result,
@@ -276,10 +286,14 @@
                 start_time=start_time,
                 token_usage_metric=self.token_usage_metric,
                 operation_duration_metric=self.operation_duration_metric,
+                is_raw_response=is_raw_response,
             )
 
         logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create result: {result}")
 
+        # if the caller is using with_raw_response we need to parse the output to get the response class we expect
+        if is_raw_response:
+            result = result.parse()
         response_attributes = _get_attributes_from_response(
             result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None)
         )
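
For context, the new is_raw_response branch is taken when a caller goes through the openai-python raw-response interface, which returns a LegacyAPIResponse instead of a ChatCompletion. A minimal sketch of that calling pattern (the client setup, model and prompt are illustrative, not part of this change):

    # Sketch: calling chat completions via with_raw_response.
    from openai import OpenAI

    client = OpenAI()
    raw = client.chat.completions.with_raw_response.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"}],
    )
    # raw is an openai._legacy_response.LegacyAPIResponse; the usual response
    # fields (id, model, choices, usage) only exist on the parsed object.
    completion = raw.parse()
    print(completion.choices[0].message.content)

That is why, in the non-streaming branch above, the wrapper reads id, model, choices and usage only after calling result.parse().
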
@@ -28,13 +28,14 @@
 from opentelemetry.trace import Span
 from opentelemetry.trace.status import StatusCode
 from opentelemetry.util.types import Attributes
+from wrapt import ObjectProxy
 
 EVENT_GEN_AI_CONTENT_COMPLETION = "gen_ai.content.completion"
 
 logger = logging.getLogger(__name__)
 
 
-class StreamWrapper:
+class StreamWrapper(ObjectProxy):
     def __init__(
         self,
         stream,
@@ -46,8 +47,11 @@ def __init__(
         start_time: float,
         token_usage_metric: Histogram,
         operation_duration_metric: Histogram,
+        is_raw_response: bool,
     ):
-        self.stream = stream
+        # we need to wrap the original response even in case of raw_responses
+        super().__init__(stream)
+
         self.span = span
         self.span_attributes = span_attributes
         self.capture_message_content = capture_message_content
@@ -56,6 +60,7 @@ def __init__(
         self.token_usage_metric = token_usage_metric
         self.operation_duration_metric = operation_duration_metric
         self.start_time = start_time
+        self.is_raw_response = is_raw_response
 
         self.response_id = None
         self.model = None
@@ -64,8 +69,7 @@ def __init__(
         self.service_tier = None
 
     def end(self, exc=None):
-        # StopIteration is not an error, it signals that we have consumed all the stream
-        if exc is not None and not isinstance(exc, (StopIteration, StopAsyncIteration)):
+        if exc is not None:
             self.span.set_status(StatusCode.ERROR, str(exc))
             self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
             self.span.end()
@@ -107,32 +111,28 @@ def process_chunk(self, chunk):
         if hasattr(chunk, "service_tier"):
             self.service_tier = chunk.service_tier
 
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.end(exc_value)
-
     def __iter__(self):
-        return self
-
-    def __aiter__(self):
-        return self
-
-    def __next__(self):
+        stream = self.__wrapped__
         try:
-            chunk = next(self.stream)
-            self.process_chunk(chunk)
-            return chunk
+            if self.is_raw_response:
+                stream = stream.parse()
+            for chunk in stream:
+                self.process_chunk(chunk)
+                yield chunk
         except Exception as exc:
             self.end(exc)
             raise
+        self.end()
 
-    async def __anext__(self):
+    async def __aiter__(self):
+        stream = self.__wrapped__
         try:
-            chunk = await self.stream.__anext__()
-            self.process_chunk(chunk)
-            return chunk
+            if self.is_raw_response:
+                stream = stream.parse()
+            async for chunk in stream:
+                self.process_chunk(chunk)
+                yield chunk
         except Exception as exc:
             self.end(exc)
             raise
+        self.end()
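
The switch to wrapt.ObjectProxy is what lets the streaming case hand the wrapper back to the caller while it still behaves like the underlying response object. A standalone sketch of the proxy behaviour relied on here (this is a generic wrapt example, not code from this change):

    # Attribute access and isinstance checks are delegated to the wrapped
    # object, while methods overridden on the proxy subclass still run.
    from wrapt import ObjectProxy

    class DoublingProxy(ObjectProxy):
        def __iter__(self):
            for item in self.__wrapped__:
                yield item * 2

    proxy = DoublingProxy([1, 2, 3])
    print(list(proxy))              # [2, 4, 6] -- our __iter__ is used
    print(proxy.count(2))           # 1 -- unknown attributes go to the wrapped list
    print(isinstance(proxy, list))  # True -- the proxy reports the wrapped type

In this change, __wrapped__ is the original stream (or the LegacyAPIResponse when with_raw_response is used), so the generator-based __iter__/__aiter__ can parse it lazily and call end() exactly once, whether the stream finishes normally or raises.
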
@@ -0,0 +1,110 @@
interactions:
- request:
    body: |-
      {
        "messages": [
          {
            "role": "user",
            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
          }
        ],
        "model": "gpt-4o-mini",
        "stream": true
      }
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate
      authorization:
      - Bearer test_openai_api_key
      connection:
      - keep-alive
      content-length:
      - '147'
      content-type:
      - application/json
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.54.4
      x-stainless-arch:
      - x64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - Linux
      x-stainless-package-version:
      - 1.54.4
      x-stainless-raw-response:
      - 'true'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.9
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: |+
        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"content":"Atlantic"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}

        data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}

        data: [DONE]

    headers:
      CF-RAY:
      - 9236db6a7e55ed43-MXP
      Connection:
      - keep-alive
      Content-Type:
      - text/event-stream; charset=utf-8
      Date:
      - Thu, 20 Mar 2025 17:16:24 GMT
      Server:
      - cloudflare
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization: test_openai_org_id
      openai-processing-ms:
      - '284'
      openai-version:
      - '2020-10-01'
      strict-transport-security:
      - max-age=31536000; includeSubDomains; preload
      x-ratelimit-limit-requests:
      - '10000'
      x-ratelimit-limit-tokens:
      - '200000'
      x-ratelimit-remaining-requests:
      - '9998'
      x-ratelimit-remaining-tokens:
      - '199983'
      x-ratelimit-reset-requests:
      - 16.088s
      x-ratelimit-reset-tokens:
      - 5ms
      x-request-id:
      - req_d6f6a5d19533f6596e408dd665f07ec5
    status:
      code: 200
      message: OK
version: 1
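
The cassette above records the combination this PR targets: a streaming request ("stream": true in the body) made through the raw-response interface (x-stainless-raw-response: 'true' in the headers). A hypothetical test that would produce and replay this traffic; the VCR marker and fixture wiring are illustrative and may differ from the suite's actual helpers:

    # Hypothetical test shape; marker and client setup are illustrative.
    import pytest
    from openai import OpenAI

    @pytest.mark.vcr()
    def test_chat_stream_with_raw_response():
        client = OpenAI(api_key="test_openai_api_key")
        raw = client.chat.completions.with_raw_response.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"}],
            stream=True,
        )
        # With stream=True, parse() yields ChatCompletionChunk objects.
        content = ""
        for chunk in raw.parse():
            if chunk.choices and chunk.choices[0].delta.content:
                content += chunk.choices[0].delta.content
        assert content == "Atlantic Ocean"
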