diff --git a/pyproject.toml b/pyproject.toml
index efa4f5ccc..5a16b3737 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,7 +16,6 @@ requires-python = ">=3.10"
 dependencies = [
     "aiohttp>=3.11.13",
     "fastapi>=0.115.6",
-    "instructor>=1.7.9",
     "jsonref>=1.1.0",
     "mcp>=1.10.1",
     "numpy>=2.1.3",
diff --git a/src/mcp_agent/config.py b/src/mcp_agent/config.py
index cae1a5533..d9ff57e36 100644
--- a/src/mcp_agent/config.py
+++ b/src/mcp_agent/config.py
@@ -10,6 +10,7 @@
 import threading
 import warnings
 
+from httpx import URL
 from pydantic import AliasChoices, BaseModel, ConfigDict, Field, field_validator
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
@@ -205,6 +206,7 @@ class AnthropicSettings(BaseSettings, VertexAIMixin, BedrockMixin):
             "provider", "ANTHROPIC_PROVIDER", "anthropic__provider"
         ),
     )
+    base_url: str | URL | None = Field(default=None)
 
     model_config = SettingsConfigDict(
         env_prefix="ANTHROPIC_",
diff --git a/src/mcp_agent/workflows/llm/augmented_llm.py b/src/mcp_agent/workflows/llm/augmented_llm.py
index 75a2f0274..cbb179929 100644
--- a/src/mcp_agent/workflows/llm/augmented_llm.py
+++ b/src/mcp_agent/workflows/llm/augmented_llm.py
@@ -168,6 +168,11 @@ class RequestParams(CreateMessageRequestParams):
     This is used to stably identify the user in the LLM provider's logs.
     """
 
+    strict: bool = False
+    """
+    Whether models that support strict mode should strictly enforce the response schema.
+    """
+
 
 class AugmentedLLMProtocol(Protocol, Generic[MessageParamT, MessageT]):
     """Protocol defining the interface for augmented LLMs"""
diff --git a/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py b/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
index 828f3f2bc..e9a9f9a7d 100644
--- a/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
+++ b/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
@@ -48,7 +48,7 @@
 from mcp_agent.tracing.telemetry import get_tracer, is_otel_serializable, telemetry
 from mcp_agent.tracing.token_tracking_decorator import track_tokens
 from mcp_agent.utils.common import ensure_serializable, typed_dict_extras, to_string
-from mcp_agent.utils.pydantic_type_serializer import serialize_model, deserialize_model
+
 from mcp_agent.workflows.llm.augmented_llm import (
     AugmentedLLM,
     ModelT,
@@ -83,15 +83,6 @@ class RequestCompletionRequest(BaseModel):
     payload: dict
 
 
-class RequestStructuredCompletionRequest(BaseModel):
-    config: AnthropicSettings
-    params: RequestParams
-    response_model: Type[ModelT] | None = None
-    serialized_response_model: str | None = None
-    response_str: str
-    model: str
-
-
 def create_anthropic_instance(settings: AnthropicSettings):
     """Select and initialise the appropriate anthropic client instance based on settings"""
     if settings.provider == "bedrock":
@@ -419,10 +410,9 @@ async def generate_structured(
         response_model: Type[ModelT],
         request_params: RequestParams | None = None,
     ) -> ModelT:
-        # First we invoke the LLM to generate a string response
-        # We need to do this in a two-step process because Instructor doesn't
-        # know how to invoke MCP tools via call_tool, so we'll handle all the
-        # processing first and then pass the final response through Instructor
+        # Use Anthropic's native structured output via a forced tool call carrying JSON input
+        import json
+
         tracer = get_tracer(self.context)
         with tracer.start_as_current_span(
             f"{self.__class__.__name__}.{self.name}.generate_structured"
         ) as span:
@@ -430,57 +420,76 @@
             span.set_attribute(GEN_AI_AGENT_NAME, self.agent.name)
             self._annotate_span_for_generation_message(span, message)
 
-            response = await self.generate_str(
-                message=message,
-                request_params=request_params,
-            )
-
             params = self.get_request_params(request_params)
-
             if self.context.tracing_enabled:
                 AugmentedLLM.annotate_span_with_request_params(span, params)
 
-            model = await self.select_model(params)
-            span.set_attribute(GEN_AI_REQUEST_MODEL, model)
-
-            span.set_attribute("response_model", response_model.__name__)
-
-            serialized_response_model: str | None = None
-
-            if self.executor and self.executor.execution_engine == "temporal":
-                # Serialize the response model to a string
-                serialized_response_model = serialize_model(response_model)
-
-            structured_response = await self.executor.execute(
-                AnthropicCompletionTasks.request_structured_completion_task,
-                RequestStructuredCompletionRequest(
-                    config=self.context.config.anthropic,
-                    params=params,
-                    response_model=response_model
-                    if not serialized_response_model
-                    else None,
-                    serialized_response_model=serialized_response_model,
-                    response_str=response,
-                    model=model,
-                ),
+            model_name = (
+                await self.select_model(params) or self.default_request_params.model
             )
+            span.set_attribute(GEN_AI_REQUEST_MODEL, model_name)
 
-            # TODO: saqadri (MAC) - fix request_structured_completion_task to return ensure_serializable
-            # Convert dict back to the proper model instance if needed
-            if isinstance(structured_response, dict):
-                structured_response = response_model.model_validate(structured_response)
+            # Convert message(s) to Anthropic format
+            messages: List[MessageParam] = []
+            if params.use_history:
+                messages.extend(self.history.get())
+            messages.extend(
+                AnthropicConverter.convert_mixed_messages_to_anthropic(message)
+            )
 
-            if self.context.tracing_enabled:
-                try:
-                    span.set_attribute(
-                        "structured_response_json",
-                        structured_response.model_dump_json(),
-                    )
-                # pylint: disable=broad-exception-caught
-                except Exception:
-                    span.set_attribute("unstructured_response", response)
+            # Define a single tool that matches the Pydantic schema
+            schema = response_model.model_json_schema()
+            tools: List[ToolParam] = [
+                {
+                    "name": "return_structured_output",
+                    "description": "Return the response in the required JSON format",
+                    "input_schema": schema,
+                }
+            ]
+
+            args = {
+                "model": model_name,
+                "messages": messages,
+                "system": self.instruction or params.systemPrompt,
+                "tools": tools,
+                "tool_choice": {"type": "tool", "name": "return_structured_output"},
+            }
+            if params.maxTokens is not None:
+                args["max_tokens"] = params.maxTokens
+            if params.stopSequences:
+                args["stop_sequences"] = params.stopSequences
+
+            # Call Anthropic directly (one-turn streaming for consistency)
+            base_url = None
+            if self.context and self.context.config and self.context.config.anthropic:
+                base_url = self.context.config.anthropic.base_url
+                api_key = self.context.config.anthropic.api_key
+                client = AsyncAnthropic(api_key=api_key, base_url=base_url)
+            else:
+                client = AsyncAnthropic()
+
+            async with client:
+                async with client.messages.stream(**args) as stream:
+                    final = await stream.get_final_message()
+
+            # Extract tool_use input and validate
+            for block in final.content:
+                if (
+                    getattr(block, "type", None) == "tool_use"
+                    and getattr(block, "name", "") == "return_structured_output"
+                ):
+                    data = getattr(block, "input", None)
+                    try:
+                        if isinstance(data, str):
+                            return response_model.model_validate(json.loads(data))
+                        return response_model.model_validate(data)
+                    except Exception:
+                        # Fallthrough to error
+                        break
 
-            return structured_response
+            raise ValueError(
+                "Failed to obtain structured output from Anthropic response"
+            )
 
     @classmethod
     def convert_message_to_message_param(
@@ -770,44 +779,6 @@ async def request_completion_task(
         response = ensure_serializable(response)
         return response
 
-    @staticmethod
-    @workflow_task
-    @telemetry.traced()
-    async def request_structured_completion_task(
-        request: RequestStructuredCompletionRequest,
-    ):
-        """
-        Request a structured completion using Instructor's Anthropic API.
-        """
-        import instructor
-
-        if request.response_model:
-            response_model = request.response_model
-        elif request.serialized_response_model:
-            response_model = deserialize_model(request.serialized_response_model)
-        else:
-            raise ValueError(
-                "Either response_model or serialized_response_model must be provided for structured completion."
-            )
-
-        # We pass the text through instructor to extract structured data
-        client = instructor.from_anthropic(create_anthropic_instance(request.config))
-
-        # Extract structured data from natural language without blocking the loop
-        loop = asyncio.get_running_loop()
-        structured_response = await loop.run_in_executor(
-            None,
-            functools.partial(
-                client.chat.completions.create,
-                model=request.model,
-                response_model=response_model,
-                messages=[{"role": "user", "content": request.response_str}],
-                max_tokens=request.params.maxTokens,
-            ),
-        )
-
-        return structured_response
-
 
 class AnthropicMCPTypeConverter(ProviderToMCPConverter[MessageParam, Message]):
     """
diff --git a/src/mcp_agent/workflows/llm/augmented_llm_azure.py b/src/mcp_agent/workflows/llm/augmented_llm_azure.py
index 726fdcf20..98b6496d9 100644
--- a/src/mcp_agent/workflows/llm/augmented_llm_azure.py
+++ b/src/mcp_agent/workflows/llm/augmented_llm_azure.py
@@ -2,6 +2,7 @@
 import functools
 import json
 from typing import Any, Iterable, Optional, Type, Union
+from azure.core.exceptions import HttpResponseError
 from azure.ai.inference import ChatCompletionsClient
 from azure.ai.inference.models import (
     ChatCompletions,
@@ -351,6 +352,7 @@ async def generate_structured(
                 name=response_model.__name__,
                 description=response_model.__doc__,
                 schema=json_schema,
+                strict=request_params.strict,
             )
 
         request_params.metadata = metadata
@@ -362,7 +364,7 @@ async def generate_structured(
 
     @classmethod
     def convert_message_to_message_param(
-        cls, message: ResponseMessage, **kwargs
+        cls, message: ResponseMessage
     ) -> AssistantMessage:
         """Convert a response object to an input parameter object to allow LLM calls to be chained."""
         assistant_message = AssistantMessage(
@@ -539,12 +541,37 @@ async def request_completion_task(
             ),
         )
 
-        payload = request.payload
-        # Offload sync SDK call to a thread to avoid blocking the event loop
+        payload = request.payload.copy()
         loop = asyncio.get_running_loop()
-        response = await loop.run_in_executor(
-            None, functools.partial(azure_client.complete, **payload)
-        )
+
+        try:
+            response = await loop.run_in_executor(
+                None, functools.partial(azure_client.complete, **payload)
+            )
+        except HttpResponseError as e:
+            logger = get_logger(__name__)
+
+            if e.status_code != 400:
+                logger.error(f"Azure API call failed: {e}")
+                raise
+
+            logger.warning(
+                f"Initial Azure API call failed: {e}. Retrying with fallback parameters."
+            )
+
+            # Create a new payload with fallback values for commonly problematic parameters
+            fallback_payload = {**payload, "max_tokens": None, "temperature": 1}
+
+            try:
+                response = await loop.run_in_executor(
+                    None, functools.partial(azure_client.complete, **fallback_payload)
+                )
+            except Exception as retry_error:
+                # If retry also fails, raise a more informative error
+                raise RuntimeError(
+                    f"Azure API call failed even with fallback parameters. "
+                    f"Original error: {e}. Retry error: {retry_error}"
+                ) from retry_error
 
         return response
 
diff --git a/src/mcp_agent/workflows/llm/augmented_llm_google.py b/src/mcp_agent/workflows/llm/augmented_llm_google.py
index f2dae45e8..1964dc9cd 100644
--- a/src/mcp_agent/workflows/llm/augmented_llm_google.py
+++ b/src/mcp_agent/workflows/llm/augmented_llm_google.py
@@ -19,7 +19,7 @@
 from mcp_agent.config import GoogleSettings
 from mcp_agent.executor.workflow_task import workflow_task
 from mcp_agent.logging.logger import get_logger
-from mcp_agent.utils.pydantic_type_serializer import serialize_model, deserialize_model
+
 from mcp_agent.workflows.llm.augmented_llm import (
     AugmentedLLM,
     MCPMessageParam,
@@ -244,40 +244,68 @@ async def generate_structured(
         response_model: Type[ModelT],
         request_params: RequestParams | None = None,
     ) -> ModelT:
-        response = await self.generate_str(
-            message=message,
-            request_params=request_params,
-        )
+        """
+        Use Gemini native structured outputs via response_schema and response_mime_type.
+        """
+        import json
 
         params = self.get_request_params(request_params)
-        model = await self.select_model(params) or "gemini-2.0-flash"
-
-        serialized_response_model: str | None = None
+        model = await self.select_model(params) or (params.model or "gemini-2.0-flash")
+
+        # Convert input messages and build config
+        messages = GoogleConverter.convert_mixed_messages_to_google(message)
+
+        # Schema can be dict or the Pydantic class; Gemini supports both.
+        try:
+            schema = response_model.model_json_schema()
+        except Exception:
+            schema = None
+
+        config = types.GenerateContentConfig(
+            max_output_tokens=params.maxTokens,
+            temperature=params.temperature,
+            stop_sequences=params.stopSequences or [],
+            system_instruction=self.instruction or params.systemPrompt,
+        )
+        config.response_mime_type = "application/json"
+        config.response_schema = schema if schema is not None else response_model
 
-        if self.executor and self.executor.execution_engine == "temporal":
-            # Serialize the response model to a string
-            serialized_response_model = serialize_model(response_model)
+        # Build conversation: include history if enabled
+        conversation: list[types.Content] = []
+        if params.use_history:
+            conversation.extend(self.history.get())
+        if isinstance(messages, list):
+            conversation.extend(messages)
+        else:
+            conversation.append(messages)
 
-        structured_response = await self.executor.execute(
-            GoogleCompletionTasks.request_structured_completion_task,
-            RequestStructuredCompletionRequest(
+        api_response: types.GenerateContentResponse = await self.executor.execute(
+            GoogleCompletionTasks.request_completion_task,
+            RequestCompletionRequest(
                 config=self.context.config.google,
-                params=params,
-                response_model=response_model
-                if not serialized_response_model
-                else None,
-                serialized_response_model=serialized_response_model,
-                response_str=response,
-                model=model,
+                payload={
+                    "model": model,
+                    "contents": conversation,
+                    "config": config,
+                },
             ),
         )
 
-        # TODO: saqadri (MAC) - fix request_structured_completion_task to return ensure_serializable
-        # Convert dict back to the proper model instance if needed
-        if isinstance(structured_response, dict):
-            structured_response = response_model.model_validate(structured_response)
+        # Extract JSON text from response
+        text = None
+        if api_response and api_response.candidates:
+            cand = api_response.candidates[0]
+            if cand.content and cand.content.parts:
+                for part in cand.content.parts:
+                    if part.text:
+                        text = part.text
+                        break
+
+        if not text:
+            raise ValueError("No structured response returned by Gemini")
 
-        return structured_response
+        data = json.loads(text)
+        return response_model.model_validate(data)
 
     @classmethod
     def convert_message_to_message_param(cls, message, **kwargs):
@@ -365,43 +393,12 @@ async def request_structured_completion_task(
         request: RequestStructuredCompletionRequest,
     ):
         """
-        Request a structured completion using Instructor's Google API.
+        Deprecated: structured output is handled directly in generate_structured.
         """
-        import instructor
-
-        if request.response_model:
-            response_model = request.response_model
-        elif request.serialized_response_model:
-            response_model = deserialize_model(request.serialized_response_model)
-        else:
-            raise ValueError(
-                "Either response_model or serialized_response_model must be provided for structured completion."
-            )
-
-        if request.config and request.config.vertexai:
-            google_client = Client(
-                vertexai=request.config.vertexai,
-                project=request.config.project,
-                location=request.config.location,
-            )
-        else:
-            google_client = Client(api_key=request.config.api_key)
-
-        client = instructor.from_genai(
-            google_client, mode=instructor.Mode.GENAI_STRUCTURED_OUTPUTS
+        raise NotImplementedError(
+            "request_structured_completion_task is no longer used; use generate_structured instead."
        )
 
-        structured_response = client.chat.completions.create(
-            model=request.model,
-            response_model=response_model,
-            system="Convert the provided text into the required response model. Do not change the text or add any additional text. Just convert it into the required response model.",
-            messages=[
-                {"role": "user", "content": request.response_str},
-            ],
-        )
-
-        return structured_response
-
 
 class GoogleMCPTypeConverter(ProviderToMCPConverter[types.Content, types.Content]):
     """
diff --git a/src/mcp_agent/workflows/llm/augmented_llm_openai.py b/src/mcp_agent/workflows/llm/augmented_llm_openai.py
index 9bbc2a215..86811c6b8 100644
--- a/src/mcp_agent/workflows/llm/augmented_llm_openai.py
+++ b/src/mcp_agent/workflows/llm/augmented_llm_openai.py
@@ -51,7 +51,7 @@
 from mcp_agent.tracing.telemetry import is_otel_serializable
 from mcp_agent.utils.common import ensure_serializable, typed_dict_extras
 from mcp_agent.utils.mime_utils import image_url_to_mime_and_base64
-from mcp_agent.utils.pydantic_type_serializer import serialize_model, deserialize_model
+from mcp_agent.utils.pydantic_type_serializer import deserialize_model
 from mcp_agent.workflows.llm.augmented_llm import (
     AugmentedLLM,
     MessageTypes,
@@ -435,69 +435,84 @@ async def generate_structured(
         response_model: Type[ModelT],
         request_params: RequestParams | None = None,
     ) -> ModelT:
-        # First we invoke the LLM to generate a string response
-        # We need to do this in a two-step process because Instructor doesn't
-        # know how to invoke MCP tools via call_tool, so we'll handle all the
-        # processing first and then pass the final response through Instructor
+        """
+        Use OpenAI native structured outputs via response_format (JSON schema).
+        """
+        import json
+
         tracer = get_tracer(self.context)
         with tracer.start_as_current_span(
             f"{self.__class__.__name__}.{self.name}.generate_structured"
         ) as span:
-            span.set_attribute(GEN_AI_AGENT_NAME, self.agent.name)
-            self._annotate_span_for_generation_message(span, message)
+            if self.context.tracing_enabled:
+                span.set_attribute(GEN_AI_AGENT_NAME, self.agent.name)
+                self._annotate_span_for_generation_message(span, message)
 
             params = self.get_request_params(request_params)
-
+            model = await self.select_model(params) or (
+                self.default_request_params.model or "gpt-4o"
+            )
             if self.context.tracing_enabled:
                 AugmentedLLM.annotate_span_with_request_params(span, params)
+                span.set_attribute(GEN_AI_REQUEST_MODEL, model)
+                span.set_attribute("response_model", response_model.__name__)
 
-            response = await self.generate_str(
-                message=message,
-                request_params=params,
-            )
-
-            model = await self.select_model(params) or "gpt-4o"
-            span.set_attribute(GEN_AI_REQUEST_MODEL, model)
-
-            span.set_attribute("response_model", response_model.__name__)
-
-            serialized_response_model: str | None = None
-
-            if self.executor and self.executor.execution_engine == "temporal":
-                # Serialize the response model to a string
-                serialized_response_model = serialize_model(response_model)
-
-            structured_response = await self.executor.execute(
-                OpenAICompletionTasks.request_structured_completion_task,
-                RequestStructuredCompletionRequest(
-                    config=self.context.config.openai,
-                    response_model=response_model
-                    if not serialized_response_model
-                    else None,
-                    serialized_response_model=serialized_response_model,
-                    response_str=response,
-                    model=model,
-                    user=params.user
-                    or getattr(self.context.config.openai, "user", None),
-                    strict=getattr(params, "strict", False),
+            # Prepare messages
+            messages: List[ChatCompletionMessageParam] = []
+            system_prompt = self.instruction or params.systemPrompt
+            if system_prompt:
+                messages.append(
+                    ChatCompletionSystemMessageParam(
+                        role="system", content=system_prompt
+                    )
+                )
+            if params.use_history:
+                messages.extend(self.history.get())
+            messages.extend(OpenAIConverter.convert_mixed_messages_to_openai(message))
+
+            # Build response_format
+            schema = response_model.model_json_schema()
+            response_format = {
+                "type": "json_schema",
+                "json_schema": {
+                    "name": getattr(response_model, "__name__", "StructuredOutput"),
+                    "schema": schema,
+                    "strict": params.strict,
+                },
+            }
+
+            # Build payload
+            payload = {
+                "model": model,
+                "messages": messages,
+                "response_format": response_format,
+                "max_tokens": params.maxTokens,
+            }
+            user = params.user or getattr(self.context.config.openai, "user", None)
+            if user:
+                payload["user"] = user
+            if params.stopSequences is not None:
+                payload["stop"] = params.stopSequences
+            if params.metadata:
+                payload.update(params.metadata)
+
+            completion: ChatCompletion = await self.executor.execute(
+                OpenAICompletionTasks.request_completion_task,
+                RequestCompletionRequest(
+                    config=self.context.config.openai, payload=payload
                 ),
             )
 
-            # TODO: saqadri (MAC) - fix request_structured_completion_task to return ensure_serializable
-            # Convert dict back to the proper model instance if needed
-            if isinstance(structured_response, dict):
-                structured_response = response_model.model_validate(structured_response)
-
-            if self.context.tracing_enabled:
-                try:
-                    span.set_attribute(
-                        "structured_response_json",
-                        structured_response.model_dump_json(),
-                    )
-                # pylint: disable=broad-exception-caught
-                except Exception:
-                    span.set_attribute("unstructured_response", response)
+            if not completion.choices or completion.choices[0].message.content is None:
+                raise ValueError("No structured content returned by model")
 
-            return structured_response
+            content = completion.choices[0].message.content
+            try:
+                data = json.loads(content)
+                return response_model.model_validate(data)
+            except Exception:
+                # Fallback to pydantic JSON parsing if already a JSON string-like
+                return response_model.model_validate_json(content)
 
     async def pre_tool_call(self, tool_call_id: str | None, request: CallToolRequest):
         return request
@@ -892,21 +907,29 @@ async def request_structured_completion_task(
         request: RequestStructuredCompletionRequest,
     ) -> ModelT:
         """
-        Request a structured completion using Instructor's OpenAI API.
+        Request a structured completion using OpenAI's native structured outputs.
         """
-        import instructor
-        from instructor.exceptions import InstructorRetryException
-
-        if request.response_model:
+        # Resolve the response model
+        if request.response_model is not None:
            response_model = request.response_model
-        elif request.serialized_response_model:
+        elif request.serialized_response_model is not None:
            response_model = deserialize_model(request.serialized_response_model)
         else:
             raise ValueError(
                 "Either response_model or serialized_response_model must be provided for structured completion."
             )
 
-        # Next we pass the text through instructor to extract structured data
+        # Build response_format using JSON Schema
+        schema = response_model.model_json_schema()
+        response_format = {
+            "type": "json_schema",
+            "json_schema": {
+                "name": getattr(response_model, "__name__", "StructuredOutput"),
+                "schema": schema,
+                "strict": request.strict,
+            },
+        }
+
         async with AsyncOpenAI(
             api_key=request.config.api_key,
             base_url=request.config.base_url,
@@ -916,41 +939,29 @@ async def request_structured_completion_task(
             default_headers=request.config.default_headers
             if hasattr(request.config, "default_headers")
             else None,
-        ) as async_client:
-            client = instructor.from_openai(
-                async_client,
-                mode=instructor.Mode.TOOLS_STRICT
-                if request.strict
-                else instructor.Mode.TOOLS,
-            )
+        ) as async_openai_client:
+            payload = {
+                "model": request.model,
+                "messages": [{"role": "user", "content": request.response_str}],
+                "response_format": response_format,
+            }
+            if request.user:
+                payload["user"] = request.user
 
-            try:
-                # Extract structured data from natural language
-                structured_response = await client.chat.completions.create(
-                    model=request.model,
-                    response_model=response_model,
-                    messages=[
-                        {"role": "user", "content": request.response_str},
-                    ],
-                    user=request.user,
-                )
-            except InstructorRetryException:
-                # Retry the request with JSON mode
-                client = instructor.from_openai(
-                    async_client,
-                    mode=instructor.Mode.JSON,
-                )
+            completion = await async_openai_client.chat.completions.create(**payload)
 
-            structured_response = await client.chat.completions.create(
-                model=request.model,
-                response_model=response_model,
-                messages=[
-                    {"role": "user", "content": request.response_str},
-                ],
-                user=request.user,
-            )
+            if not completion.choices or completion.choices[0].message.content is None:
+                raise ValueError("No structured content returned by model")
+
+            content = completion.choices[0].message.content
+            # message.content is expected to be JSON string
+            try:
+                data = json.loads(content)
+            except Exception:
+                # Some models may already return a dict-like; fall back to string validation
+                return response_model.model_validate_json(content)
 
-        return structured_response
+            return response_model.model_validate(data)
 
 
 class MCPOpenAITypeConverter(
diff --git a/tests/workflows/llm/test_augmented_llm_anthropic.py b/tests/workflows/llm/test_augmented_llm_anthropic.py
index 104ac2770..7a04439f9 100644
--- a/tests/workflows/llm/test_augmented_llm_anthropic.py
+++ b/tests/workflows/llm/test_augmented_llm_anthropic.py
@@ -210,31 +210,55 @@ async def test_generate_str(self, mock_llm, default_usage):
 
     @pytest.mark.asyncio
     async def test_generate_structured(self, mock_llm, default_usage):
         """
-        Tests structured output generation using Instructor.
+        Tests structured output generation using native Anthropic API.
""" + from unittest.mock import patch # Define a simple response model class TestResponseModel(BaseModel): name: str value: int - # Mock the generate_str method to return a string that will be parsed by the instructor mock - mock_llm.generate_str = AsyncMock(return_value="name: Test, value: 42") - - # Patch executor.execute to return the expected TestResponseModel instance - mock_llm.executor.execute = AsyncMock( - return_value=TestResponseModel(name="Test", value=42) + # Create a mock Message with tool_use block containing the structured data + tool_use_block = ToolUseBlock( + type="tool_use", + id="tool_123", + name="return_structured_output", + input={"name": "Test", "value": 42}, ) - # Call the method - result = await AnthropicAugmentedLLM.generate_structured( - mock_llm, "Test query", TestResponseModel + mock_message = Message( + type="message", + id="msg_123", + role="assistant", + content=[tool_use_block], + model="claude-3-7-sonnet-latest", + stop_reason="tool_use", + usage=default_usage, ) - # Assertions - assert isinstance(result, TestResponseModel) - assert result.name == "Test" - assert result.value == 42 + # Mock the AsyncAnthropic client and streaming + with patch( + "mcp_agent.workflows.llm.augmented_llm_anthropic.AsyncAnthropic" + ) as MockAsyncAnthropic: + mock_client = MockAsyncAnthropic.return_value + mock_stream = AsyncMock() + mock_stream.get_final_message = AsyncMock(return_value=mock_message) + mock_stream.__aenter__ = AsyncMock(return_value=mock_stream) + mock_stream.__aexit__ = AsyncMock(return_value=None) + mock_client.messages.stream = MagicMock(return_value=mock_stream) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + + # Call the method + result = await AnthropicAugmentedLLM.generate_structured( + mock_llm, "Test query", TestResponseModel + ) + + # Assertions + assert isinstance(result, TestResponseModel) + assert result.name == "Test" + assert result.value == 42 # Test 4: With History @pytest.mark.asyncio @@ -779,6 +803,7 @@ async def test_generate_structured_with_mixed_message_types(self, mock_llm): """ Tests generate_structured() method with mixed message types. 
""" + from unittest.mock import patch # Define a simple response model class TestResponseModel(BaseModel): @@ -795,19 +820,49 @@ class TestResponseModel(BaseModel): ), ] - mock_llm.generate_str = AsyncMock(return_value="name: MixedTypes, value: 123") - # Patch executor.execute to return the expected TestResponseModel instance - mock_llm.executor.execute = AsyncMock( - return_value=TestResponseModel(name="MixedTypes", value=123) + # Create a mock Message with tool_use block containing the structured data + tool_use_block = ToolUseBlock( + type="tool_use", + id="tool_456", + name="return_structured_output", + input={"name": "MixedTypes", "value": 123}, ) - # Call generate_structured with mixed message types - result = await mock_llm.generate_structured(messages, TestResponseModel) + mock_message = Message( + type="message", + id="msg_456", + role="assistant", + content=[tool_use_block], + model="claude-3-7-sonnet-latest", + stop_reason="tool_use", + usage=Usage( + cache_creation_input_tokens=0, + cache_read_input_tokens=0, + input_tokens=100, + output_tokens=50, + ), + ) - # Assertions - assert isinstance(result, TestResponseModel) - assert result.name == "MixedTypes" - assert result.value == 123 + # Mock the AsyncAnthropic client and streaming + with patch( + "mcp_agent.workflows.llm.augmented_llm_anthropic.AsyncAnthropic" + ) as MockAsyncAnthropic: + mock_client = MockAsyncAnthropic.return_value + mock_stream = AsyncMock() + mock_stream.get_final_message = AsyncMock(return_value=mock_message) + mock_stream.__aenter__ = AsyncMock(return_value=mock_stream) + mock_stream.__aexit__ = AsyncMock(return_value=None) + mock_client.messages.stream = MagicMock(return_value=mock_stream) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + + # Call generate_structured with mixed message types + result = await mock_llm.generate_structured(messages, TestResponseModel) + + # Assertions + assert isinstance(result, TestResponseModel) + assert result.name == "MixedTypes" + assert result.value == 123 # Test 25: System Prompt Not None in API Call @pytest.mark.asyncio diff --git a/tests/workflows/llm/test_augmented_llm_google.py b/tests/workflows/llm/test_augmented_llm_google.py index 3e63e01e5..1353de260 100644 --- a/tests/workflows/llm/test_augmented_llm_google.py +++ b/tests/workflows/llm/test_augmented_llm_google.py @@ -1,4 +1,4 @@ -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest from pydantic import BaseModel @@ -183,29 +183,22 @@ class TestResponseModel(BaseModel): name: str value: int - # Mock the generate_str method - mock_llm.generate_str = AsyncMock(return_value="name: Test, value: 42") + # Create a proper GenerateContentResponse with JSON content + import json - # Mock instructor from_genai - with patch("instructor.from_genai") as mock_instructor: - mock_client = MagicMock() - mock_client.chat.completions.create.return_value = TestResponseModel( - name="Test", value=42 - ) - mock_instructor.return_value = mock_client + json_content = json.dumps({"name": "Test", "value": 42}) + response = self.create_text_response(json_content) - # Patch executor.execute to be an async mock returning the expected value - mock_llm.executor.execute = AsyncMock( - return_value=TestResponseModel(name="Test", value=42) - ) + # Patch executor.execute to return the GenerateContentResponse with JSON + mock_llm.executor.execute = AsyncMock(return_value=response) - # Call the method - result = 
await mock_llm.generate_structured("Test query", TestResponseModel) + # Call the method + result = await mock_llm.generate_structured("Test query", TestResponseModel) - # Assertions - assert isinstance(result, TestResponseModel) - assert result.name == "Test" - assert result.value == 42 + # Assertions + assert isinstance(result, TestResponseModel) + assert result.name == "Test" + assert result.value == 42 # Test 4: With History @pytest.mark.asyncio @@ -773,26 +766,19 @@ class TestResponseModel(BaseModel): ), ] - # Mock the generate_str method - mock_llm.generate_str = AsyncMock(return_value="name: MixedTypes, value: 123") + # Create a proper GenerateContentResponse with JSON content + import json - # Patch instructor.from_genai to return the expected model - with patch("instructor.from_genai") as mock_instructor: - mock_client = MagicMock() - mock_client.chat.completions.create.return_value = TestResponseModel( - name="MixedTypes", value=123 - ) - mock_instructor.return_value = mock_client + json_content = json.dumps({"name": "MixedTypes", "value": 123}) + response = self.create_text_response(json_content) - # Patch executor.execute to be an async mock returning the expected value - mock_llm.executor.execute = AsyncMock( - return_value=TestResponseModel(name="MixedTypes", value=123) - ) + # Patch executor.execute to return the GenerateContentResponse with JSON + mock_llm.executor.execute = AsyncMock(return_value=response) - result = await mock_llm.generate_structured(messages, TestResponseModel) - assert isinstance(result, TestResponseModel) - assert result.name == "MixedTypes" - assert result.value == 123 + result = await mock_llm.generate_structured(messages, TestResponseModel) + assert isinstance(result, TestResponseModel) + assert result.name == "MixedTypes" + assert result.value == 123 @pytest.mark.asyncio async def test_parallel_tool_calls(self, mock_llm: GoogleAugmentedLLM): diff --git a/tests/workflows/llm/test_augmented_llm_openai.py b/tests/workflows/llm/test_augmented_llm_openai.py index d908ceb15..2972fe47c 100644 --- a/tests/workflows/llm/test_augmented_llm_openai.py +++ b/tests/workflows/llm/test_augmented_llm_openai.py @@ -1,5 +1,5 @@ import json -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest from openai.types.chat.chat_completion import Choice @@ -174,38 +174,31 @@ async def test_generate_str(self, mock_llm, default_usage): @pytest.mark.asyncio async def test_generate_structured(self, mock_llm, default_usage): """ - Tests structured output generation using Instructor. + Tests structured output generation using native OpenAI API. 
""" + import json # Define a simple response model class TestResponseModel(BaseModel): name: str value: int - # Set up mocks for the two-stage process - # First for the text generation - mock_llm.generate_str = AsyncMock(return_value="name: Test, value: 42") + # Create a proper ChatCompletion response with JSON content + json_content = json.dumps({"name": "Test", "value": 42}) + completion_response = self.create_text_response( + json_content, usage=default_usage + ) - # Then for Instructor's structured data extraction - with patch("instructor.from_openai") as mock_instructor: - mock_client = MagicMock() - mock_client.chat.completions.create.return_value = TestResponseModel( - name="Test", value=42 - ) - mock_instructor.return_value = mock_client + # Patch executor.execute to return the ChatCompletion with JSON + mock_llm.executor.execute = AsyncMock(return_value=completion_response) - # Patch executor.execute to be an async mock returning the expected value - mock_llm.executor.execute = AsyncMock( - return_value=TestResponseModel(name="Test", value=42) - ) + # Call the method + result = await mock_llm.generate_structured("Test query", TestResponseModel) - # Call the method - result = await mock_llm.generate_structured("Test query", TestResponseModel) - - # Assertions - assert isinstance(result, TestResponseModel) - assert result.name == "Test" - assert result.value == 42 + # Assertions + assert isinstance(result, TestResponseModel) + assert result.name == "Test" + assert result.value == 42 # Test 4: With History @pytest.mark.asyncio @@ -612,6 +605,7 @@ async def test_generate_structured_with_mixed_message_types(self, mock_llm): """ Tests generate_structured() method with mixed message types. """ + import json # Define a simple response model class TestResponseModel(BaseModel): @@ -628,12 +622,18 @@ class TestResponseModel(BaseModel): ), ] - mock_llm.generate_str = AsyncMock(return_value="name: MixedTypes, value: 123") - # Patch executor.execute to return the expected TestResponseModel instance - mock_llm.executor.execute = AsyncMock( - return_value=TestResponseModel(name="MixedTypes", value=123) + # Create a proper ChatCompletion response with JSON content + json_content = json.dumps({"name": "MixedTypes", "value": 123}) + completion_response = self.create_text_response( + json_content, + usage=CompletionUsage( + completion_tokens=100, prompt_tokens=150, total_tokens=250 + ), ) + # Patch executor.execute to return the ChatCompletion with JSON + mock_llm.executor.execute = AsyncMock(return_value=completion_response) + # Call generate_structured with mixed message types result = await mock_llm.generate_structured(messages, TestResponseModel)