diff --git a/sentry_sdk/ai/monitoring.py b/sentry_sdk/ai/monitoring.py index 7a687736d0..e3f372c3ba 100644 --- a/sentry_sdk/ai/monitoring.py +++ b/sentry_sdk/ai/monitoring.py @@ -40,7 +40,7 @@ def sync_wrapped(*args, **kwargs): for k, v in kwargs.pop("sentry_data", {}).items(): span.set_data(k, v) if curr_pipeline: - span.set_data(SPANDATA.AI_PIPELINE_NAME, curr_pipeline) + span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, curr_pipeline) return f(*args, **kwargs) else: _ai_pipeline_name.set(description) @@ -69,7 +69,7 @@ async def async_wrapped(*args, **kwargs): for k, v in kwargs.pop("sentry_data", {}).items(): span.set_data(k, v) if curr_pipeline: - span.set_data(SPANDATA.AI_PIPELINE_NAME, curr_pipeline) + span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, curr_pipeline) return await f(*args, **kwargs) else: _ai_pipeline_name.set(description) @@ -108,7 +108,7 @@ def record_token_usage( # TODO: move pipeline name elsewhere ai_pipeline_name = get_ai_pipeline_name() if ai_pipeline_name: - span.set_data(SPANDATA.AI_PIPELINE_NAME, ai_pipeline_name) + span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, ai_pipeline_name) if input_tokens is not None: span.set_data(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index a7e713dc0b..a82ff94c49 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -169,6 +169,7 @@ class SPANDATA: AI_PIPELINE_NAME = "ai.pipeline.name" """ Name of the AI pipeline or chain being executed. + DEPRECATED: Use GEN_AI_PIPELINE_NAME instead. Example: "qa-pipeline" """ @@ -229,6 +230,7 @@ class SPANDATA: AI_STREAMING = "ai.streaming" """ Whether or not the AI model call's response was streamed back asynchronously + DEPRECATED: Use GEN_AI_RESPONSE_STREAMING instead. Example: true """ @@ -372,6 +374,24 @@ class SPANDATA: Example: "chat" """ + GEN_AI_PIPELINE_NAME = "gen_ai.pipeline.name" + """ + Name of the AI pipeline or chain being executed. + Example: "qa-pipeline" + """ + + GEN_AI_RESPONSE_MODEL = "gen_ai.response.model" + """ + Exact model identifier used to generate the response + Example: gpt-4o-mini-2024-07-18 + """ + + GEN_AI_RESPONSE_STREAMING = "gen_ai.response.streaming" + """ + Whether or not the AI model call's response was streamed back asynchronously + Example: true + """ + GEN_AI_RESPONSE_TEXT = "gen_ai.response.text" """ The model's response text messages. @@ -411,7 +431,7 @@ class SPANDATA: GEN_AI_REQUEST_MODEL = "gen_ai.request.model" """ The model identifier being used for the request. - Example: "gpt-4-turbo-preview" + Example: "gpt-4-turbo" """ GEN_AI_REQUEST_PRESENCE_PENALTY = "gen_ai.request.presence_penalty" @@ -649,9 +669,11 @@ class OP: FUNCTION_AWS = "function.aws" FUNCTION_GCP = "function.gcp" GEN_AI_CHAT = "gen_ai.chat" + GEN_AI_EMBEDDINGS = "gen_ai.embeddings" GEN_AI_EXECUTE_TOOL = "gen_ai.execute_tool" GEN_AI_HANDOFF = "gen_ai.handoff" GEN_AI_INVOKE_AGENT = "gen_ai.invoke_agent" + GEN_AI_RESPONSES = "gen_ai.responses" GRAPHQL_EXECUTE = "graphql.execute" GRAPHQL_MUTATION = "graphql.mutation" GRAPHQL_PARSE = "graphql.parse" @@ -674,8 +696,6 @@ class OP: MIDDLEWARE_STARLITE = "middleware.starlite" MIDDLEWARE_STARLITE_RECEIVE = "middleware.starlite.receive" MIDDLEWARE_STARLITE_SEND = "middleware.starlite.send" - OPENAI_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.openai" - OPENAI_EMBEDDINGS_CREATE = "ai.embeddings.create.openai" HUGGINGFACE_HUB_CHAT_COMPLETIONS_CREATE = ( "ai.chat_completions.create.huggingface_hub" ) diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index d906a8e0b2..78fcdd49e2 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -10,6 +10,7 @@ from sentry_sdk.utils import ( capture_internal_exceptions, event_from_exception, + safe_serialize, ) from typing import TYPE_CHECKING @@ -27,6 +28,14 @@ except ImportError: raise DidNotEnable("OpenAI not installed") +RESPONSES_API_ENABLED = True +try: + # responses API support was introduced in v1.66.0 + from openai.resources.responses import Responses, AsyncResponses + from openai.types.responses.response_completed_event import ResponseCompletedEvent +except ImportError: + RESPONSES_API_ENABLED = False + class OpenAIIntegration(Integration): identifier = "openai" @@ -46,13 +55,17 @@ def __init__(self, include_prompts=True, tiktoken_encoding_name=None): def setup_once(): # type: () -> None Completions.create = _wrap_chat_completion_create(Completions.create) - Embeddings.create = _wrap_embeddings_create(Embeddings.create) - AsyncCompletions.create = _wrap_async_chat_completion_create( AsyncCompletions.create ) + + Embeddings.create = _wrap_embeddings_create(Embeddings.create) AsyncEmbeddings.create = _wrap_async_embeddings_create(AsyncEmbeddings.create) + if RESPONSES_API_ENABLED: + Responses.create = _wrap_responses_create(Responses.create) + AsyncResponses.create = _wrap_async_responses_create(AsyncResponses.create) + def count_tokens(self, s): # type: (OpenAIIntegration, str) -> int if self.tiktoken_encoding is not None: @@ -62,6 +75,12 @@ def count_tokens(self, s): def _capture_exception(exc): # type: (Any) -> None + # Close an eventually open span + # We need to do this by hand because we are not using the start_span context manager + current_span = sentry_sdk.get_current_span() + if current_span is not None: + current_span.__exit__(None, None, None) + event, hint = event_from_exception( exc, client_options=sentry_sdk.get_client().options, @@ -81,7 +100,7 @@ def _get_usage(usage, names): def _calculate_token_usage( messages, response, span, streaming_message_responses, count_tokens ): - # type: (Iterable[ChatCompletionMessageParam], Any, Span, Optional[List[str]], Callable[..., Any]) -> None + # type: (Optional[Iterable[ChatCompletionMessageParam]], Any, Span, Optional[List[str]], Callable[..., Any]) -> None input_tokens = 0 # type: Optional[int] input_tokens_cached = 0 # type: Optional[int] output_tokens = 0 # type: Optional[int] @@ -106,13 +125,13 @@ def _calculate_token_usage( total_tokens = _get_usage(response.usage, ["total_tokens"]) # Manually count tokens - # TODO: when implementing responses API, check for responses API if input_tokens == 0: - for message in messages: - if "content" in message: + for message in messages or []: + if isinstance(message, dict) and "content" in message: input_tokens += count_tokens(message["content"]) + elif isinstance(message, str): + input_tokens += count_tokens(message) - # TODO: when implementing responses API, check for responses API if output_tokens == 0: if streaming_message_responses is not None: for message in streaming_message_responses: @@ -139,138 +158,254 @@ def _calculate_token_usage( ) -def _new_chat_completion_common(f, *args, **kwargs): - # type: (Any, *Any, **Any) -> Any - integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) - if integration is None: - return f(*args, **kwargs) +def _set_input_data(span, kwargs, operation, integration): + # type: (Span, dict[str, Any], str, OpenAIIntegration) -> None + # Input messages (the prompt or data sent to the model) + messages = kwargs.get("messages") + if messages is None: + messages = kwargs.get("input") + + if isinstance(messages, str): + messages = [messages] + + if ( + messages is not None + and len(messages) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized(span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages) + + # Input attributes: Common + set_data_normalized(span, SPANDATA.GEN_AI_SYSTEM, "openai") + set_data_normalized(span, SPANDATA.GEN_AI_OPERATION_NAME, operation) + + # Input attributes: Optional + kwargs_keys_to_attributes = { + "model": SPANDATA.GEN_AI_REQUEST_MODEL, + "stream": SPANDATA.GEN_AI_RESPONSE_STREAMING, + "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, + "presence_penalty": SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY, + "frequency_penalty": SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY, + "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE, + "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P, + } + for key, attribute in kwargs_keys_to_attributes.items(): + value = kwargs.get(key) + if value is not None: + set_data_normalized(span, attribute, value) + + # Input attributes: Tools + tools = kwargs.get("tools") + if tools is not None and len(tools) > 0: + set_data_normalized( + span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools) + ) - if "messages" not in kwargs: - # invalid call (in all versions of openai), let it return error - return f(*args, **kwargs) - try: - iter(kwargs["messages"]) - except TypeError: - # invalid call (in all versions), messages must be iterable - return f(*args, **kwargs) +def _set_output_data(span, response, kwargs, integration, finish_span=True): + # type: (Span, Any, dict[str, Any], OpenAIIntegration, bool) -> None + if hasattr(response, "model"): + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_MODEL, response.model) - kwargs["messages"] = list(kwargs["messages"]) - messages = kwargs["messages"] - model = kwargs.get("model") - streaming = kwargs.get("stream") - - span = sentry_sdk.start_span( - op=consts.OP.OPENAI_CHAT_COMPLETIONS_CREATE, - name="Chat Completion", - origin=OpenAIIntegration.origin, - ) - span.__enter__() + # Input messages (the prompt or data sent to the model) + # used for the token usage calculation + messages = kwargs.get("messages") + if messages is None: + messages = kwargs.get("input") - res = yield f, args, kwargs + if messages is not None and isinstance(messages, str): + messages = [messages] - with capture_internal_exceptions(): + if hasattr(response, "choices"): if should_send_default_pii() and integration.include_prompts: - set_data_normalized(span, SPANDATA.AI_INPUT_MESSAGES, messages) - - set_data_normalized(span, SPANDATA.AI_MODEL_ID, model) - set_data_normalized(span, SPANDATA.AI_STREAMING, streaming) + response_text = [choice.message.dict() for choice in response.choices] + if len(response_text) > 0: + set_data_normalized( + span, + SPANDATA.GEN_AI_RESPONSE_TEXT, + safe_serialize(response_text), + ) + _calculate_token_usage(messages, response, span, None, integration.count_tokens) + if finish_span: + span.__exit__(None, None, None) - if hasattr(res, "choices"): - if should_send_default_pii() and integration.include_prompts: + elif hasattr(response, "output"): + if should_send_default_pii() and integration.include_prompts: + response_text = [item.to_dict() for item in response.output] + if len(response_text) > 0: set_data_normalized( span, - SPANDATA.AI_RESPONSES, - list(map(lambda x: x.message, res.choices)), + SPANDATA.GEN_AI_RESPONSE_TEXT, + safe_serialize(response_text), ) - _calculate_token_usage(messages, res, span, None, integration.count_tokens) + _calculate_token_usage(messages, response, span, None, integration.count_tokens) + if finish_span: span.__exit__(None, None, None) - elif hasattr(res, "_iterator"): - data_buf: list[list[str]] = [] # one for each choice - - old_iterator = res._iterator - - def new_iterator(): - # type: () -> Iterator[ChatCompletionChunk] - with capture_internal_exceptions(): - for x in old_iterator: - if hasattr(x, "choices"): - choice_index = 0 - for choice in x.choices: - if hasattr(choice, "delta") and hasattr( - choice.delta, "content" - ): - content = choice.delta.content - if len(data_buf) <= choice_index: - data_buf.append([]) - data_buf[choice_index].append(content or "") - choice_index += 1 - yield x - if len(data_buf) > 0: - all_responses = list( - map(lambda chunk: "".join(chunk), data_buf) + + elif hasattr(response, "_iterator"): + data_buf: list[list[str]] = [] # one for each choice + + old_iterator = response._iterator + + def new_iterator(): + # type: () -> Iterator[ChatCompletionChunk] + with capture_internal_exceptions(): + count_tokens_manually = True + for x in old_iterator: + # OpenAI chat completion API + if hasattr(x, "choices"): + choice_index = 0 + for choice in x.choices: + if hasattr(choice, "delta") and hasattr( + choice.delta, "content" + ): + content = choice.delta.content + if len(data_buf) <= choice_index: + data_buf.append([]) + data_buf[choice_index].append(content or "") + choice_index += 1 + + # OpenAI responses API + elif hasattr(x, "delta"): + if len(data_buf) == 0: + data_buf.append([]) + data_buf[0].append(x.delta or "") + + # OpenAI responses API end of streaming response + if RESPONSES_API_ENABLED and isinstance(x, ResponseCompletedEvent): + _calculate_token_usage( + messages, + x.response, + span, + None, + integration.count_tokens, ) - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, SPANDATA.AI_RESPONSES, all_responses - ) + count_tokens_manually = False + + yield x + + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses + ) + if count_tokens_manually: _calculate_token_usage( messages, - res, + response, span, all_responses, integration.count_tokens, ) + + if finish_span: span.__exit__(None, None, None) - async def new_iterator_async(): - # type: () -> AsyncIterator[ChatCompletionChunk] - with capture_internal_exceptions(): - async for x in old_iterator: - if hasattr(x, "choices"): - choice_index = 0 - for choice in x.choices: - if hasattr(choice, "delta") and hasattr( - choice.delta, "content" - ): - content = choice.delta.content - if len(data_buf) <= choice_index: - data_buf.append([]) - data_buf[choice_index].append(content or "") - choice_index += 1 - yield x - if len(data_buf) > 0: - all_responses = list( - map(lambda chunk: "".join(chunk), data_buf) + async def new_iterator_async(): + # type: () -> AsyncIterator[ChatCompletionChunk] + with capture_internal_exceptions(): + count_tokens_manually = True + async for x in old_iterator: + # OpenAI chat completion API + if hasattr(x, "choices"): + choice_index = 0 + for choice in x.choices: + if hasattr(choice, "delta") and hasattr( + choice.delta, "content" + ): + content = choice.delta.content + if len(data_buf) <= choice_index: + data_buf.append([]) + data_buf[choice_index].append(content or "") + choice_index += 1 + + # OpenAI responses API + elif hasattr(x, "delta"): + if len(data_buf) == 0: + data_buf.append([]) + data_buf[0].append(x.delta or "") + + # OpenAI responses API end of streaming response + if RESPONSES_API_ENABLED and isinstance(x, ResponseCompletedEvent): + _calculate_token_usage( + messages, + x.response, + span, + None, + integration.count_tokens, + ) + count_tokens_manually = False + + yield x + + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses ) - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, SPANDATA.AI_RESPONSES, all_responses - ) + if count_tokens_manually: _calculate_token_usage( messages, - res, + response, span, all_responses, integration.count_tokens, ) + if finish_span: span.__exit__(None, None, None) - if str(type(res._iterator)) == "": - res._iterator = new_iterator_async() - else: - res._iterator = new_iterator() - + if str(type(response._iterator)) == "": + response._iterator = new_iterator_async() else: - set_data_normalized(span, "unknown_response", True) + response._iterator = new_iterator() + else: + _calculate_token_usage(messages, response, span, None, integration.count_tokens) + if finish_span: span.__exit__(None, None, None) - return res + + +def _new_chat_completion_common(f, *args, **kwargs): + # type: (Any, Any, Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return f(*args, **kwargs) + + if "messages" not in kwargs: + # invalid call (in all versions of openai), let it return error + return f(*args, **kwargs) + + try: + iter(kwargs["messages"]) + except TypeError: + # invalid call (in all versions), messages must be iterable + return f(*args, **kwargs) + + model = kwargs.get("model") + operation = "chat" + + span = sentry_sdk.start_span( + op=consts.OP.GEN_AI_CHAT, + name=f"{operation} {model}", + origin=OpenAIIntegration.origin, + ) + span.__enter__() + + _set_input_data(span, kwargs, operation, integration) + + response = yield f, args, kwargs + + _set_output_data(span, response, kwargs, integration, finish_span=True) + + return response def _wrap_chat_completion_create(f): # type: (Callable[..., Any]) -> Callable[..., Any] def _execute_sync(f, *args, **kwargs): - # type: (Any, *Any, **Any) -> Any + # type: (Any, Any, Any) -> Any gen = _new_chat_completion_common(f, *args, **kwargs) try: @@ -291,7 +426,7 @@ def _execute_sync(f, *args, **kwargs): @wraps(f) def _sentry_patched_create_sync(*args, **kwargs): - # type: (*Any, **Any) -> Any + # type: (Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) if integration is None or "messages" not in kwargs: # no "messages" means invalid call (in all versions of openai), let it return error @@ -305,7 +440,7 @@ def _sentry_patched_create_sync(*args, **kwargs): def _wrap_async_chat_completion_create(f): # type: (Callable[..., Any]) -> Callable[..., Any] async def _execute_async(f, *args, **kwargs): - # type: (Any, *Any, **Any) -> Any + # type: (Any, Any, Any) -> Any gen = _new_chat_completion_common(f, *args, **kwargs) try: @@ -326,7 +461,7 @@ async def _execute_async(f, *args, **kwargs): @wraps(f) async def _sentry_patched_create_async(*args, **kwargs): - # type: (*Any, **Any) -> Any + # type: (Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) if integration is None or "messages" not in kwargs: # no "messages" means invalid call (in all versions of openai), let it return error @@ -338,52 +473,24 @@ async def _sentry_patched_create_async(*args, **kwargs): def _new_embeddings_create_common(f, *args, **kwargs): - # type: (Any, *Any, **Any) -> Any + # type: (Any, Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) if integration is None: return f(*args, **kwargs) + model = kwargs.get("model") + operation = "embeddings" + with sentry_sdk.start_span( - op=consts.OP.OPENAI_EMBEDDINGS_CREATE, - description="OpenAI Embedding Creation", + op=consts.OP.GEN_AI_EMBEDDINGS, + name=f"{operation} {model}", origin=OpenAIIntegration.origin, ) as span: - if "input" in kwargs and ( - should_send_default_pii() and integration.include_prompts - ): - if isinstance(kwargs["input"], str): - set_data_normalized(span, SPANDATA.AI_INPUT_MESSAGES, [kwargs["input"]]) - elif ( - isinstance(kwargs["input"], list) - and len(kwargs["input"]) > 0 - and isinstance(kwargs["input"][0], str) - ): - set_data_normalized(span, SPANDATA.AI_INPUT_MESSAGES, kwargs["input"]) - if "model" in kwargs: - set_data_normalized(span, SPANDATA.AI_MODEL_ID, kwargs["model"]) + _set_input_data(span, kwargs, operation, integration) response = yield f, args, kwargs - input_tokens = 0 - total_tokens = 0 - if hasattr(response, "usage"): - if hasattr(response.usage, "prompt_tokens") and isinstance( - response.usage.prompt_tokens, int - ): - input_tokens = response.usage.prompt_tokens - if hasattr(response.usage, "total_tokens") and isinstance( - response.usage.total_tokens, int - ): - total_tokens = response.usage.total_tokens - - if input_tokens == 0: - input_tokens = integration.count_tokens(kwargs["input"] or "") - - record_token_usage( - span, - input_tokens=input_tokens, - total_tokens=total_tokens or input_tokens, - ) + _set_output_data(span, response, kwargs, integration, finish_span=False) return response @@ -391,7 +498,7 @@ def _new_embeddings_create_common(f, *args, **kwargs): def _wrap_embeddings_create(f): # type: (Any) -> Any def _execute_sync(f, *args, **kwargs): - # type: (Any, *Any, **Any) -> Any + # type: (Any, Any, Any) -> Any gen = _new_embeddings_create_common(f, *args, **kwargs) try: @@ -412,7 +519,7 @@ def _execute_sync(f, *args, **kwargs): @wraps(f) def _sentry_patched_create_sync(*args, **kwargs): - # type: (*Any, **Any) -> Any + # type: (Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) if integration is None: return f(*args, **kwargs) @@ -425,7 +532,7 @@ def _sentry_patched_create_sync(*args, **kwargs): def _wrap_async_embeddings_create(f): # type: (Any) -> Any async def _execute_async(f, *args, **kwargs): - # type: (Any, *Any, **Any) -> Any + # type: (Any, Any, Any) -> Any gen = _new_embeddings_create_common(f, *args, **kwargs) try: @@ -446,7 +553,7 @@ async def _execute_async(f, *args, **kwargs): @wraps(f) async def _sentry_patched_create_async(*args, **kwargs): - # type: (*Any, **Any) -> Any + # type: (Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) if integration is None: return await f(*args, **kwargs) @@ -454,3 +561,96 @@ async def _sentry_patched_create_async(*args, **kwargs): return await _execute_async(f, *args, **kwargs) return _sentry_patched_create_async + + +def _new_responses_create_common(f, *args, **kwargs): + # type: (Any, Any, Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return f(*args, **kwargs) + + model = kwargs.get("model") + operation = "responses" + + span = sentry_sdk.start_span( + op=consts.OP.GEN_AI_RESPONSES, + name=f"{operation} {model}", + origin=OpenAIIntegration.origin, + ) + span.__enter__() + + _set_input_data(span, kwargs, operation, integration) + + response = yield f, args, kwargs + + _set_output_data(span, response, kwargs, integration, finish_span=True) + + return response + + +def _wrap_responses_create(f): + # type: (Any) -> Any + def _execute_sync(f, *args, **kwargs): + # type: (Any, Any, Any) -> Any + gen = _new_responses_create_common(f, *args, **kwargs) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return e.value + + try: + try: + result = f(*args, **kwargs) + except Exception as e: + _capture_exception(e) + raise e from None + + return gen.send(result) + except StopIteration as e: + return e.value + + @wraps(f) + def _sentry_patched_create_sync(*args, **kwargs): + # type: (Any, Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return f(*args, **kwargs) + + return _execute_sync(f, *args, **kwargs) + + return _sentry_patched_create_sync + + +def _wrap_async_responses_create(f): + # type: (Any) -> Any + async def _execute_async(f, *args, **kwargs): + # type: (Any, Any, Any) -> Any + gen = _new_responses_create_common(f, *args, **kwargs) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return await e.value + + try: + try: + result = await f(*args, **kwargs) + except Exception as e: + _capture_exception(e) + raise e from None + + return gen.send(result) + except StopIteration as e: + return e.value + + @wraps(f) + async def _sentry_patched_responses_async(*args, **kwargs): + # type: (Any, Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return await f(*args, **kwargs) + + return await _execute_async(f, *args, **kwargs) + + return _sentry_patched_responses_async diff --git a/sentry_sdk/integrations/openai_agents/utils.py b/sentry_sdk/integrations/openai_agents/utils.py index dc66521c83..1525346726 100644 --- a/sentry_sdk/integrations/openai_agents/utils.py +++ b/sentry_sdk/integrations/openai_agents/utils.py @@ -1,16 +1,14 @@ -import json import sentry_sdk from sentry_sdk.consts import SPANDATA from sentry_sdk.integrations import DidNotEnable from sentry_sdk.scope import should_send_default_pii -from sentry_sdk.utils import event_from_exception +from sentry_sdk.utils import event_from_exception, safe_serialize from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Any from typing import Callable - from typing import Union from agents import Usage try: @@ -162,49 +160,3 @@ def _set_output_data(span, result): span.set_data( SPANDATA.GEN_AI_RESPONSE_TEXT, safe_serialize(output_messages["response"]) ) - - -def safe_serialize(data): - # type: (Any) -> str - """Safely serialize to a readable string.""" - - def serialize_item(item): - # type: (Any) -> Union[str, dict[Any, Any], list[Any], tuple[Any, ...]] - if callable(item): - try: - module = getattr(item, "__module__", None) - qualname = getattr(item, "__qualname__", None) - name = getattr(item, "__name__", "anonymous") - - if module and qualname: - full_path = f"{module}.{qualname}" - elif module and name: - full_path = f"{module}.{name}" - else: - full_path = name - - return f"" - except Exception: - return f"" - elif isinstance(item, dict): - return {k: serialize_item(v) for k, v in item.items()} - elif isinstance(item, (list, tuple)): - return [serialize_item(x) for x in item] - elif hasattr(item, "__dict__"): - try: - attrs = { - k: serialize_item(v) - for k, v in vars(item).items() - if not k.startswith("_") - } - return f"<{type(item).__name__} {attrs}>" - except Exception: - return repr(item) - else: - return item - - try: - serialized = serialize_item(data) - return json.dumps(serialized, default=str) - except Exception: - return str(data) diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index 3b0ab8d746..9c6f2cfc3b 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -1938,3 +1938,49 @@ def try_convert(convert_func, value): return convert_func(value) except Exception: return None + + +def safe_serialize(data): + # type: (Any) -> str + """Safely serialize to a readable string.""" + + def serialize_item(item): + # type: (Any) -> Union[str, dict[Any, Any], list[Any], tuple[Any, ...]] + if callable(item): + try: + module = getattr(item, "__module__", None) + qualname = getattr(item, "__qualname__", None) + name = getattr(item, "__name__", "anonymous") + + if module and qualname: + full_path = f"{module}.{qualname}" + elif module and name: + full_path = f"{module}.{name}" + else: + full_path = name + + return f"" + except Exception: + return f"" + elif isinstance(item, dict): + return {k: serialize_item(v) for k, v in item.items()} + elif isinstance(item, (list, tuple)): + return [serialize_item(x) for x in item] + elif hasattr(item, "__dict__"): + try: + attrs = { + k: serialize_item(v) + for k, v in vars(item).items() + if not k.startswith("_") + } + return f"<{type(item).__name__} {attrs}>" + except Exception: + return repr(item) + else: + return item + + try: + serialized = serialize_item(data) + return json.dumps(serialized, default=str) + except Exception: + return str(data) diff --git a/tests/integrations/openai/test_openai.py b/tests/integrations/openai/test_openai.py index ac6d9f4c29..dfac08d762 100644 --- a/tests/integrations/openai/test_openai.py +++ b/tests/integrations/openai/test_openai.py @@ -1,3 +1,4 @@ +import json import pytest from openai import AsyncOpenAI, OpenAI, AsyncStream, Stream, OpenAIError from openai.types import CompletionUsage, CreateEmbeddingResponse, Embedding @@ -6,6 +7,25 @@ from openai.types.chat.chat_completion_chunk import ChoiceDelta, Choice as DeltaChoice from openai.types.create_embedding_response import Usage as EmbeddingTokenUsage +SKIP_RESPONSES_TESTS = False + +try: + from openai.types.responses.response_completed_event import ResponseCompletedEvent + from openai.types.responses.response_created_event import ResponseCreatedEvent + from openai.types.responses.response_text_delta_event import ResponseTextDeltaEvent + from openai.types.responses.response_usage import ( + InputTokensDetails, + OutputTokensDetails, + ) + from openai.types.responses import ( + Response, + ResponseUsage, + ResponseOutputMessage, + ResponseOutputText, + ) +except ImportError: + SKIP_RESPONSES_TESTS = True + from sentry_sdk import start_transaction from sentry_sdk.consts import SPANDATA from sentry_sdk.integrations.openai import ( @@ -36,7 +56,7 @@ async def __call__(self, *args, **kwargs): ) ], created=10000000, - model="model-id", + model="response-model-id", object="chat.completion", usage=CompletionUsage( completion_tokens=10, @@ -46,6 +66,46 @@ async def __call__(self, *args, **kwargs): ) +if SKIP_RESPONSES_TESTS: + EXAMPLE_RESPONSE = None +else: + EXAMPLE_RESPONSE = Response( + id="chat-id", + output=[ + ResponseOutputMessage( + id="message-id", + content=[ + ResponseOutputText( + annotations=[], + text="the model response", + type="output_text", + ), + ], + role="assistant", + status="completed", + type="message", + ), + ], + parallel_tool_calls=False, + tool_choice="none", + tools=[], + created_at=10000000, + model="response-model-id", + object="response", + usage=ResponseUsage( + input_tokens=20, + input_tokens_details=InputTokensDetails( + cached_tokens=5, + ), + output_tokens=10, + output_tokens_details=OutputTokensDetails( + reasoning_tokens=8, + ), + total_tokens=30, + ), + ) + + async def async_iterator(values): for value in values: yield value @@ -81,14 +141,17 @@ def test_nonstreaming_chat_completion( tx = events[0] assert tx["type"] == "transaction" span = tx["spans"][0] - assert span["op"] == "ai.chat_completions.create.openai" + assert span["op"] == "gen_ai.chat" if send_default_pii and include_prompts: - assert "hello" in span["data"][SPANDATA.AI_INPUT_MESSAGES]["content"] - assert "the model response" in span["data"][SPANDATA.AI_RESPONSES]["content"] + assert "hello" in span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]["content"] + assert ( + "the model response" + in json.loads(span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT])[0]["content"] + ) else: - assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] - assert SPANDATA.AI_RESPONSES not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] assert span["data"]["gen_ai.usage.output_tokens"] == 10 assert span["data"]["gen_ai.usage.input_tokens"] == 20 @@ -123,14 +186,17 @@ async def test_nonstreaming_chat_completion_async( tx = events[0] assert tx["type"] == "transaction" span = tx["spans"][0] - assert span["op"] == "ai.chat_completions.create.openai" + assert span["op"] == "gen_ai.chat" if send_default_pii and include_prompts: - assert "hello" in span["data"][SPANDATA.AI_INPUT_MESSAGES]["content"] - assert "the model response" in span["data"][SPANDATA.AI_RESPONSES]["content"] + assert "hello" in span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]["content"] + assert ( + "the model response" + in json.loads(span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT])[0]["content"] + ) else: - assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] - assert SPANDATA.AI_RESPONSES not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] assert span["data"]["gen_ai.usage.output_tokens"] == 10 assert span["data"]["gen_ai.usage.input_tokens"] == 20 @@ -216,14 +282,14 @@ def test_streaming_chat_completion( tx = events[0] assert tx["type"] == "transaction" span = tx["spans"][0] - assert span["op"] == "ai.chat_completions.create.openai" + assert span["op"] == "gen_ai.chat" if send_default_pii and include_prompts: - assert "hello" in span["data"][SPANDATA.AI_INPUT_MESSAGES]["content"] - assert "hello world" in span["data"][SPANDATA.AI_RESPONSES] + assert "hello" in span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]["content"] + assert "hello world" in span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] else: - assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] - assert SPANDATA.AI_RESPONSES not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] try: import tiktoken # type: ignore # noqa # pylint: disable=unused-import @@ -312,14 +378,14 @@ async def test_streaming_chat_completion_async( tx = events[0] assert tx["type"] == "transaction" span = tx["spans"][0] - assert span["op"] == "ai.chat_completions.create.openai" + assert span["op"] == "gen_ai.chat" if send_default_pii and include_prompts: - assert "hello" in span["data"][SPANDATA.AI_INPUT_MESSAGES]["content"] - assert "hello world" in span["data"][SPANDATA.AI_RESPONSES] + assert "hello" in span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]["content"] + assert "hello world" in span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] else: - assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] - assert SPANDATA.AI_RESPONSES not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] try: import tiktoken # type: ignore # noqa # pylint: disable=unused-import @@ -403,11 +469,11 @@ def test_embeddings_create( tx = events[0] assert tx["type"] == "transaction" span = tx["spans"][0] - assert span["op"] == "ai.embeddings.create.openai" + assert span["op"] == "gen_ai.embeddings" if send_default_pii and include_prompts: - assert "hello" in span["data"][SPANDATA.AI_INPUT_MESSAGES] + assert "hello" in span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] else: - assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] assert span["data"]["gen_ai.usage.input_tokens"] == 20 assert span["data"]["gen_ai.usage.total_tokens"] == 30 @@ -451,11 +517,11 @@ async def test_embeddings_create_async( tx = events[0] assert tx["type"] == "transaction" span = tx["spans"][0] - assert span["op"] == "ai.embeddings.create.openai" + assert span["op"] == "gen_ai.embeddings" if send_default_pii and include_prompts: - assert "hello" in span["data"][SPANDATA.AI_INPUT_MESSAGES] + assert "hello" in span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] else: - assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] assert span["data"]["gen_ai.usage.input_tokens"] == 20 assert span["data"]["gen_ai.usage.total_tokens"] == 30 @@ -897,3 +963,434 @@ def count_tokens(msg): output_tokens_reasoning=None, total_tokens=None, ) + + +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +def test_ai_client_span_responses_api_no_pii(sentry_init, capture_events): + sentry_init( + integrations=[OpenAIIntegration()], + traces_sample_rate=1.0, + ) + events = capture_events() + + client = OpenAI(api_key="z") + client.responses._post = mock.Mock(return_value=EXAMPLE_RESPONSE) + + with start_transaction(name="openai tx"): + client.responses.create( + model="gpt-4o", + instructions="You are a coding assistant that talks like a pirate.", + input="How do I check if a Python object is an instance of a class?", + ) + + (transaction,) = events + spans = transaction["spans"] + + assert len(spans) == 1 + assert spans[0]["op"] == "gen_ai.responses" + assert spans[0]["origin"] == "auto.ai.openai" + assert spans[0]["data"] == { + "gen_ai.operation.name": "responses", + "gen_ai.request.model": "gpt-4o", + "gen_ai.response.model": "response-model-id", + "gen_ai.system": "openai", + "gen_ai.usage.input_tokens": 20, + "gen_ai.usage.input_tokens.cached": 5, + "gen_ai.usage.output_tokens": 10, + "gen_ai.usage.output_tokens.reasoning": 8, + "gen_ai.usage.total_tokens": 30, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + + assert "gen_ai.request.messages" not in spans[0]["data"] + assert "gen_ai.response.text" not in spans[0]["data"] + + +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +def test_ai_client_span_responses_api(sentry_init, capture_events): + sentry_init( + integrations=[OpenAIIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + client = OpenAI(api_key="z") + client.responses._post = mock.Mock(return_value=EXAMPLE_RESPONSE) + + with start_transaction(name="openai tx"): + client.responses.create( + model="gpt-4o", + instructions="You are a coding assistant that talks like a pirate.", + input="How do I check if a Python object is an instance of a class?", + ) + + (transaction,) = events + spans = transaction["spans"] + + assert len(spans) == 1 + assert spans[0]["op"] == "gen_ai.responses" + assert spans[0]["origin"] == "auto.ai.openai" + assert spans[0]["data"] == { + "gen_ai.operation.name": "responses", + "gen_ai.request.messages": "How do I check if a Python object is an instance of a class?", + "gen_ai.request.model": "gpt-4o", + "gen_ai.system": "openai", + "gen_ai.response.model": "response-model-id", + "gen_ai.usage.input_tokens": 20, + "gen_ai.usage.input_tokens.cached": 5, + "gen_ai.usage.output_tokens": 10, + "gen_ai.usage.output_tokens.reasoning": 8, + "gen_ai.usage.total_tokens": 30, + "gen_ai.response.text": '[{"id": "message-id", "content": [{"annotations": [], "text": "the model response", "type": "output_text"}], "role": "assistant", "status": "completed", "type": "message"}]', + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + + +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +def test_error_in_responses_api(sentry_init, capture_events): + sentry_init( + integrations=[OpenAIIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + client = OpenAI(api_key="z") + client.responses._post = mock.Mock( + side_effect=OpenAIError("API rate limit reached") + ) + + with start_transaction(name="openai tx"): + with pytest.raises(OpenAIError): + client.responses.create( + model="gpt-4o", + instructions="You are a coding assistant that talks like a pirate.", + input="How do I check if a Python object is an instance of a class?", + ) + + (error_event, transaction_event) = events + + assert transaction_event["type"] == "transaction" + # make sure the span where the error occurred is captured + assert transaction_event["spans"][0]["op"] == "gen_ai.responses" + + assert error_event["level"] == "error" + assert error_event["exception"]["values"][0]["type"] == "OpenAIError" + + assert ( + error_event["contexts"]["trace"]["trace_id"] + == transaction_event["contexts"]["trace"]["trace_id"] + ) + + +@pytest.mark.asyncio +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +async def test_ai_client_span_responses_async_api(sentry_init, capture_events): + sentry_init( + integrations=[OpenAIIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + client = AsyncOpenAI(api_key="z") + client.responses._post = AsyncMock(return_value=EXAMPLE_RESPONSE) + + with start_transaction(name="openai tx"): + await client.responses.create( + model="gpt-4o", + instructions="You are a coding assistant that talks like a pirate.", + input="How do I check if a Python object is an instance of a class?", + ) + + (transaction,) = events + spans = transaction["spans"] + + assert len(spans) == 1 + assert spans[0]["op"] == "gen_ai.responses" + assert spans[0]["origin"] == "auto.ai.openai" + assert spans[0]["data"] == { + "gen_ai.operation.name": "responses", + "gen_ai.request.messages": "How do I check if a Python object is an instance of a class?", + "gen_ai.request.model": "gpt-4o", + "gen_ai.response.model": "response-model-id", + "gen_ai.system": "openai", + "gen_ai.usage.input_tokens": 20, + "gen_ai.usage.input_tokens.cached": 5, + "gen_ai.usage.output_tokens": 10, + "gen_ai.usage.output_tokens.reasoning": 8, + "gen_ai.usage.total_tokens": 30, + "gen_ai.response.text": '[{"id": "message-id", "content": [{"annotations": [], "text": "the model response", "type": "output_text"}], "role": "assistant", "status": "completed", "type": "message"}]', + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + + +@pytest.mark.asyncio +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +async def test_ai_client_span_streaming_responses_async_api( + sentry_init, capture_events +): + sentry_init( + integrations=[OpenAIIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + client = AsyncOpenAI(api_key="z") + client.responses._post = AsyncMock(return_value=EXAMPLE_RESPONSE) + + with start_transaction(name="openai tx"): + await client.responses.create( + model="gpt-4o", + instructions="You are a coding assistant that talks like a pirate.", + input="How do I check if a Python object is an instance of a class?", + stream=True, + ) + + (transaction,) = events + spans = transaction["spans"] + + assert len(spans) == 1 + assert spans[0]["op"] == "gen_ai.responses" + assert spans[0]["origin"] == "auto.ai.openai" + assert spans[0]["data"] == { + "gen_ai.operation.name": "responses", + "gen_ai.request.messages": "How do I check if a Python object is an instance of a class?", + "gen_ai.request.model": "gpt-4o", + "gen_ai.response.model": "response-model-id", + "gen_ai.response.streaming": True, + "gen_ai.system": "openai", + "gen_ai.usage.input_tokens": 20, + "gen_ai.usage.input_tokens.cached": 5, + "gen_ai.usage.output_tokens": 10, + "gen_ai.usage.output_tokens.reasoning": 8, + "gen_ai.usage.total_tokens": 30, + "gen_ai.response.text": '[{"id": "message-id", "content": [{"annotations": [], "text": "the model response", "type": "output_text"}], "role": "assistant", "status": "completed", "type": "message"}]', + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + + +@pytest.mark.asyncio +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +async def test_error_in_responses_async_api(sentry_init, capture_events): + sentry_init( + integrations=[OpenAIIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + client = AsyncOpenAI(api_key="z") + client.responses._post = AsyncMock( + side_effect=OpenAIError("API rate limit reached") + ) + + with start_transaction(name="openai tx"): + with pytest.raises(OpenAIError): + await client.responses.create( + model="gpt-4o", + instructions="You are a coding assistant that talks like a pirate.", + input="How do I check if a Python object is an instance of a class?", + ) + + (error_event, transaction_event) = events + + assert transaction_event["type"] == "transaction" + # make sure the span where the error occurred is captured + assert transaction_event["spans"][0]["op"] == "gen_ai.responses" + + assert error_event["level"] == "error" + assert error_event["exception"]["values"][0]["type"] == "OpenAIError" + + assert ( + error_event["contexts"]["trace"]["trace_id"] + == transaction_event["contexts"]["trace"]["trace_id"] + ) + + +if SKIP_RESPONSES_TESTS: + EXAMPLE_RESPONSES_STREAM = [] +else: + EXAMPLE_RESPONSES_STREAM = [ + ResponseCreatedEvent( + sequence_number=1, + type="response.created", + response=Response( + id="chat-id", + created_at=10000000, + model="response-model-id", + object="response", + output=[], + parallel_tool_calls=False, + tool_choice="none", + tools=[], + ), + ), + ResponseTextDeltaEvent( + item_id="msg_1", + sequence_number=2, + type="response.output_text.delta", + logprobs=[], + content_index=0, + output_index=0, + delta="hel", + ), + ResponseTextDeltaEvent( + item_id="msg_1", + sequence_number=3, + type="response.output_text.delta", + logprobs=[], + content_index=0, + output_index=0, + delta="lo ", + ), + ResponseTextDeltaEvent( + item_id="msg_1", + sequence_number=4, + type="response.output_text.delta", + logprobs=[], + content_index=0, + output_index=0, + delta="world", + ), + ResponseCompletedEvent( + sequence_number=5, + type="response.completed", + response=Response( + id="chat-id", + created_at=10000000, + model="response-model-id", + object="response", + output=[], + parallel_tool_calls=False, + tool_choice="none", + tools=[], + usage=ResponseUsage( + input_tokens=20, + input_tokens_details=InputTokensDetails( + cached_tokens=5, + ), + output_tokens=10, + output_tokens_details=OutputTokensDetails( + reasoning_tokens=8, + ), + total_tokens=30, + ), + ), + ), + ] + + +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [(True, True), (True, False), (False, True), (False, False)], +) +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +def test_streaming_responses_api( + sentry_init, capture_events, send_default_pii, include_prompts +): + sentry_init( + integrations=[ + OpenAIIntegration( + include_prompts=include_prompts, + ) + ], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + client = OpenAI(api_key="z") + returned_stream = Stream(cast_to=None, response=None, client=client) + returned_stream._iterator = EXAMPLE_RESPONSES_STREAM + client.responses._post = mock.Mock(return_value=returned_stream) + + with start_transaction(name="openai tx"): + response_stream = client.responses.create( + model="some-model", + input="hello", + stream=True, + ) + + response_string = "" + for item in response_stream: + if hasattr(item, "delta"): + response_string += item.delta + + assert response_string == "hello world" + + (transaction,) = events + (span,) = transaction["spans"] + assert span["op"] == "gen_ai.responses" + + if send_default_pii and include_prompts: + assert span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] == "hello" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "hello world" + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"]["gen_ai.usage.input_tokens"] == 20 + assert span["data"]["gen_ai.usage.output_tokens"] == 10 + assert span["data"]["gen_ai.usage.total_tokens"] == 30 + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [(True, True), (True, False), (False, True), (False, False)], +) +@pytest.mark.skipif(SKIP_RESPONSES_TESTS, reason="Responses API not available") +async def test_streaming_responses_api_async( + sentry_init, capture_events, send_default_pii, include_prompts +): + sentry_init( + integrations=[ + OpenAIIntegration( + include_prompts=include_prompts, + ) + ], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + client = AsyncOpenAI(api_key="z") + returned_stream = AsyncStream(cast_to=None, response=None, client=client) + returned_stream._iterator = async_iterator(EXAMPLE_RESPONSES_STREAM) + client.responses._post = AsyncMock(return_value=returned_stream) + + with start_transaction(name="openai tx"): + response_stream = await client.responses.create( + model="some-model", + input="hello", + stream=True, + ) + + response_string = "" + async for item in response_stream: + if hasattr(item, "delta"): + response_string += item.delta + + assert response_string == "hello world" + + (transaction,) = events + (span,) = transaction["spans"] + assert span["op"] == "gen_ai.responses" + + if send_default_pii and include_prompts: + assert span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] == "hello" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "hello world" + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"]["gen_ai.usage.input_tokens"] == 20 + assert span["data"]["gen_ai.usage.output_tokens"] == 10 + assert span["data"]["gen_ai.usage.total_tokens"] == 30