From fc16998cd683c4805859a10221df05113a0a0469 Mon Sep 17 00:00:00 2001
From: Srdjan Lulic
Date: Mon, 21 Apr 2025 12:48:48 +0100
Subject: [PATCH 1/3] openai: Add conditional span closure logic depending on
 whether streamed completion uses context manager or not

---
 .../instrumentation/openai/wrappers.py        |  13 +-
 ...test_chat_stream_with_context_manager.yaml | 114 ++++++++++++++++++
 .../tests/test_chat_completions.py            |  56 +++++++++
 3 files changed, 182 insertions(+), 1 deletion(-)
 create mode 100644 instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_context_manager.yaml

diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
index 8256822..0dc3caa 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
@@ -67,6 +67,7 @@ def __init__(
         self.choices = []
         self.usage = None
         self.service_tier = None
+        self.in_context_manager = False

     def end(self, exc=None):
         if exc is not None:
@@ -111,6 +112,14 @@ def process_chunk(self, chunk):
         if hasattr(chunk, "service_tier"):
             self.service_tier = chunk.service_tier

+    def __enter__(self):
+        # flag we are inside a context manager and want to follow its lifetime
+        self.in_context_manager = True
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.end(exc_value)
+
     def __iter__(self):
         stream = self.__wrapped__
         try:
@@ -122,7 +131,9 @@ def __iter__(self):
         except Exception as exc:
             self.end(exc)
             raise
-        self.end()
+        # if we are inside a context manager we'll end when exiting it
+        if not self.in_context_manager:
+            self.end()

     async def __aiter__(self):
         stream = self.__wrapped__
diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_context_manager.yaml b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_context_manager.yaml
new file mode 100644
index 0000000..0b1827d
--- /dev/null
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_context_manager.yaml
@@ -0,0 +1,114 @@
+interactions:
+- request:
+    body: |-
+      {
+        "messages": [
+          {
+            "role": "user",
+            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
+          }
+        ],
+        "model": "gpt-4o-mini",
+        "stream": true
+      }
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate
+      authorization:
+      - Bearer test_openai_api_key
+      connection:
+      - keep-alive
+      content-length:
+      - '147'
+      content-type:
+      - application/json
+      host:
+      - api.openai.com
+      user-agent:
+      - OpenAI/Python 1.66.5
+      x-stainless-arch:
+      - arm64
+      x-stainless-async:
+      - 'false'
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - MacOS
+      x-stainless-package-version:
+      - 1.66.5
+      x-stainless-read-timeout:
+      - '600'
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.12.6
+    method: POST
+    uri: https://api.openai.com/v1/chat/completions
+  response:
+    body:
+      string: |+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":"South"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":" Atlantic"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
+
+        data: [DONE]
+
+    headers:
+      CF-RAY:
+      - 933c86cb9ae5773e-LHR
+      Connection:
+      - keep-alive
+      Content-Type:
+      - text/event-stream; charset=utf-8
+      Date:
+      - Mon, 21 Apr 2025 11:26:28 GMT
+      Server:
+      - cloudflare
+      Set-Cookie: test_set_cookie
+      Transfer-Encoding:
+      - chunked
+      X-Content-Type-Options:
+      - nosniff
+      access-control-expose-headers:
+      - X-Request-ID
+      alt-svc:
+      - h3=":443"; ma=86400
+      cf-cache-status:
+      - DYNAMIC
+      openai-organization: test_openai_org_id
+      openai-processing-ms:
+      - '460'
+      openai-version:
+      - '2020-10-01'
+      strict-transport-security:
+      - max-age=31536000; includeSubDomains; preload
+      x-ratelimit-limit-requests:
+      - '200'
+      x-ratelimit-limit-tokens:
+      - '100000'
+      x-ratelimit-remaining-requests:
+      - '199'
+      x-ratelimit-remaining-tokens:
+      - '88447'
+      x-ratelimit-reset-requests:
+      - 7m12s
+      x-ratelimit-reset-tokens:
+      - 83h10m39.057s
+      x-request-id:
+      - req_a39b652ec0ccb45a968e10112e569bf4
+    status:
+      code: 200
+      message: OK
+version: 1
diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
index bc4fd54..d5ecedc 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
@@ -1103,6 +1103,62 @@ def test_chat_stream(default_openai_env, trace_exporter, metrics_reader, logs_ex
     )


+@pytest.mark.vcr()
+def test_chat_stream_with_context_manager(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
+    client = openai.OpenAI()
+
+    messages = [
+        {
+            "role": "user",
+            "content": TEST_CHAT_INPUT,
+        }
+    ]
+
+    # Use a context manager for the streaming response
+    with client.chat.completions.create(model=TEST_CHAT_MODEL, messages=messages, stream=True) as chat_completion:
+        chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
+        assert "".join(chunks) == "South Atlantic Ocean."
+
+    spans = trace_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == f"chat {TEST_CHAT_MODEL}"
+    assert span.kind == SpanKind.CLIENT
+    assert span.status.status_code == StatusCode.UNSET
+
+    address, port = address_and_port(client)
+    assert dict(span.attributes) == {
+        GEN_AI_OPERATION_NAME: "chat",
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_SYSTEM: "openai",
+        GEN_AI_RESPONSE_ID: "chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL",
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
+        SERVER_ADDRESS: address,
+        SERVER_PORT: port,
+        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
+    }
+
+    logs = logs_exporter.get_finished_logs()
+    assert len(logs) == 2
+    log_records = logrecords_from_logs(logs)
+    user_message, choice = log_records
+    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
+    assert dict(user_message.body) == {}
+
+    assert_stop_log_record(choice)
+
+    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
+    attributes = {
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+    }
+    assert_operation_duration_metric(
+        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
+    )
+
+
 @pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
 @pytest.mark.vcr()
 def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter):

From 47e83568331d0e16ae8bb0ed80a55cf9debe3831 Mon Sep 17 00:00:00 2001
From: Srdjan Lulic
Date: Mon, 21 Apr 2025 15:20:35 +0100
Subject: [PATCH 2/3] openai: Add conditional span closure logic for async
 streaming completion

---
 .../instrumentation/openai/wrappers.py | 11 +++-
 .../tests/test_chat_completions.py     | 61 +++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
index 0dc3caa..d9bd3ab 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
@@ -135,6 +135,13 @@ def __iter__(self):
         if not self.in_context_manager:
             self.end()

+    async def __aenter__(self):
+        # No difference in behavior between sync and async context manager
+        return self.__enter__()
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        self.__exit__(exc_type, exc_value, traceback)
+
     async def __aiter__(self):
         stream = self.__wrapped__
         try:
@@ -146,4 +153,6 @@ async def __aiter__(self):
         except Exception as exc:
             self.end(exc)
             raise
-        self.end()
+        # if we are inside a context manager we'll end when exiting it
+        if not self.in_context_manager:
+            self.end()
diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
index d5ecedc..5fbf820 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
@@ -2152,6 +2152,67 @@ async def test_chat_async_stream(default_openai_env, trace_exporter, metrics_rea
     )


+@pytest.mark.vcr()
+@pytest.mark.asyncio
+async def test_chat_async_stream_with_context_manager(
+    default_openai_env, trace_exporter, metrics_reader, logs_exporter
+):
+    client = openai.AsyncOpenAI()
+
+    messages = [
+        {
+            "role": "user",
+            "content": TEST_CHAT_INPUT,
+        }
+    ]
+
+    # Use a context manager for the asynchronous streaming response
+    async with await client.chat.completions.create(
+        model=TEST_CHAT_MODEL, messages=messages, stream=True
+    ) as chat_completion:
+        chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices]
+        assert "".join(chunks) == "South Atlantic Ocean."
+
+    spans = trace_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == f"chat {TEST_CHAT_MODEL}"
+    assert span.kind == SpanKind.CLIENT
+    assert span.status.status_code == StatusCode.UNSET
+
+    address, port = address_and_port(client)
+    assert dict(span.attributes) == {
+        GEN_AI_OPERATION_NAME: "chat",
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_SYSTEM: "openai",
+        GEN_AI_RESPONSE_ID: "chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL",
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
+        SERVER_ADDRESS: address,
+        SERVER_PORT: port,
+        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
+    }
+
+    logs = logs_exporter.get_finished_logs()
+    assert len(logs) == 2
+    log_records = logrecords_from_logs(logs)
+    user_message, choice = log_records
+    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
+    assert dict(user_message.body) == {}
+
+    assert_stop_log_record(choice)
+
+    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
+    attributes = {
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+    }
+    assert_operation_duration_metric(
+        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
+    )
+
+
 @pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
 @pytest.mark.vcr()
 @pytest.mark.asyncio

From a5e50003cfabec06c2794ff5e39d00af14e643da Mon Sep 17 00:00:00 2001
From: Srdjan Lulic
Date: Tue, 22 Apr 2025 15:19:16 +0100
Subject: [PATCH 3/3] openai: Guard span ending based on the flag rather than
 the context manager

---
 .../instrumentation/openai/wrappers.py | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
index d9bd3ab..62ec1c6 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
@@ -67,9 +67,13 @@ def __init__(
         self.choices = []
         self.usage = None
         self.service_tier = None
-        self.in_context_manager = False
+        self.ended = False

     def end(self, exc=None):
+        if self.ended:
+            return
+
+        self.ended = True
         if exc is not None:
             self.span.set_status(StatusCode.ERROR, str(exc))
             self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)
@@ -113,8 +117,6 @@ def process_chunk(self, chunk):
             self.service_tier = chunk.service_tier

     def __enter__(self):
-        # flag we are inside a context manager and want to follow its lifetime
-        self.in_context_manager = True
         return self

     def __exit__(self, exc_type, exc_value, traceback):
@@ -131,9 +133,7 @@ def __iter__(self):
         except Exception as exc:
             self.end(exc)
             raise
-        # if we are inside a context manager we'll end when exiting it
-        if not self.in_context_manager:
-            self.end()
+        self.end()

     async def __aenter__(self):
         # No difference in behavior between sync and async context manager
@@ -153,6 +153,4 @@ async def __aiter__(self):
         except Exception as exc:
             self.end(exc)
             raise
-        # if we are inside a context manager we'll end when exiting it
-        if not self.in_context_manager:
-            self.end()
+        self.end()
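
Illustrative sketch (not part of the patches above): PATCH 3 settles on making end() idempotent via an "ended" flag, so the span is closed exactly once whether the stream is fully iterated, closed through the context manager, or both. The stand-alone classes below are simplified assumptions for illustration only: FakeSpan replaces a real OpenTelemetry span, and StreamWrapper omits the chunk/usage bookkeeping and the self.__wrapped__ proxying that wrappers.py does.

    # Minimal sketch of the "end exactly once" guard; names here are hypothetical.
    class FakeSpan:
        def __init__(self):
            self.ended_count = 0

        def end(self):
            self.ended_count += 1


    class StreamWrapper:
        """Wraps a streamed completion and ends the span exactly once."""

        def __init__(self, stream, span):
            self.stream = stream
            self.span = span
            self.ended = False

        def end(self, exc=None):
            # Guard: end() can be reached from __iter__, __exit__, or both.
            if self.ended:
                return
            self.ended = True
            self.span.end()

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            self.end(exc_value)

        def __iter__(self):
            try:
                for chunk in self.stream:
                    yield chunk
            except Exception as exc:
                self.end(exc)
                raise
            self.end()


    span = FakeSpan()
    wrapper = StreamWrapper(iter(["South", " Atlantic", " Ocean."]), span)

    # Consumed inside a context manager: end() runs after iteration and again
    # on __exit__, but the guard ensures the span is ended only once.
    with wrapper as stream:
        print("".join(stream))
    assert span.ended_count == 1

Running the sketch prints "South Atlantic Ocean." and the assertion holds even though end() is reached twice, which is the behavior the new sync and async context-manager tests exercise against the recorded cassette.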