diff --git a/haystack/components/generators/chat/__init__.py b/haystack/components/generators/chat/__init__.py index c98bc263ca..4679d10337 100644 --- a/haystack/components/generators/chat/__init__.py +++ b/haystack/components/generators/chat/__init__.py @@ -9,6 +9,7 @@ _import_structure = { "openai": ["OpenAIChatGenerator"], + "openai_responses": ["OpenAIResponsesChatGenerator"], "azure": ["AzureOpenAIChatGenerator"], "hugging_face_local": ["HuggingFaceLocalChatGenerator"], "hugging_face_api": ["HuggingFaceAPIChatGenerator"], @@ -21,6 +22,7 @@ from .hugging_face_api import HuggingFaceAPIChatGenerator as HuggingFaceAPIChatGenerator from .hugging_face_local import HuggingFaceLocalChatGenerator as HuggingFaceLocalChatGenerator from .openai import OpenAIChatGenerator as OpenAIChatGenerator + from .openai_responses import OpenAIResponsesChatGenerator as OpenAIResponsesChatGenerator else: sys.modules[__name__] = LazyImporter(name=__name__, module_file=__file__, import_structure=_import_structure) diff --git a/haystack/components/generators/chat/openai_responses.py b/haystack/components/generators/chat/openai_responses.py new file mode 100644 index 0000000000..3b81d0fcab --- /dev/null +++ b/haystack/components/generators/chat/openai_responses.py @@ -0,0 +1,793 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import json +import os +from datetime import datetime +from typing import Any, Optional, Union + +from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream +from openai.lib._pydantic import to_strict_json_schema +from openai.types.responses import ParsedResponse, Response, ResponseOutputRefusal, ResponseStreamEvent +from pydantic import BaseModel + +from haystack import component, default_from_dict, default_to_dict, logging +from haystack.dataclasses import ( + AsyncStreamingCallbackT, + ChatMessage, + ComponentInfo, + ImageContent, + ReasoningContent, + StreamingCallbackT, + StreamingChunk, + SyncStreamingCallbackT, + TextContent, + ToolCall, + ToolCallDelta, + select_streaming_callback, +) +from haystack.tools import ( + Toolset, + ToolsType, + _check_duplicate_tool_names, + deserialize_tools_or_toolset_inplace, + serialize_tools_or_toolset, +) +from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable +from haystack.utils.http_client import init_http_client + +logger = logging.getLogger(__name__) + + +@component +class OpenAIResponsesChatGenerator: + """ + Completes chats using OpenAI's Responses API. + + It works with the gpt-4 and o-series models and supports streaming responses + from OpenAI API. It uses [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage) + format in input and output. + + You can customize how the text is generated by passing parameters to the + OpenAI API. Use the `**generation_kwargs` argument when you initialize + the component or when you run it. Any parameter that works with + `openai.Responses.create` will work here too. + + For details on OpenAI API parameters, see + [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses). 
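+
+    The Responses API can also continue a previous conversation server-side through the
+    `previous_response_id` generation parameter. A minimal sketch (the prompts are
+    illustrative; the id is read from the previous reply's metadata):
+
+    ```python
+    from haystack.components.generators.chat import OpenAIResponsesChatGenerator
+    from haystack.dataclasses import ChatMessage
+
+    client = OpenAIResponsesChatGenerator()
+    first = client.run([ChatMessage.from_user("Name one famous bridge in Paris.")])
+    follow_up = client.run(
+        [ChatMessage.from_user("When was it built?")],
+        generation_kwargs={"previous_response_id": first["replies"][0].meta["id"]},
+    )
+    ```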
+ + ### Usage example + + ```python + from haystack.components.generators.chat import OpenAIResponsesChatGenerator + from haystack.dataclasses import ChatMessage + + messages = [ChatMessage.from_user("What's Natural Language Processing?")] + + client = OpenAIResponsesChatGenerator(generation_kwargs={"reasoning": {"effort": "low", "summary": "auto"}}) + response = client.run(messages) + print(response) + ``` + """ + + def __init__( + self, + *, + api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"), + model: str = "gpt-5-mini", + streaming_callback: Optional[StreamingCallbackT] = None, + api_base_url: Optional[str] = None, + organization: Optional[str] = None, + generation_kwargs: Optional[dict[str, Any]] = None, + timeout: Optional[float] = None, + max_retries: Optional[int] = None, + tools: Optional[ToolsType] = None, + tools_strict: bool = False, + http_client_kwargs: Optional[dict[str, Any]] = None, + ): + """ + Creates an instance of OpenAIResponsesChatGenerator. Uses OpenAI's gpt-5-mini by default. + + Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES' + environment variables to override the `timeout` and `max_retries` parameters respectively + in the OpenAI client. + + :param api_key: The OpenAI API key. + You can set it with an environment variable `OPENAI_API_KEY`, or pass with this parameter + during initialization. + :param model: The name of the model to use. + :param streaming_callback: A callback function that is called when a new token is received from the stream. + The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk) + as an argument. + :param api_base_url: An optional base URL. + :param organization: Your organization ID, defaults to `None`. See + [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization). + :param generation_kwargs: Other parameters to use for the model. These parameters are sent + directly to the OpenAI endpoint. + See OpenAI [documentation](https://platform.openai.com/docs/api-reference/responses) for + more details. + Some of the supported parameters: + - `temperature`: What sampling temperature to use. Higher values like 0.8 will make the output more random, + while lower values like 0.2 will make it more focused and deterministic. + - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model + considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens + comprising the top 10% probability mass are considered. + - `previous_response_id`: The ID of the previous response. + Use this to create multi-turn conversations. + - `text_format`: A JSON schema or a Pydantic model that enforces the structure of the model's response. + If provided, the output will always be validated against this + format (unless the model returns a tool call). + For details, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs). + Notes: + - This parameter accepts Pydantic models and JSON schemas for latest models starting from GPT-4o. + Older models only support basic version of structured outputs through `{"type": "json_object"}`. + For detailed information on JSON mode, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs#json-mode). 
+                - For structured outputs with streaming,
+                  the `text_format` must be a JSON schema and not a Pydantic model.
+            - `reasoning`: A dictionary of parameters for reasoning. For example:
+                - `summary`: The summary of the reasoning.
+                - `effort`: The level of effort to put into the reasoning. Can be `low`, `medium`, or `high`.
+                - `generate_summary`: Whether to generate a summary of the reasoning.
+              Note: OpenAI does not return the reasoning tokens, but we can view the summary if it is enabled.
+              For details, see the [OpenAI Reasoning documentation](https://platform.openai.com/docs/guides/reasoning).
+        :param timeout:
+            Timeout for OpenAI client calls. If not set, it defaults to either the
+            `OPENAI_TIMEOUT` environment variable, or 30 seconds.
+        :param max_retries:
+            Maximum number of retries to contact OpenAI after an internal error.
+            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable, or 5.
+        :param tools:
+            The tools that the model can use to prepare calls. This parameter can accept either a
+            mixed list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of dictionaries of
+            OpenAI/MCP tool definitions.
+            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
+            For details on tool support, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
+        :param tools_strict:
+            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
+            follow the schema provided in the `parameters` field of the tool definition. In the Responses API,
+            tool calls are strict by default.
+        :param http_client_kwargs:
+            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
+            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
+
+        """
+        self.api_key = api_key
+        self.model = model
+        self.generation_kwargs = generation_kwargs or {}
+        self.streaming_callback = streaming_callback
+        self.api_base_url = api_base_url
+        self.organization = organization
+        self.timeout = timeout
+        self.max_retries = max_retries
+        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
+        self.tools_strict = tools_strict
+        self.http_client_kwargs = http_client_kwargs
+
+        if timeout is None:
+            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
+        if max_retries is None:
+            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))
+
+        client_kwargs: dict[str, Any] = {
+            "api_key": api_key.resolve_value(),
+            "organization": organization,
+            "base_url": api_base_url,
+            "timeout": timeout,
+            "max_retries": max_retries,
+        }
+
+        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
+        self.async_client = AsyncOpenAI(
+            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
+        )
+
+    def _get_telemetry_data(self) -> dict[str, Any]:
+        """
+        Data that is sent to Posthog for usage analytics.
+        """
+        return {"model": self.model}
+
+    def to_dict(self) -> dict[str, Any]:
+        """
+        Serialize this component to a dictionary.
+
+        :returns:
+            The serialized component as a dictionary.
+ """ + callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None + generation_kwargs = self.generation_kwargs.copy() + response_format = generation_kwargs.get("text_format") + + # If the response format is a Pydantic model, it's converted to openai's json schema format + # If it's already a json schema, it's left as is + if response_format and issubclass(response_format, BaseModel): + json_schema = { + "type": "json_schema", + "json_schema": { + "name": response_format.__name__, + "strict": True, + "schema": to_strict_json_schema(response_format), + }, + } + generation_kwargs["text_format"] = json_schema + + # OpenAI/MCP tools are passed as list of dictionaries + if self.tools and isinstance(self.tools, list) and isinstance(self.tools[0], dict): + serialized_tools = self.tools + else: + # function returns correct type but mypy doesn't know it + serialized_tools = serialize_tools_or_toolset(self.tools) # type: ignore[assignment] + + return default_to_dict( + self, + model=self.model, + streaming_callback=callback_name, + api_base_url=self.api_base_url, + organization=self.organization, + generation_kwargs=generation_kwargs, + api_key=self.api_key.to_dict(), + timeout=self.timeout, + max_retries=self.max_retries, + tools=serialized_tools, + tools_strict=self.tools_strict, + http_client_kwargs=self.http_client_kwargs, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "OpenAIResponsesChatGenerator": + """ + Deserialize this component from a dictionary. + + :param data: The dictionary representation of this component. + :returns: + The deserialized component instance. + """ + deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"]) + + # we only deserialize the tools if they are haystack tools + # because openai tools are not serialized in the same way + + tools = data["init_parameters"].get("tools") + if tools and ( + isinstance(tools, dict) + and tools.get("type") == "haystack.tools.toolset.Toolset" + or isinstance(tools, list) + and tools[0].get("type") == "haystack.tools.tool.Tool" + ): + deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools") + + init_params = data.get("init_parameters", {}) + serialized_callback_handler = init_params.get("streaming_callback") + + if serialized_callback_handler: + data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler) + return default_from_dict(cls, data) + + @component.output_types(replies=list[ChatMessage]) + def run( + self, + messages: list[ChatMessage], + *, + streaming_callback: Optional[StreamingCallbackT] = None, + generation_kwargs: Optional[dict[str, Any]] = None, + tools: Optional[ToolsType] = None, + tools_strict: Optional[bool] = None, + ): + """ + Invokes response generation based on the provided messages and generation parameters. + + :param messages: + A list of ChatMessage instances representing the input messages. + :param streaming_callback: + A callback function that is called when a new token is received from the stream. + :param generation_kwargs: + Additional keyword arguments for text generation. These parameters will + override the parameters passed during component initialization. + For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create). + :param tools: + The tools that the model can use to prepare calls. If set, it will override the + `tools` parameter set during component initialization. 
+            This parameter can accept either a mixed list of Haystack `Tool` objects and Haystack
+            `Toolset`s, or a list of dictionaries of OpenAI/MCP tool definitions.
+            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
+            For details on tool support, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
+        :param tools_strict:
+            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
+            follow the schema provided in the `parameters` field of the tool definition. In the Responses API,
+            tool calls are strict by default.
+            If set, it will override the `tools_strict` parameter set during component initialization.
+
+        :returns:
+            A dictionary with the following key:
+            - `replies`: A list containing the generated responses as ChatMessage instances.
+        """
+        if len(messages) == 0:
+            return {"replies": []}
+
+        streaming_callback = select_streaming_callback(
+            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
+        )
+        responses: Union[Stream[ResponseStreamEvent], Response]
+
+        api_args = self._prepare_api_call(
+            messages=messages,
+            streaming_callback=streaming_callback,
+            generation_kwargs=generation_kwargs,
+            tools=tools,
+            tools_strict=tools_strict,
+        )
+        openai_endpoint = api_args.pop("openai_endpoint")
+        openai_endpoint_method = getattr(self.client.responses, openai_endpoint)
+        responses = openai_endpoint_method(**api_args)
+
+        if streaming_callback is not None:
+            response_output = self._handle_stream_response(
+                responses,  # type: ignore
+                streaming_callback,
+            )
+        else:
+            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
+            response_output = [_convert_response_to_chat_message(responses)]
+        return {"replies": response_output}
+
+    @component.output_types(replies=list[ChatMessage])
+    async def run_async(
+        self,
+        messages: list[ChatMessage],
+        *,
+        streaming_callback: Optional[StreamingCallbackT] = None,
+        generation_kwargs: Optional[dict[str, Any]] = None,
+        tools: Optional[ToolsType] = None,
+        tools_strict: Optional[bool] = None,
+    ):
+        """
+        Asynchronously invokes response generation based on the provided messages and generation parameters.
+
+        This is the asynchronous version of the `run` method. It has the same parameters and return values
+        but can be used with `await` in async code.
+
+        :param messages:
+            A list of ChatMessage instances representing the input messages.
+        :param streaming_callback:
+            A callback function that is called when a new token is received from the stream.
+            Must be a coroutine.
+        :param generation_kwargs:
+            Additional keyword arguments for text generation. These parameters will
+            override the parameters passed during component initialization.
+            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create).
+        :param tools:
+            A list of tools or a Toolset for which the model can prepare calls. If set, it will override the
+            `tools` parameter set during component initialization. This parameter can accept either a mixed
+            list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of dictionaries of
+            OpenAI/MCP tool definitions.
+            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
+        :param tools_strict:
+            Whether to enable strict schema adherence for tool calls.
+            If set to `True`, the model will follow exactly
+            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
+            If set, it will override the `tools_strict` parameter set during component initialization.
+
+        :returns:
+            A dictionary with the following key:
+            - `replies`: A list containing the generated responses as ChatMessage instances.
+        """
+        # validate and select the streaming callback
+        streaming_callback = select_streaming_callback(
+            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
+        )
+        responses: Union[AsyncStream[ResponseStreamEvent], Response]
+
+        if len(messages) == 0:
+            return {"replies": []}
+
+        api_args = self._prepare_api_call(
+            messages=messages,
+            streaming_callback=streaming_callback,
+            generation_kwargs=generation_kwargs,
+            tools=tools,
+            tools_strict=tools_strict,
+        )
+
+        openai_endpoint = api_args.pop("openai_endpoint")
+        openai_endpoint_method = getattr(self.async_client.responses, openai_endpoint)
+        responses = await openai_endpoint_method(**api_args)
+
+        if streaming_callback is not None:
+            response_output = await self._handle_async_stream_response(
+                responses,  # type: ignore
+                streaming_callback,
+            )
+
+        else:
+            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
+            response_output = [_convert_response_to_chat_message(responses)]
+        return {"replies": response_output}
+
+    def _prepare_api_call(  # noqa: PLR0913
+        self,
+        *,
+        messages: list[ChatMessage],
+        streaming_callback: Optional[StreamingCallbackT] = None,
+        generation_kwargs: Optional[dict[str, Any]] = None,
+        tools: Optional[ToolsType] = None,
+        tools_strict: Optional[bool] = None,
+    ) -> dict[str, Any]:
+        # update generation kwargs by merging with the generation kwargs passed to the run method
+        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
+
+        text_format = generation_kwargs.pop("text_format", None)
+
+        # adapt ChatMessage(s) to the format expected by the OpenAI API
+        openai_formatted_messages = [convert_message_to_responses_api_format(message) for message in messages]
+
+        tools = tools or self.tools
+        tools_strict = tools_strict if tools_strict is not None else self.tools_strict
+
+        openai_tools = {}
+        # Build tool definitions
+        if tools:
+            tool_definitions = []
+            if isinstance(tools, list) and isinstance(tools[0], dict):
+                # Predefined OpenAI/MCP-style tools
+                tool_definitions = tools
+
+            # Convert all tool objects to the correct OpenAI-compatible structure
+            else:
+                if isinstance(tools, Toolset):
+                    tools = list(tools)
+                _check_duplicate_tool_names(tools)  # type: ignore[arg-type]
+                for t in tools:
+                    function_spec = {**t.tool_spec}  # type: ignore[union-attr]
+                    if not tools_strict:
+                        function_spec["strict"] = False
+                        function_spec["parameters"]["additionalProperties"] = False
+                    tool_definitions.append({"type": "function", **function_spec})  # type: ignore[arg-type]
+
+            openai_tools = {"tools": tool_definitions}
+
+        base_args = {"model": self.model, "input": openai_formatted_messages, **openai_tools, **generation_kwargs}
+
+        # we pass a key `openai_endpoint` as a hint to the run method to use the create or parse endpoint
+        # this key will be removed before the API call is made
+        # Note: `responses.parse` currently only supports Pydantic models as `text_format`,
+        # so JSON-schema dicts are not routed to the parse endpoint.
+        if text_format and isinstance(text_format, type) and issubclass(text_format, BaseModel):
+            return {
+                **base_args,
+                "stream": streaming_callback is not None,
+                "text_format": text_format,
+                "openai_endpoint": "parse",
+            }
+
+        return {**base_args, "stream": streaming_callback is not None, "openai_endpoint": "create"}
"create"} + + def _handle_stream_response(self, responses: Stream, callback: SyncStreamingCallbackT) -> list[ChatMessage]: + component_info = ComponentInfo.from_component(self) + chunks: list[StreamingChunk] = [] + + for chunk in responses: # pylint: disable=not-an-iterable + chunk_delta = _convert_streaming_response_chunk_to_streaming_chunk( + chunk=chunk, previous_chunks=chunks, component_info=component_info + ) + if chunk_delta: + chunks.append(chunk_delta) + callback(chunk_delta) + chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks) + return [chat_message] + + async def _handle_async_stream_response( + self, responses: AsyncStream, callback: AsyncStreamingCallbackT + ) -> list[ChatMessage]: + component_info = ComponentInfo.from_component(self) + chunks: list[StreamingChunk] = [] + async for chunk in responses: # pylint: disable=not-an-iterable + chunk_delta = _convert_streaming_response_chunk_to_streaming_chunk( + chunk=chunk, previous_chunks=chunks, component_info=component_info + ) + if chunk_delta: + chunks.append(chunk_delta) + await callback(chunk_delta) + chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks) + return [chat_message] + + +def _convert_response_to_chat_message(responses: Union[Response, ParsedResponse]) -> ChatMessage: + """ + Converts the non-streaming response from the OpenAI API to a ChatMessage. + + :param responses: The responses returned by the OpenAI API. + :returns: The ChatMessage. + """ + + tool_calls = [] + reasoning = None + tool_call_details = {} + for output in responses.output: + if isinstance(output, ResponseOutputRefusal): + logger.warning(f"OpenAI returned a refusal output: {output}") + continue + if output.type == "reasoning": + # openai doesn't return the reasoning tokens, but we can view summary if its enabled + # https://platform.openai.com/docs/guides/reasoning#reasoning-summaries + summaries = output.summary + extra = output.to_dict() + # we dont need the summary in the extra + extra.pop("summary") + reasoning_text = "\n".join([summary.text for summary in summaries if summaries]) + if reasoning_text: + reasoning = ReasoningContent(reasoning_text=reasoning_text, extra=extra) + + elif output.type == "function_call": + try: + arguments = json.loads(output.arguments) + except json.JSONDecodeError: + logger.warning( + "OpenAI returned a malformed JSON string for tool call arguments. This tool call " + "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. " + "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}", + _id=output.id, + _name=output.name, + _arguments=output.arguments, + ) + # We need to store both id and call_id for tool calls + tool_call_details[output.id] = {"call_id": output.call_id, "status": output.status} + + tool_calls.append(ToolCall(id=output.id, tool_name=output.name, arguments=arguments)) + + # we save the response as dict because it contains resp_id etc. + meta = responses.to_dict() + # remove output from meta because it contains toolcalls, reasoning, text etc. 
+ meta.pop("output") + meta["tool_call_details"] = tool_call_details + chat_message = ChatMessage.from_assistant( + text=responses.output_text if responses.output_text else None, + reasoning=reasoning, + tool_calls=tool_calls, + meta=meta, + ) + + return chat_message + + +def _convert_streaming_response_chunk_to_streaming_chunk( + chunk: ResponseStreamEvent, previous_chunks: list[StreamingChunk], component_info: Optional[ComponentInfo] = None +) -> StreamingChunk: + """ + Converts the streaming response chunk from the OpenAI Responses API to a StreamingChunk. + + :param chunk: The chunk returned by the OpenAI Responses API. + :param previous_chunks: A list of previously received StreamingChunks. + :param component_info: An optional `ComponentInfo` object containing information about the component that + generated the chunk, such as the component name and type. + :returns: + A StreamingChunk object representing the content of the chunk from the OpenAI Responses API. + """ + + if chunk.type == "response.output_text.delta": + # if item is a ResponseTextDeltaEvent + meta = chunk.to_dict() + meta["received_at"] = datetime.now().isoformat() + return StreamingChunk(content=chunk.delta, component_info=component_info, index=chunk.content_index, meta=meta) + + # Responses API always returns reasoning chunks even if there is no summary + elif chunk.type == "response.reasoning_summary_text.delta": + # we remove the delta from the extra because it is already in the reasoning_text + # rest of the information needs to be saved for chat message + extra = chunk.to_dict() + extra.pop("delta") + reasoning = ReasoningContent(reasoning_text=chunk.delta, extra=extra) + return StreamingChunk(content="", component_info=component_info, index=chunk.output_index, reasoning=reasoning) + + # the function name is only streamed at the start and end of the function call + elif chunk.type == "response.output_item.added" and chunk.item.type == "function_call": + function = chunk.item.name + meta = chunk.item.to_dict() + tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item.id, tool_name=function) + return StreamingChunk( + content="", + component_info=component_info, + index=chunk.output_index, + tool_calls=[tool_call], + start=True, + meta=meta, + ) + + # the function arguments are streamed in parts + # function name is not passed in these chunks + elif chunk.type == "response.function_call_arguments.delta": + arguments = chunk.delta + meta = chunk.to_dict() + meta.pop("delta") + tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item_id, arguments=arguments) + return StreamingChunk( + content="", + component_info=component_info, + index=chunk.output_index, + tool_calls=[tool_call], + start=True, + meta=meta, + ) + + # we return rest of the chunk as is + chunk_message = StreamingChunk( + content="", component_info=component_info, index=getattr(chunk, "output_index", None), meta=chunk.to_dict() + ) + return chunk_message + + +def convert_message_to_responses_api_format(message: ChatMessage, require_tool_call_ids: bool = True) -> dict[str, Any]: + """ + Convert a ChatMessage to the dictionary format expected by OpenAI's Responses API. + + :param message: The ChatMessage to convert to OpenAI's Responses API format. + :param require_tool_call_ids: + If True (default), enforces that each Tool Call includes a non-null `id` attribute. + Set to False to allow Tool Calls without `id`, which may be suitable for shallow OpenAI-compatible APIs. 
+    :returns:
+        The ChatMessage in the format expected by OpenAI's Responses API.
+
+    :raises ValueError:
+        If the message format is invalid, or if `require_tool_call_ids` is True and any Tool Call is missing an
+        `id` attribute.
+    """
+    text_contents = message.texts
+    tool_calls = message.tool_calls
+    tool_call_results = message.tool_call_results
+    images = message.images
+    reasonings = message.reasonings
+
+    if not text_contents and not tool_calls and not tool_call_results and not images and not reasonings:
+        raise ValueError(
+            "A `ChatMessage` must contain at least one `TextContent`, `ToolCall`, `ToolCallResult`, "
+            "`ImageContent`, or `ReasoningContent`."
+        )
+    if len(tool_call_results) > 0 and len(message._content) > 1:
+        raise ValueError(
+            "For OpenAI compatibility, a `ChatMessage` with a `ToolCallResult` cannot contain any other content."
+        )
+
+    openai_msg: dict[str, Any] = {"role": message._role.value}
+
+    # Add name field if present
+    if message._name is not None:
+        openai_msg["name"] = message._name
+
+    # user message
+    if openai_msg["role"] == "user":
+        if len(message._content) == 1 and isinstance(message._content[0], TextContent):
+            openai_msg["content"] = message.text
+            return openai_msg
+
+        # if the user message contains a list of text and images, OpenAI expects a list of dictionaries
+        content = []
+        for part in message._content:
+            if isinstance(part, TextContent):
+                content.append({"type": "input_text", "text": part.text})
+            elif isinstance(part, ImageContent):
+                image_item: dict[str, Any] = {
+                    "type": "input_image",
+                    # If no MIME type is provided, default to JPEG.
+                    # OpenAI API appears to tolerate MIME type mismatches.
+                    "image_url": f"data:{part.mime_type or 'image/jpeg'};base64,{part.base64_image}",
+                }
+                content.append(image_item)
+
+        openai_msg["content"] = content
+        return openai_msg
+
+    # tool message
+    if tool_call_results:
+        result = tool_call_results[0]
+        openai_msg["content"] = result.result
+        if result.origin.id is not None:
+            openai_msg["tool_call_id"] = result.origin.id
+        elif require_tool_call_ids:
+            raise ValueError("`ToolCall` must have a non-null `id` attribute to be used with OpenAI.")
+        # OpenAI does not provide a way to communicate errors in tool invocations, so we ignore the error field
+        return openai_msg
+
+    # system and assistant messages
+    # a plain string is used when the message only contains text;
+    # otherwise the content must be a list, so text is sent as an `output_text` part
+    # (the content type the Responses API uses for assistant text)
+    if text_contents and not reasonings and not tool_calls:
+        openai_msg["content"] = " ".join(text_contents)
+        return openai_msg
+
+    openai_msg["content"] = []
+
+    if text_contents:
+        openai_msg["content"].append({"type": "output_text", "text": " ".join(text_contents)})
+
+    if reasonings:
+        for reasoning in reasonings:
+            reasoning_item = {
+                **(reasoning.extra),
+                "summary": [{"text": reasoning.reasoning_text, "type": "summary_text"}],
+            }
+            openai_msg["content"].append(reasoning_item)
+
+    if tool_calls:
+        tool_call_ids = message._meta.get("tool_call_ids", {})
+
+        for tc in tool_calls:
+            openai_tool_call = {
+                "type": "function_call",
+                "name": tc.tool_name,
+                # We disable ensure_ascii so special chars like emojis are not converted
+                "arguments": json.dumps(tc.arguments, ensure_ascii=False),
+            }
+            if tc.id is not None:
+                openai_tool_call["id"] = tc.id
+                openai_tool_call.update(tool_call_ids.get(tc.id, {}))
+            elif require_tool_call_ids:
+                raise ValueError("`ToolCall` must have a non-null `id` attribute to be used with OpenAI.")
+            openai_msg["content"].append(openai_tool_call)
+
+    return openai_msg
+
+
+def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> ChatMessage:
+    """
+    Connects the streaming chunks into a single ChatMessage.
+
+    :param chunks: The list of all `StreamingChunk` objects.
+
+    :returns: The ChatMessage.
+ """ + text = "".join([chunk.content for chunk in chunks]) + reasoning = None + tool_calls = [] + tool_call_details = {} + + # Process tool calls if present in any chunk + tool_call_data: dict[str, dict[str, str]] = {} # Track tool calls by id + for chunk in chunks: + if chunk.tool_calls: + for tool_call in chunk.tool_calls: + assert tool_call.id is not None + # We use the tool call id to track the tool call across chunks + if tool_call.id not in tool_call_data: + tool_call_data[tool_call.id] = {"name": "", "arguments": "", "call_id": ""} + + if tool_call.tool_name is not None: + # we dont need to append the tool name as it is passed once in the start of the function call + tool_call_data[tool_call.id]["name"] = tool_call.tool_name + if tool_call.arguments is not None: + tool_call_data[tool_call.id]["arguments"] += tool_call.arguments + + # this is the information we need to save to send back to API + if chunk.meta.get("type") == "function_call": + call_id = chunk.meta.get("call_id") + fc_id = chunk.meta.get("id") + if fc_id is not None and isinstance(fc_id, str): + tool_call_data[fc_id]["call_id"] = str(call_id) if call_id is not None else "" + + if chunk.reasoning: + reasoning = chunk.reasoning + + # Convert accumulated tool call data into ToolCall objects + sorted_keys = sorted(tool_call_data.keys()) + for key in sorted_keys: + tool_call_dict = tool_call_data[key] + try: + arguments = json.loads(tool_call_dict.get("arguments", "{}")) if tool_call_dict.get("arguments") else {} + tool_calls.append(ToolCall(id=key, tool_name=tool_call_dict["name"], arguments=arguments)) + if tool_call_dict["call_id"]: + tool_call_details[key] = {"call_id": tool_call_dict["call_id"]} + except json.JSONDecodeError: + logger.warning( + "The LLM provider returned a malformed JSON string for tool call arguments. This tool call " + "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. " + "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}", + _id=key, + _name=tool_call_dict["name"], + _arguments=tool_call_dict["arguments"], + ) + + # the final response is the last chunk with the response metadata + final_response = chunks[-1].meta.get("response") + meta: dict[str, Any] = { + "model": final_response.get("model") if final_response else None, + "index": 0, + "response_start_time": final_response.get("created_at") if final_response else None, + "usage": final_response.get("usage") if final_response else None, + } + if tool_call_details: + meta["tool_call_details"] = tool_call_details + + return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta, reasoning=reasoning) diff --git a/releasenotes/notes/add-openai-responses-chatgenerator-52ca7457a4e61db1.yaml b/releasenotes/notes/add-openai-responses-chatgenerator-52ca7457a4e61db1.yaml new file mode 100644 index 0000000000..9ac8bacc29 --- /dev/null +++ b/releasenotes/notes/add-openai-responses-chatgenerator-52ca7457a4e61db1.yaml @@ -0,0 +1,49 @@ +--- +features: + - | + Added the OpenAIResponsesChatGenerator, a new component that integrates OpenAI's Responses API into Haystack. + This unlocks several advanced capabilities from the Responses API: + - Allowing retrieval of concise summaries of the model's reasoning process. + - Allowing the use of native OpenAI or MCP tool formats, along with Haystack Tool objects and Toolset instances. 
+
+    Example with reasoning and web search tool:
+    ```python
+    from haystack.components.generators.chat import OpenAIResponsesChatGenerator
+    from haystack.dataclasses import ChatMessage
+
+    chat_generator = OpenAIResponsesChatGenerator(
+        model="o3-mini",
+        generation_kwargs={"reasoning": {"summary": "auto", "effort": "low"}},
+        tools=[{"type": "web_search"}]
+    )
+
+    response = chat_generator.run(
+        messages=[
+            ChatMessage.from_user("What's a positive news story from today?")
+        ]
+    )
+    print(response["replies"][0].text)
+    ```
+
+    Example with structured output:
+    ```python
+    from pydantic import BaseModel
+    from haystack.components.generators.chat import OpenAIResponsesChatGenerator
+    from haystack.dataclasses import ChatMessage
+
+    class WeatherInfo(BaseModel):
+        location: str
+        temperature: float
+        conditions: str
+
+    chat_generator = OpenAIResponsesChatGenerator(
+        model="gpt-5-mini",
+        generation_kwargs={"text_format": WeatherInfo}
+    )
+
+    response = chat_generator.run(
+        messages=[ChatMessage.from_user("What's the weather in Paris?")]
+    )
+    ```
diff --git a/test/components/generators/chat/test_openai_responses.py b/test/components/generators/chat/test_openai_responses.py
new file mode 100644
index 0000000000..807894cf8b
--- /dev/null
+++ b/test/components/generators/chat/test_openai_responses.py
@@ -0,0 +1,660 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+import logging
+import os
+from typing import Any, Optional
+from unittest.mock import MagicMock
+
+import pytest
+from openai import OpenAIError
+from pydantic import BaseModel
+
+from haystack import component
+from haystack.components.generators.chat.openai_responses import (
+    OpenAIResponsesChatGenerator,
+    convert_message_to_responses_api_format,
+)
+from haystack.dataclasses import ChatMessage, ChatRole, ImageContent, ReasoningContent, StreamingChunk, ToolCall
+from haystack.tools import ComponentTool, Tool, Toolset
+from haystack.utils import Secret
+
+logger = logging.getLogger(__name__)
+
+
+class CalendarEvent(BaseModel):
+    event_name: str
+    event_date: str
+    event_location: str
+
+
+@pytest.fixture
+def calendar_event_model():
+    return CalendarEvent
+
+
+def callback(chunk: StreamingChunk) -> None: ...
+
+
+@component
+class MessageExtractor:
+    @component.output_types(messages=list[str], meta=dict[str, Any])
+    def run(self, messages: list[ChatMessage], meta: Optional[dict[str, Any]] = None) -> dict[str, Any]:
+        """
+        Extracts the text content of ChatMessage objects.
+
+        :param messages: List of Haystack ChatMessage objects
+        :param meta: Optional metadata to include in the response.
+        :returns:
+            A dictionary with keys "messages" and "meta".
+ """ + if meta is None: + meta = {} + return {"messages": [m.text for m in messages], "meta": meta} + + +def weather_function(city: str) -> dict[str, Any]: + weather_info = { + "Berlin": {"weather": "mostly sunny", "temperature": 7, "unit": "celsius"}, + "Paris": {"weather": "mostly cloudy", "temperature": 8, "unit": "celsius"}, + "Rome": {"weather": "sunny", "temperature": 14, "unit": "celsius"}, + } + return weather_info.get(city, {"weather": "unknown", "temperature": 0, "unit": "celsius"}) + + +@pytest.fixture +def tools(): + weather_tool = Tool( + name="weather", + description="useful to determine the weather in a given location", + parameters={"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]}, + function=weather_function, + ) + # We add a tool that has a more complex parameter signature + message_extractor_tool = ComponentTool( + component=MessageExtractor(), + name="message_extractor", + description="Useful for returning the text content of ChatMessage objects", + ) + return [weather_tool, message_extractor_tool] + + +class TestOpenAIResponsesChatGenerator: + def test_init_default(self, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + component = OpenAIResponsesChatGenerator() + assert component.client.api_key == "test-api-key" + assert component.model == "gpt-5-mini" + assert component.streaming_callback is None + assert not component.generation_kwargs + assert component.client.timeout == 30 + assert component.client.max_retries == 5 + assert component.tools is None + assert not component.tools_strict + assert component.http_client_kwargs is None + + def test_init_fail_wo_api_key(self, monkeypatch): + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + with pytest.raises(ValueError): + OpenAIResponsesChatGenerator() + + def test_run_fail_with_duplicate_tool_names(self, monkeypatch, tools): + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + + duplicate_tools = [tools[0], tools[0]] + with pytest.raises(ValueError): + chat_messages = [ChatMessage.from_user("What's the weather like in Paris and Berlin?")] + component = OpenAIResponsesChatGenerator(tools=duplicate_tools) + component.run(chat_messages) + + def test_init_with_parameters(self, monkeypatch): + tool = Tool(name="name", description="description", parameters={"x": {"type": "string"}}, function=lambda x: x) + + monkeypatch.setenv("OPENAI_TIMEOUT", "100") + monkeypatch.setenv("OPENAI_MAX_RETRIES", "10") + component = OpenAIResponsesChatGenerator( + api_key=Secret.from_token("test-api-key"), + model="gpt-4o-mini", + streaming_callback=callback, + api_base_url="test-base-url", + generation_kwargs={"max_tokens": 10, "some_test_param": "test-params"}, + timeout=40.0, + max_retries=1, + tools=[tool], + tools_strict=True, + http_client_kwargs={"proxy": "http://example.com:8080", "verify": False}, + ) + assert component.client.api_key == "test-api-key" + assert component.model == "gpt-4o-mini" + assert component.streaming_callback is callback + assert component.generation_kwargs == {"max_tokens": 10, "some_test_param": "test-params"} + assert component.client.timeout == 40.0 + assert component.client.max_retries == 1 + assert component.tools == [tool] + assert component.tools_strict + assert component.http_client_kwargs == {"proxy": "http://example.com:8080", "verify": False} + + def test_init_with_parameters_and_env_vars(self, monkeypatch): + monkeypatch.setenv("OPENAI_TIMEOUT", "100") + monkeypatch.setenv("OPENAI_MAX_RETRIES", "10") + component = 
OpenAIResponsesChatGenerator( + api_key=Secret.from_token("test-api-key"), + model="gpt-4o-mini", + streaming_callback=callback, + api_base_url="test-base-url", + generation_kwargs={"max_tokens": 10, "some_test_param": "test-params"}, + ) + assert component.client.api_key == "test-api-key" + assert component.model == "gpt-4o-mini" + assert component.streaming_callback is callback + assert component.generation_kwargs == {"max_tokens": 10, "some_test_param": "test-params"} + assert component.client.timeout == 100.0 + assert component.client.max_retries == 10 + + def test_to_dict_default(self, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + component = OpenAIResponsesChatGenerator() + data = component.to_dict() + assert data == { + "type": "haystack.components.generators.chat.openai_responses.OpenAIResponsesChatGenerator", + "init_parameters": { + "api_key": {"env_vars": ["OPENAI_API_KEY"], "strict": True, "type": "env_var"}, + "model": "gpt-5-mini", + "organization": None, + "streaming_callback": None, + "api_base_url": None, + "generation_kwargs": {}, + "tools": None, + "tools_strict": False, + "max_retries": None, + "timeout": None, + "http_client_kwargs": None, + }, + } + + def test_to_dict_with_parameters(self, monkeypatch, calendar_event_model): + tool = Tool(name="name", description="description", parameters={"x": {"type": "string"}}, function=print) + + monkeypatch.setenv("ENV_VAR", "test-api-key") + component = OpenAIResponsesChatGenerator( + api_key=Secret.from_env_var("ENV_VAR"), + model="gpt-5-mini", + streaming_callback=callback, + api_base_url="test-base-url", + generation_kwargs={"max_tokens": 10, "some_test_param": "test-params", "text_format": calendar_event_model}, + tools=[tool], + tools_strict=True, + max_retries=10, + timeout=100.0, + http_client_kwargs={"proxy": "http://example.com:8080", "verify": False}, + ) + data = component.to_dict() + + assert data == { + "type": "haystack.components.generators.chat.openai_responses.OpenAIResponsesChatGenerator", + "init_parameters": { + "api_key": {"env_vars": ["ENV_VAR"], "strict": True, "type": "env_var"}, + "model": "gpt-5-mini", + "organization": None, + "api_base_url": "test-base-url", + "max_retries": 10, + "timeout": 100.0, + "streaming_callback": "generators.chat.test_openai_responses.callback", + "generation_kwargs": { + "max_tokens": 10, + "some_test_param": "test-params", + "text_format": { + "type": "json_schema", + "json_schema": { + "name": "CalendarEvent", + "strict": True, + "schema": { + "properties": { + "event_name": {"title": "Event Name", "type": "string"}, + "event_date": {"title": "Event Date", "type": "string"}, + "event_location": {"title": "Event Location", "type": "string"}, + }, + "required": ["event_name", "event_date", "event_location"], + "title": "CalendarEvent", + "type": "object", + "additionalProperties": False, + }, + }, + }, + }, + "tools": [ + { + "type": "haystack.tools.tool.Tool", + "data": { + "description": "description", + "function": "builtins.print", + "inputs_from_state": None, + "name": "name", + "outputs_to_state": None, + "outputs_to_string": None, + "parameters": {"x": {"type": "string"}}, + }, + } + ], + "tools_strict": True, + "http_client_kwargs": {"proxy": "http://example.com:8080", "verify": False}, + }, + } + + def test_from_dict(self, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "fake-api-key") + data = { + "type": "haystack.components.generators.chat.openai_responses.OpenAIResponsesChatGenerator", + "init_parameters": { + "api_key": 
{"env_vars": ["OPENAI_API_KEY"], "strict": True, "type": "env_var"}, + "model": "gpt-5-mini", + "api_base_url": "test-base-url", + "streaming_callback": "generators.chat.test_openai_responses.callback", + "max_retries": 10, + "timeout": 100.0, + "generation_kwargs": {"max_tokens": 10, "some_test_param": "test-params"}, + "tools": [ + { + "type": "haystack.tools.tool.Tool", + "data": { + "description": "description", + "function": "builtins.print", + "name": "name", + "parameters": {"x": {"type": "string"}}, + }, + } + ], + "tools_strict": True, + "http_client_kwargs": {"proxy": "http://example.com:8080", "verify": False}, + }, + } + component = OpenAIResponsesChatGenerator.from_dict(data) + + assert isinstance(component, OpenAIResponsesChatGenerator) + assert component.model == "gpt-5-mini" + assert component.streaming_callback is callback + assert component.api_base_url == "test-base-url" + assert component.generation_kwargs == {"max_tokens": 10, "some_test_param": "test-params"} + assert component.api_key == Secret.from_env_var("OPENAI_API_KEY") + assert component.tools == [ + Tool(name="name", description="description", parameters={"x": {"type": "string"}}, function=print) + ] + assert component.tools_strict + assert component.client.timeout == 100.0 + assert component.client.max_retries == 10 + assert component.http_client_kwargs == {"proxy": "http://example.com:8080", "verify": False} + + def test_from_dict_fail_wo_env_var(self, monkeypatch): + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + data = { + "type": "haystack.components.generators.chat.openai_responses.OpenAIResponsesChatGenerator", + "init_parameters": { + "api_key": {"env_vars": ["OPENAI_API_KEY"], "strict": True, "type": "env_var"}, + "model": "gpt-5-mini", + "organization": None, + "api_base_url": "test-base-url", + "streaming_callback": "test.components.generators.chat.test_openai_responses.callback", + "generation_kwargs": {"max_tokens": 10, "some_test_param": "test-params"}, + "tools": None, + }, + } + with pytest.raises(ValueError): + OpenAIResponsesChatGenerator.from_dict(data) + + def test_convert_chat_message_to_responses_api_format(self): + chat_message = ChatMessage( + _role=ChatRole.ASSISTANT, + _content=[ + ReasoningContent( + reasoning_text="I need to use the functions.weather tool.", + extra={"id": "rs_0d13efdd", "type": "reasoning"}, + ), + ToolCall(tool_name="weather", arguments={"location": "Berlin"}, id="fc_0d13efdd"), + ], + _name=None, + # some keys are removed to keep the test concise + _meta={ + "id": "resp_0d13efdd97aa4", + "created_at": 1761148307.0, + "model": "gpt-5-mini-2025-08-07", + "object": "response", + "parallel_tool_calls": True, + "temperature": 1.0, + "tool_choice": "auto", + "tools": [ + { + "name": "weather", + "parameters": { + "type": "object", + "properties": {"location": {"type": "string"}}, + "required": ["location"], + "additionalProperties": False, + }, + "strict": False, + "type": "function", + "description": "A tool to get the weather", + } + ], + "top_p": 1.0, + "reasoning": {"effort": "low", "summary": "detailed"}, + "usage": {"input_tokens": 59, "output_tokens": 19, "total_tokens": 78}, + "store": True, + "tool_call_ids": {"fc_0d13efdd": {"call_id": "call_a82vwFAIzku9SmBuQuecQSRq", "status": "completed"}}, + }, + ) + responses_api_format = convert_message_to_responses_api_format(chat_message) + assert responses_api_format == { + "role": "assistant", + "content": [ + { + "id": "rs_0d13efdd", + "type": "reasoning", + "summary": [{"text": "I need to use the 
functions.weather tool.", "type": "summary_text"}], + }, + { + "type": "function_call", + "name": "weather", + "arguments": '{"location": "Berlin"}', + "id": "fc_0d13efdd", + "call_id": "call_a82vwFAIzku9SmBuQuecQSRq", + "status": "completed", + }, + ], + } + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run(self): + chat_messages = [ChatMessage.from_user("What's the capital of France")] + component = OpenAIResponsesChatGenerator() + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message: ChatMessage = results["replies"][0] + assert "Paris" in message.text + assert "gpt-5-mini" in message.meta["model"] + assert message.meta["status"] == "completed" + assert message.meta["usage"]["total_tokens"] > 0 + assert message.meta["id"] is not None + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_with_reasoning(self): + chat_messages = [ChatMessage.from_user("Explain in 2 lines why is there a Moon?")] + component = OpenAIResponsesChatGenerator(generation_kwargs={"reasoning": {"summary": "auto", "effort": "low"}}) + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message: ChatMessage = results["replies"][0] + assert "Moon" in message.text + assert "gpt-5-mini" in message.meta["model"] + assert message.reasonings is not None + assert message.meta["status"] == "completed" + assert message.meta["usage"]["output_tokens"] > 0 + assert "reasoning_tokens" in message.meta["usage"]["output_tokens_details"] + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_with_text_format(self, calendar_event_model): + chat_messages = [ + ChatMessage.from_user("The marketing summit takes place on October12th at the Hilton Hotel downtown.") + ] + component = OpenAIResponsesChatGenerator(generation_kwargs={"text_format": calendar_event_model}) + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message: ChatMessage = results["replies"][0] + msg = json.loads(message.text) + assert "Marketing Summit" in msg["event_name"] + assert isinstance(msg["event_date"], str) + assert isinstance(msg["event_location"], str) + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + # So far from documentation, responses.parse only supports BaseModel + def test_live_run_with_text_format_json_schema(self): + pass + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_with_response_format_and_streaming(self, calendar_event_model): + chat_messages = [ + ChatMessage.from_user("The marketing summit takes place on October12th at the Hilton Hotel downtown.") + ] + component = OpenAIResponsesChatGenerator(generation_kwargs={"text_format": calendar_event_model}) + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message: 
ChatMessage = results["replies"][0] + msg = json.loads(message.text) + assert "Marketing Summit" in msg["event_name"] + assert isinstance(msg["event_date"], str) + assert isinstance(msg["event_location"], str) + + def test_run_with_wrong_model(self): + mock_client = MagicMock() + mock_client.responses.create.side_effect = OpenAIError("Invalid model name") + + generator = OpenAIResponsesChatGenerator( + api_key=Secret.from_token("test-api-key"), model="something-obviously-wrong" + ) + + generator.client = mock_client + + with pytest.raises(OpenAIError): + generator.run([ChatMessage.from_user("irrelevant")]) + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_streaming(self): + class Callback: + def __init__(self): + self.responses = "" + self.counter = 0 + + def __call__(self, chunk: StreamingChunk) -> None: + self.counter += 1 + self.responses += chunk.content if chunk.content else "" + + callback = Callback() + component = OpenAIResponsesChatGenerator(streaming_callback=callback) + results = component.run([ChatMessage.from_user("What's the capital of France?")]) + + # Basic response checks + assert "replies" in results + assert len(results["replies"]) == 1 + message: ChatMessage = results["replies"][0] + assert "Paris" in message.text + assert isinstance(message.meta, dict) + + # Metadata checks + metadata = message.meta + assert "gpt-5-mini" in metadata["model"] + + # Usage information checks + assert isinstance(metadata.get("usage"), dict), "meta.usage not a dict" + usage = metadata["usage"] + assert "output_tokens" in usage and usage["output_tokens"] > 0 + + # Detailed token information checks + assert isinstance(usage.get("output_tokens_details"), dict), "usage.output_tokens_details not a dict" + + # Streaming callback verification + assert callback.counter > 1 + assert "Paris" in callback.responses + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_with_tools_streaming(self, tools): + chat_messages = [ChatMessage.from_user("What's the weather like in Paris and Berlin?")] + + def callback(chunk: StreamingChunk) -> None: ... 
+ + component = OpenAIResponsesChatGenerator(tools=tools, streaming_callback=callback) + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message = results["replies"][0] + + assert not message.texts + assert not message.text + assert message.tool_calls + tool_calls = message.tool_calls + assert len(tool_calls) == 2 + + for tool_call in tool_calls: + assert isinstance(tool_call, ToolCall) + assert tool_call.tool_name == "weather" + + arguments = [tool_call.arguments for tool_call in tool_calls] + assert sorted(arguments, key=lambda x: x["city"]) == [{"city": "Berlin"}, {"city": "Paris"}] + + def test_chat_generator_with_toolset_initialization(self, tools, monkeypatch): + """Test that the OpenAIChatGenerator can be initialized with a Toolset.""" + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + toolset = Toolset(tools) + generator = OpenAIResponsesChatGenerator(tools=toolset) + assert generator.tools == toolset + + def test_from_dict_with_toolset(self, tools, monkeypatch): + """Test that the OpenAIChatGenerator can be deserialized from a dictionary with a Toolset.""" + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + toolset = Toolset(tools) + component = OpenAIResponsesChatGenerator(tools=toolset) + data = component.to_dict() + + deserialized_component = OpenAIResponsesChatGenerator.from_dict(data) + + assert isinstance(deserialized_component.tools, Toolset) + assert len(deserialized_component.tools) == len(tools) + assert all(isinstance(tool, Tool) for tool in deserialized_component.tools) + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_with_toolset(self, tools): + chat_messages = [ChatMessage.from_user("What's the weather like in Paris?")] + toolset = Toolset(tools) + component = OpenAIResponsesChatGenerator(tools=toolset) + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message = results["replies"][0] + + assert not message.texts + assert not message.text + assert message.tool_calls + tool_call = message.tool_call + assert isinstance(tool_call, ToolCall) + assert tool_call.tool_name == "weather" + assert tool_call.arguments == {"city": "Paris"} + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_multimodal(self, test_files_path): + image_path = test_files_path / "images" / "apple.jpg" + + # we resize the image to keep this test fast (around 1s) - increase the size in case of errors + image_content = ImageContent.from_file_path(file_path=image_path, size=(100, 100), detail="low") + + chat_messages = [ChatMessage.from_user(content_parts=["What does this image show? Max 5 words", image_content])] + + generator = OpenAIResponsesChatGenerator(model="gpt-5-nano") + results = generator.run(chat_messages) + + assert len(results["replies"]) == 1 + message: ChatMessage = results["replies"][0] + + assert message.text + assert "apple" in message.text.lower() + + assert message.is_from(ChatRole.ASSISTANT) + assert not message.tool_calls + assert not message.tool_call_results + + @pytest.mark.skip(reason="The tool calls time out resulting in failing") + def test_live_run_with_openai_tools(self): + """ + Test the use of generator with a list of OpenAI tools and MCP tools. 
+ """ + chat_messages = [ChatMessage.from_user("What was a positive news story from today?")] + component = OpenAIResponsesChatGenerator( + model="gpt-5", + tools=[ + {"type": "web_search_preview"}, + { + "type": "mcp", + "server_label": "dmcp", + "server_description": "A Dungeons and Dragons MCP server to assist with dice rolling.", + "server_url": "https://dmcp-server.deno.dev/sse", + "require_approval": "never", + }, + ], + ) + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message = results["replies"][0] + assert message.meta["status"] == "completed" + + chat_messages = [ChatMessage.from_user("Roll 2d4+1")] + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message = results["replies"][0] + assert message.meta["status"] == "completed" + + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + @pytest.mark.integration + def test_live_run_with_tools_streaming_and_reasoning(self, tools): + chat_messages = [ChatMessage.from_user("What's the weather like in Paris and Berlin?")] + + def callback(chunk: StreamingChunk) -> None: ... + + component = OpenAIResponsesChatGenerator( + tools=tools, + streaming_callback=callback, + generation_kwargs={"reasoning": {"summary": "auto", "effort": "low"}}, + ) + results = component.run(chat_messages) + assert len(results["replies"]) == 1 + message = results["replies"][0] + + assert message.reasonings is not None + # model sometimes skips reasoning + # needs to be cross checked + assert message.reasonings[0].extra is not None + assert not message.text + assert message.tool_calls + tool_calls = message.tool_calls + assert len(tool_calls) == 2 + + for tool_call in tool_calls: + assert isinstance(tool_call, ToolCall) + assert tool_call.tool_name == "weather" + + arguments = [tool_call.arguments for tool_call in tool_calls] + assert sorted(arguments, key=lambda x: x["city"]) == [{"city": "Berlin"}, {"city": "Paris"}]