
Commit 47e8356

openai: Add conditional span closure logic for async streaming completion
1 parent fc16998 commit 47e8356


2 files changed: +71, -1 lines

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py

Lines changed: 10 additions & 1 deletion
@@ -135,6 +135,13 @@ def __iter__(self):
         if not self.in_context_manager:
             self.end()
 
+    async def __aenter__(self):
+        # No difference in behavior between sync and async context manager
+        return self.__enter__()
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        self.__exit__(exc_type, exc_value, traceback)
+
     async def __aiter__(self):
         stream = self.__wrapped__
         try:
@@ -146,4 +153,6 @@ async def __aiter__(self):
         except Exception as exc:
             self.end(exc)
             raise
-        self.end()
+        # if we are inside a context manager we'll end when exiting it
+        if not self.in_context_manager:
+            self.end()
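
For readers without the full wrappers.py at hand, here is a minimal sketch of the wrapper pattern these hunks extend. Only the `__aenter__`/`__aexit__` methods, the `in_context_manager` check, and the comment text come from the diff above; the constructor, the sync `__enter__`/`__exit__`, the `_ended` guard, and the `end()` stand-in are assumptions made so the example is self-contained and runnable:

import asyncio


class StreamWrapper:
    """Illustrative stand-in for the instrumented stream wrapper."""

    def __init__(self, stream):
        self.__wrapped__ = stream
        self.in_context_manager = False
        self._ended = False  # assumed guard; keeps the sketch single-shot

    def __enter__(self):
        # Remember that the caller owns the lifecycle via a context manager,
        # so exhausting the iterator must not end the span early.
        self.in_context_manager = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end(exc_value)

    async def __aenter__(self):
        # No difference in behavior between sync and async context manager
        return self.__enter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.__exit__(exc_type, exc_value, traceback)

    async def __aiter__(self):
        stream = self.__wrapped__
        try:
            async for chunk in stream:
                yield chunk
        except Exception as exc:
            self.end(exc)
            raise
        # if we are inside a context manager we'll end when exiting it
        if not self.in_context_manager:
            self.end()

    def end(self, exc=None):
        # Stand-in for ending the OpenTelemetry span exactly once.
        if self._ended:
            return
        self._ended = True
        print("span ended", "with error" if exc else "cleanly")


async def demo():
    async def fake_stream():
        for part in ("South ", "Atlantic ", "Ocean."):
            yield part

    async with StreamWrapper(fake_stream()) as s:
        print("".join([chunk async for chunk in s]))


asyncio.run(demo())

Running the demo prints the joined chunks followed by a single "span ended cleanly": the span is closed once by __aexit__ rather than a second time inside __aiter__.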

instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py

Lines changed: 61 additions & 0 deletions
@@ -2152,6 +2152,67 @@ async def test_chat_async_stream(default_openai_env, trace_exporter, metrics_rea
     )
 
 
+@pytest.mark.vcr()
+@pytest.mark.asyncio
+async def test_chat_async_stream_with_context_manager(
+    default_openai_env, trace_exporter, metrics_reader, logs_exporter
+):
+    client = openai.AsyncOpenAI()
+
+    messages = [
+        {
+            "role": "user",
+            "content": TEST_CHAT_INPUT,
+        }
+    ]
+
+    # Use a context manager for the asynchronous streaming response
+    async with await client.chat.completions.create(
+        model=TEST_CHAT_MODEL, messages=messages, stream=True
+    ) as chat_completion:
+        chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices]
+        assert "".join(chunks) == "South Atlantic Ocean."
+
+    spans = trace_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == f"chat {TEST_CHAT_MODEL}"
+    assert span.kind == SpanKind.CLIENT
+    assert span.status.status_code == StatusCode.UNSET
+
+    address, port = address_and_port(client)
+    assert dict(span.attributes) == {
+        GEN_AI_OPERATION_NAME: "chat",
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_SYSTEM: "openai",
+        GEN_AI_RESPONSE_ID: "chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL",
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
+        SERVER_ADDRESS: address,
+        SERVER_PORT: port,
+        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
+    }
+
+    logs = logs_exporter.get_finished_logs()
+    assert len(logs) == 2
+    log_records = logrecords_from_logs(logs)
+    user_message, choice = log_records
+    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
+    assert dict(user_message.body) == {}
+
+    assert_stop_log_record(choice)
+
+    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
+    attributes = {
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+    }
+    assert_operation_duration_metric(
+        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
+    )
+
+
 @pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
 @pytest.mark.vcr()
 @pytest.mark.asyncio
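
A note on the shape of the new test: `create(..., stream=True)` on `AsyncOpenAI` returns an awaitable, so the test awaits it and hands the resulting stream to `async with`, which is exactly what exercises the `__aenter__`/`__aexit__` added in wrappers.py. Below is a short sketch of the two consumption styles the conditional closure supports; it is illustrative only, with `client`, `TEST_CHAT_MODEL`, and `messages` assumed to be set up as in the test above:

async def consume_both_ways():
    # Style 1: bare iteration. No context manager is entered, so
    # in_context_manager stays False and __aiter__ ends the span
    # once the stream is exhausted.
    stream = await client.chat.completions.create(
        model=TEST_CHAT_MODEL, messages=messages, stream=True
    )
    async for chunk in stream:
        ...

    # Style 2: context manager, as in the new test. __aenter__ marks
    # the wrapper as context-managed, __aiter__ therefore skips end(),
    # and __aexit__ ends the span instead, even if the body exits
    # before the stream is exhausted.
    async with await client.chat.completions.create(
        model=TEST_CHAT_MODEL, messages=messages, stream=True
    ) as stream:
        async for chunk in stream:
            ...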
