From 2cbf2ae0cc1ec7f9c5838c8298802c3755bc2641 Mon Sep 17 00:00:00 2001
From: Srdjan Lulic
Date: Wed, 30 Apr 2025 13:36:05 +0100
Subject: [PATCH 1/3] openai: Fix missing parse attribute on StreamWrapper

---
 .../instrumentation/openai/wrappers.py        |  26 ++++
 ..._chat_stream_with_raw_response_parsed.yaml | 116 ++++++++++++++++
 .../tests/test_chat_completions.py            | 127 ++++++++++++++++++
 3 files changed, 269 insertions(+)
 create mode 100644 instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml

diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
index 62ec1c6..bab341b 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py
@@ -154,3 +154,29 @@ async def __aiter__(self):
             self.end(exc)
             raise
         self.end()
+
+    def parse(self):
+        """
+        Handles a direct parse() call on the raw response in order to maintain instrumentation on the parsed iterator.
+        """
+        parsed_iterator = self.__wrapped__.parse()
+
+        parsed_wrapper = StreamWrapper(
+            stream=parsed_iterator,
+            span=self.span,
+            span_attributes=self.span_attributes,
+            capture_message_content=self.capture_message_content,
+            event_attributes=self.event_attributes,
+            event_logger=self.event_logger,
+            start_time=self.start_time,
+            token_usage_metric=self.token_usage_metric,
+            operation_duration_metric=self.operation_duration_metric,
+            # Crucially, mark the new wrapper as NOT raw after parsing
+            is_raw_response=False,
+        )
+
+        # Handle original sync/async iterators accordingly
+        if hasattr(parsed_iterator, "__aiter__"):
+            return parsed_wrapper.__aiter__()
+
+        return parsed_wrapper.__iter__()
diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml
new file mode 100644
index 0000000..7281d5a
--- /dev/null
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml
@@ -0,0 +1,116 @@
+interactions:
+- request:
+    body: |-
+      {
+        "messages": [
+          {
+            "role": "user",
+            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
+          }
+        ],
+        "model": "gpt-4o-mini",
+        "stream": true
+      }
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate, zstd
+      authorization:
+      - Bearer test_openai_api_key
+      connection:
+      - keep-alive
+      content-length:
+      - '147'
+      content-type:
+      - application/json
+      host:
+      - api.openai.com
+      user-agent:
+      - OpenAI/Python 1.76.0
+      x-stainless-arch:
+      - arm64
+      x-stainless-async:
+      - 'false'
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - MacOS
+      x-stainless-package-version:
+      - 1.76.0
+      x-stainless-raw-response:
+      - 'true'
+      x-stainless-read-timeout:
+      - '600'
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.12.6
+    method: POST
+    uri: https://api.openai.com/v1/chat/completions
+  response:
+    body:
+      string: |+
+        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"South"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Atlantic"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
+
+        data: [DONE]
+
+    headers:
+      CF-RAY:
+      - 938698a5dfdcf41c-LHR
+      Connection:
+      - keep-alive
+      Content-Type:
+      - text/event-stream; charset=utf-8
+      Date:
+      - Wed, 30 Apr 2025 11:11:05 GMT
+      Server:
+      - cloudflare
+      Set-Cookie: test_set_cookie
+      Transfer-Encoding:
+      - chunked
+      X-Content-Type-Options:
+      - nosniff
+      access-control-expose-headers:
+      - X-Request-ID
+      alt-svc:
+      - h3=":443"; ma=86400
+      cf-cache-status:
+      - DYNAMIC
+      openai-organization: test_openai_org_id
+      openai-processing-ms:
+      - '112'
+      openai-version:
+      - '2020-10-01'
+      strict-transport-security:
+      - max-age=31536000; includeSubDomains; preload
+      x-ratelimit-limit-requests:
+      - '200'
+      x-ratelimit-limit-tokens:
+      - '100000'
+      x-ratelimit-remaining-requests:
+      - '199'
+      x-ratelimit-remaining-tokens:
+      - '99962'
+      x-ratelimit-reset-requests:
+      - 7m12s
+      x-ratelimit-reset-tokens:
+      - 16m11.453s
+      x-request-id:
+      - req_859b175255c1479e917cced0197dfe32
+    status:
+      code: 200
+      message: OK
+version: 1
diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
index 5fbf820..6acb896 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
@@ -1218,6 +1218,68 @@ def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metri
     )
 
 
+@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
+@pytest.mark.vcr()
+def test_chat_stream_with_raw_response_parsed(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
+    client = openai.OpenAI()
+
+    messages = [
+        {
+            "role": "user",
+            "content": TEST_CHAT_INPUT,
+        }
+    ]
+
+    raw_response = client.chat.completions.with_raw_response.create(
+        model=TEST_CHAT_MODEL, messages=messages, stream=True
+    )
+
+    # Explicit parse of the raw response
+    chat_completion = raw_response.parse()
+
+    chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
+    assert "".join(chunks) == "South Atlantic Ocean."
+
+    spans = trace_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == f"chat {TEST_CHAT_MODEL}"
+    assert span.kind == SpanKind.CLIENT
+    assert span.status.status_code == StatusCode.UNSET
+
+    address, port = address_and_port(client)
+    assert dict(span.attributes) == {
+        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
+        GEN_AI_OPERATION_NAME: "chat",
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_SYSTEM: "openai",
+        GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC",
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
+        SERVER_ADDRESS: address,
+        SERVER_PORT: port,
+    }
+
+    logs = logs_exporter.get_finished_logs()
+    assert len(logs) == 2
+    log_records = logrecords_from_logs(logs)
+    user_message, choice = log_records
+    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
+    assert dict(user_message.body) == {}
+
+    assert_stop_log_record(choice)
+
+    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
+    attributes = {
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+    }
+    assert_operation_duration_metric(
+        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
+    )
+
+
 @pytest.mark.skipif(OPENAI_VERSION < (1, 35, 0), reason="service tier added in 1.35.0")
 @pytest.mark.vcr()
 def test_chat_stream_all_the_client_options(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
@@ -2273,6 +2335,71 @@ async def test_chat_async_stream_with_raw_response(default_openai_env, trace_exp
     )
 
 
+@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
+@pytest.mark.vcr()
+@pytest.mark.asyncio
+async def test_chat_async_stream_with_raw_response_parsed(
+    default_openai_env, trace_exporter, metrics_reader, logs_exporter
+):
+    client = openai.AsyncOpenAI()
+
+    messages = [
+        {
+            "role": "user",
+            "content": TEST_CHAT_INPUT,
+        }
+    ]
+
+    raw_response = await client.chat.completions.with_raw_response.create(
+        model=TEST_CHAT_MODEL, messages=messages, stream=True
+    )
+
+    # Explicit parse of the raw response
+    chat_completion = raw_response.parse()
+
+    chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices]
+    assert "".join(chunks) == "South Atlantic Ocean."
+
+    spans = trace_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == f"chat {TEST_CHAT_MODEL}"
+    assert span.kind == SpanKind.CLIENT
+    assert span.status.status_code == StatusCode.UNSET
+
+    address, port = address_and_port(client)
+    assert dict(span.attributes) == {
+        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
+        GEN_AI_OPERATION_NAME: "chat",
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_SYSTEM: "openai",
+        GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC",
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
+        SERVER_ADDRESS: address,
+        SERVER_PORT: port,
+    }
+
+    logs = logs_exporter.get_finished_logs()
+    assert len(logs) == 2
+    log_records = logrecords_from_logs(logs)
+    user_message, choice = log_records
+    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
+    assert dict(user_message.body) == {}
+
+    assert_stop_log_record(choice)
+
+    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
+    attributes = {
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+    }
+    assert_operation_duration_metric(
+        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
+    )
+
+
 @pytest.mark.vcr()
 @pytest.mark.asyncio
 async def test_chat_async_stream_with_capture_message_content(

From 01d1617226f5c97617d6e604e5b4bb5723966e37 Mon Sep 17 00:00:00 2001
From: Srdjan Lulic
Date: Tue, 6 May 2025 14:14:30 +0100
Subject: [PATCH 2/3] openai: Remove redundant unit tests for raw responses
 and add explicit parsing to existing tests

---
 ..._chat_stream_with_raw_response_parsed.yaml | 116 ----------------
 .../tests/test_chat_completions.py            | 129 +-----------------
 2 files changed, 4 insertions(+), 241 deletions(-)
 delete mode 100644 instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml

diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml
deleted file mode 100644
index 7281d5a..0000000
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response_parsed.yaml
+++ /dev/null
@@ -1,116 +0,0 @@
-interactions:
-- request:
-    body: |-
-      {
-        "messages": [
-          {
-            "role": "user",
-            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
-          }
-        ],
-        "model": "gpt-4o-mini",
-        "stream": true
-      }
-    headers:
-      accept:
-      - application/json
-      accept-encoding:
-      - gzip, deflate, zstd
-      authorization:
-      - Bearer test_openai_api_key
-      connection:
-      - keep-alive
-      content-length:
-      - '147'
-      content-type:
-      - application/json
-      host:
-      - api.openai.com
-      user-agent:
-      - OpenAI/Python 1.76.0
-      x-stainless-arch:
-      - arm64
-      x-stainless-async:
-      - 'false'
-      x-stainless-lang:
-      - python
-      x-stainless-os:
-      - MacOS
-      x-stainless-package-version:
-      - 1.76.0
-      x-stainless-raw-response:
-      - 'true'
-      x-stainless-read-timeout:
-      - '600'
-      x-stainless-retry-count:
-      - '0'
-      x-stainless-runtime:
-      - CPython
-      x-stainless-runtime-version:
-      - 3.12.6
-    method: POST
-    uri: https://api.openai.com/v1/chat/completions
-  response:
-    body:
-      string: |+
-        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}
-
-        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"South"},"logprobs":null,"finish_reason":null}]}
-
-        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Atlantic"},"logprobs":null,"finish_reason":null}]}
-
-        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}
-
-        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}
-
-        data: {"id":"chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC","object":"chat.completion.chunk","created":1746011465,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_0392822090","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
-
-        data: [DONE]
-
-    headers:
-      CF-RAY:
-      - 938698a5dfdcf41c-LHR
-      Connection:
-      - keep-alive
-      Content-Type:
-      - text/event-stream; charset=utf-8
-      Date:
-      - Wed, 30 Apr 2025 11:11:05 GMT
-      Server:
-      - cloudflare
-      Set-Cookie: test_set_cookie
-      Transfer-Encoding:
-      - chunked
-      X-Content-Type-Options:
-      - nosniff
-      access-control-expose-headers:
-      - X-Request-ID
-      alt-svc:
-      - h3=":443"; ma=86400
-      cf-cache-status:
-      - DYNAMIC
-      openai-organization: test_openai_org_id
-      openai-processing-ms:
-      - '112'
-      openai-version:
-      - '2020-10-01'
-      strict-transport-security:
-      - max-age=31536000; includeSubDomains; preload
-      x-ratelimit-limit-requests:
-      - '200'
-      x-ratelimit-limit-tokens:
-      - '100000'
-      x-ratelimit-remaining-requests:
-      - '199'
-      x-ratelimit-remaining-tokens:
-      - '99962'
-      x-ratelimit-reset-requests:
-      - 7m12s
-      x-ratelimit-reset-tokens:
-      - 16m11.453s
-      x-request-id:
-      - req_859b175255c1479e917cced0197dfe32
-    status:
-      code: 200
-      message: OK
-version: 1
diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
index 6acb896..c1a3e8a 100644
--- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
+++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py
@@ -1171,65 +1171,6 @@ def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metri
         }
     ]
 
-    chat_completion = client.chat.completions.with_raw_response.create(
-        model=TEST_CHAT_MODEL, messages=messages, stream=True
-    )
-
-    chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
-    assert "".join(chunks) == "Atlantic Ocean"
-
-    spans = trace_exporter.get_finished_spans()
-    assert len(spans) == 1
-
-    span = spans[0]
-    assert span.name == f"chat {TEST_CHAT_MODEL}"
-    assert span.kind == SpanKind.CLIENT
-    assert span.status.status_code == StatusCode.UNSET
-
-    address, port = address_and_port(client)
-    assert dict(span.attributes) == {
-        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
-        GEN_AI_OPERATION_NAME: "chat",
-        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
-        GEN_AI_SYSTEM: "openai",
-        GEN_AI_RESPONSE_ID: "chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT",
-        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
-        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
-        SERVER_ADDRESS: address,
-        SERVER_PORT: port,
-    }
-
-    logs = logs_exporter.get_finished_logs()
-    assert len(logs) == 2
-    log_records = logrecords_from_logs(logs)
-    user_message, choice = log_records
-    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
-    assert dict(user_message.body) == {}
-
-    assert_stop_log_record(choice)
-
-    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
-    attributes = {
-        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
-        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
-    }
-    assert_operation_duration_metric(
-        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
-    )
-
-
-@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
-@pytest.mark.vcr()
-def test_chat_stream_with_raw_response_parsed(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
-    client = openai.OpenAI()
-
-    messages = [
-        {
-            "role": "user",
-            "content": TEST_CHAT_INPUT,
-        }
-    ]
-
     raw_response = client.chat.completions.with_raw_response.create(
         model=TEST_CHAT_MODEL, messages=messages, stream=True
     )
@@ -1238,7 +1179,7 @@ def test_chat_stream_with_raw_response_parsed(default_openai_env, trace_exporter
     chat_completion = raw_response.parse()
 
     chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
-    assert "".join(chunks) == "South Atlantic Ocean."
+ assert "".join(chunks) == "Atlantic Ocean" spans = trace_exporter.get_finished_spans() assert len(spans) == 1 @@ -1254,7 +1195,7 @@ def test_chat_stream_with_raw_response_parsed(default_openai_env, trace_exporter GEN_AI_OPERATION_NAME: "chat", GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, GEN_AI_SYSTEM: "openai", - GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC", + GEN_AI_RESPONSE_ID: "chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT", GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, GEN_AI_RESPONSE_FINISH_REASONS: ("stop",), SERVER_ADDRESS: address, @@ -2288,68 +2229,6 @@ async def test_chat_async_stream_with_raw_response(default_openai_env, trace_exp } ] - chat_completion = await client.chat.completions.with_raw_response.create( - model=TEST_CHAT_MODEL, messages=messages, stream=True - ) - - chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices] - assert "".join(chunks) == "Atlantic Ocean" - - spans = trace_exporter.get_finished_spans() - assert len(spans) == 1 - - span = spans[0] - assert span.name == f"chat {TEST_CHAT_MODEL}" - assert span.kind == SpanKind.CLIENT - assert span.status.status_code == StatusCode.UNSET - - address, port = address_and_port(client) - assert dict(span.attributes) == { - GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default", - GEN_AI_OPERATION_NAME: "chat", - GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, - GEN_AI_SYSTEM: "openai", - GEN_AI_RESPONSE_ID: "chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT", - GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, - GEN_AI_RESPONSE_FINISH_REASONS: ("stop",), - SERVER_ADDRESS: address, - SERVER_PORT: port, - } - - logs = logs_exporter.get_finished_logs() - assert len(logs) == 2 - log_records = logrecords_from_logs(logs) - user_message, choice = log_records - assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} - assert dict(user_message.body) == {} - - assert_stop_log_record(choice) - - (operation_duration_metric,) = get_sorted_metrics(metrics_reader) - attributes = { - GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, - GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, - } - assert_operation_duration_metric( - client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319 - ) - - -@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available") -@pytest.mark.vcr() -@pytest.mark.asyncio -async def test_chat_async_stream_with_raw_response_parsed( - default_openai_env, trace_exporter, metrics_reader, logs_exporter -): - client = openai.AsyncOpenAI() - - messages = [ - { - "role": "user", - "content": TEST_CHAT_INPUT, - } - ] - raw_response = await client.chat.completions.with_raw_response.create( model=TEST_CHAT_MODEL, messages=messages, stream=True ) @@ -2358,7 +2237,7 @@ async def test_chat_async_stream_with_raw_response_parsed( chat_completion = raw_response.parse() chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices] - assert "".join(chunks) == "South Atlantic Ocean." 
+ assert "".join(chunks) == "Atlantic Ocean" spans = trace_exporter.get_finished_spans() assert len(spans) == 1 @@ -2374,7 +2253,7 @@ async def test_chat_async_stream_with_raw_response_parsed( GEN_AI_OPERATION_NAME: "chat", GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, GEN_AI_SYSTEM: "openai", - GEN_AI_RESPONSE_ID: "chatcmpl-BRzdBETW1h4E9Vy0Se8CSvrYEXMtC", + GEN_AI_RESPONSE_ID: "chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT", GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, GEN_AI_RESPONSE_FINISH_REASONS: ("stop",), SERVER_ADDRESS: address, From 667e306f7018717cda465b7d4f244c19f0fbd3b3 Mon Sep 17 00:00:00 2001 From: Srdjan Lulic Date: Tue, 6 May 2025 14:35:35 +0100 Subject: [PATCH 3/3] openai: Remove is_raw_response attribute from StreamWrapper --- .../src/opentelemetry/instrumentation/openai/__init__.py | 6 ++---- .../src/opentelemetry/instrumentation/openai/wrappers.py | 8 -------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py index 973e6eb..4020097 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py @@ -196,7 +196,6 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time) raise - is_raw_response = _is_raw_response(result) if kwargs.get("stream"): return StreamWrapper( stream=result, @@ -208,12 +207,12 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): start_time=start_time, token_usage_metric=self.token_usage_metric, operation_duration_metric=self.operation_duration_metric, - is_raw_response=is_raw_response, ) logger.debug(f"openai.resources.chat.completions.Completions.create result: {result}") # if the caller is using with_raw_response we need to parse the output to get the response class we expect + is_raw_response = _is_raw_response(result) if is_raw_response: result = result.parse() response_attributes = _get_attributes_from_response( @@ -271,7 +270,6 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time) raise - is_raw_response = _is_raw_response(result) if kwargs.get("stream"): return StreamWrapper( stream=result, @@ -283,12 +281,12 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): start_time=start_time, token_usage_metric=self.token_usage_metric, operation_duration_metric=self.operation_duration_metric, - is_raw_response=is_raw_response, ) logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create result: {result}") # if the caller is using with_raw_response we need to parse the output to get the response class we expect + is_raw_response = _is_raw_response(result) if is_raw_response: result = result.parse() response_attributes = _get_attributes_from_response( diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py index bab341b..22d431d 100644 --- 
a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py @@ -47,7 +47,6 @@ def __init__( start_time: float, token_usage_metric: Histogram, operation_duration_metric: Histogram, - is_raw_response: bool, ): # we need to wrap the original response even in case of raw_responses super().__init__(stream) @@ -60,7 +59,6 @@ def __init__( self.token_usage_metric = token_usage_metric self.operation_duration_metric = operation_duration_metric self.start_time = start_time - self.is_raw_response = is_raw_response self.response_id = None self.model = None @@ -125,8 +123,6 @@ def __exit__(self, exc_type, exc_value, traceback): def __iter__(self): stream = self.__wrapped__ try: - if self.is_raw_response: - stream = stream.parse() for chunk in stream: self.process_chunk(chunk) yield chunk @@ -145,8 +141,6 @@ async def __aexit__(self, exc_type, exc_value, traceback): async def __aiter__(self): stream = self.__wrapped__ try: - if self.is_raw_response: - stream = stream.parse() async for chunk in stream: self.process_chunk(chunk) yield chunk @@ -171,8 +165,6 @@ def parse(self): start_time=self.start_time, token_usage_metric=self.token_usage_metric, operation_duration_metric=self.operation_duration_metric, - # Crucially, mark the new wrapper as NOT raw after parsing - is_raw_response=False, ) # Handle original sync/async iterators accordingly
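Usage sketch (illustrative, not part of the patch series): the model name and prompt below are placeholders, and the snippet assumes the instrumented openai client from these patches. After PATCH 1/3, calling parse() on a raw streaming response returns an iterator that is still tracked by StreamWrapper, so the span, log events, and metrics are emitted as the chunks are consumed.

    import openai

    client = openai.OpenAI()

    # with_raw_response returns the raw (unparsed) API response; with
    # stream=True the instrumentation wraps it in StreamWrapper
    raw_response = client.chat.completions.with_raw_response.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"}],
        stream=True,
    )

    # Previously this call proxied through to the unwrapped response and
    # dropped the instrumentation; StreamWrapper.parse (PATCH 1/3) now
    # re-wraps the parsed iterator before handing it back
    chat_completion = raw_response.parse()

    for chunk in chat_completion:
        if chunk.choices:
            print(chunk.choices[0].delta.content or "", end="")

The same flow applies to AsyncOpenAI, where parse() hands back an async iterator (the hasattr(parsed_iterator, "__aiter__") branch above).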