Get structured outputs using LLM native APIs #418

Conversation
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Removes the Instructor dependency and migrates structured-output generation to native provider capabilities (Anthropic, OpenAI, Google, Azure). Adds Anthropic base_url configuration and a strict flag to request params. Updates provider implementations, Azure error handling, method signatures, and tests to new single-call JSON-schema flows.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant Caller
participant AugLLM as AugmentedLLM
participant Provider as LLM Provider
rect rgb(245,248,255)
note over Caller,AugLLM: Structured generation request
Caller->>AugLLM: generate_structured(messages, response_model, params)
AugLLM->>AugLLM: Build messages + JSON Schema (strict optional)
end
alt Anthropic (tool-use)
AugLLM->>Provider: messages + tool(return_structured_output{schema})
Provider-->>AugLLM: stream(tool_use with JSON)
AugLLM->>AugLLM: validate via response_model
else OpenAI (response_format)
AugLLM->>Provider: messages + response_format{json_schema, strict?}
Provider-->>AugLLM: content JSON
AugLLM->>AugLLM: parse + model_validate
else Google (Gemini)
AugLLM->>Provider: contents + config{response_schema, mime=application/json}
Provider-->>AugLLM: JSON text in first candidate
AugLLM->>AugLLM: json.loads + model_validate
end
AugLLM-->>Caller: Typed model instance
```

```mermaid
sequenceDiagram
autonumber
participant Caller
participant Azure as AzureAugmentedLLM
participant AzureAPI as Azure OpenAI API
Caller->>Azure: request_completion_task(payload)
Azure->>AzureAPI: call(payload)
alt HttpResponseError != 400
AzureAPI--x Azure: error
Azure-->>Caller: raise
else Http 400 fallback
AzureAPI--x Azure: 400
Azure->>Azure: build fallback_payload (max_tokens=None, temperature=1)
Azure->>AzureAPI: retry(fallback_payload)
alt retry fails
AzureAPI--x Azure: error
Azure-->>Caller: RuntimeError(original+retry)
else retry ok
AzureAPI-->>Azure: response
Azure-->>Caller: response
end
end
```
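To make the new flow concrete, here is a minimal caller-side sketch. The names follow the walkthrough and diagrams (generate_structured, a Pydantic response_model, and the new strict request param); the exact constructor, import path, and keyword names are assumptions, not taken from the diff:

```python
from pydantic import BaseModel

from mcp_agent.workflows.llm.augmented_llm import RequestParams


class WeatherReport(BaseModel):
    city: str
    temperature_c: float


async def get_report(llm) -> WeatherReport:
    # llm is an AugmentedLLM instance (e.g. OpenAIAugmentedLLM); agent setup elided.
    return await llm.generate_structured(
        message="What's the weather in Paris?",
        response_model=WeatherReport,
        request_params=RequestParams(strict=True),  # strict flag added by this PR
    )
```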
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
…use it. Update Azure request_completion_task to use fallback params
```python
if self.context and self.context.config and self.context.config.anthropic:
    base_url = self.context.config.anthropic.base_url
    api_key = self.context.config.anthropic.api_key
    client = AsyncAnthropic(api_key=api_key, base_url=base_url)
else:
    client = AsyncAnthropic()
```
The AsyncAnthropic client should be used within an async context manager to ensure proper resource cleanup. Currently, the client is instantiated but never properly closed, which could lead to connection leaks. Consider refactoring to:
```python
async with AsyncAnthropic(api_key=api_key, base_url=base_url) as client:
    async with client.messages.stream(**args) as stream:
        final = await stream.get_final_message()
```

This ensures the client is properly closed after use, preventing potential resource leaks in production environments.
Spotted by Diamond
```python
    if isinstance(data, str):
        return response_model.model_validate(json.loads(data))
    return response_model.model_validate(data)
except Exception:
    # Fallthrough to error
    break
```
The broad except Exception: clause may obscure important errors during structured output parsing. Consider catching specific exceptions like json.JSONDecodeError or pydantic.ValidationError instead. This would provide clearer error messages and help distinguish between data format issues versus more serious system problems. The current approach makes debugging challenging as it silently falls through to a generic error message.
```diff
-    if isinstance(data, str):
-        return response_model.model_validate(json.loads(data))
-    return response_model.model_validate(data)
-except Exception:
-    # Fallthrough to error
-    break
+    if isinstance(data, str):
+        return response_model.model_validate(json.loads(data))
+    return response_model.model_validate(data)
+except json.JSONDecodeError:
+    # JSON parsing error - invalid JSON format
+    logger.error("Failed to parse JSON response")
+    break
+except pydantic.ValidationError:
+    # Validation error - JSON structure doesn't match expected model
+    logger.error("Response data failed validation against model")
+    break
```
Spotted by Diamond
```python
)

# Create a new payload with fallback values for commonly problematic parameters
fallback_payload = {**payload, "max_tokens": None, "temperature": 1}
```
The hardcoded fallback parameters may lead to unexpected behavior. Setting temperature=1 could significantly alter the model's output characteristics compared to what was originally requested, potentially disrupting applications that rely on specific temperature settings. Consider either preserving the original temperature value or implementing a more conservative fallback strategy that maintains output consistency with the original request parameters.
```diff
-fallback_payload = {**payload, "max_tokens": None, "temperature": 1}
+fallback_payload = {**payload, "max_tokens": None}
```
Spotted by Diamond
```python
except Exception:
    # Fallback to pydantic JSON parsing if already a JSON string-like
    return response_model.model_validate_json(content)
```
The broad exception handling with a fallback parsing mechanism creates unpredictable behavior. Consider replacing the generic except Exception: with specific exception handling for json.JSONDecodeError. The current approach catches all errors indiscriminately and attempts model_validate_json(content) as a fallback, which could either:
- Succeed with malformed data, leading to silent failures
- Fail with different error messages than the original issue, making debugging difficult
A more targeted approach would improve error clarity and predictability:
```python
try:
    data = json.loads(content)
    return response_model.model_validate(data)
except json.JSONDecodeError:
    # Only fallback for JSON parsing errors
    return response_model.model_validate_json(content)
```

This maintains the fallback functionality while providing clearer error boundaries.
```diff
-except Exception:
-    # Fallback to pydantic JSON parsing if already a JSON string-like
-    return response_model.model_validate_json(content)
+except json.JSONDecodeError:
+    # Fallback to pydantic JSON parsing if already a JSON string-like
+    return response_model.model_validate_json(content)
```
Spotted by Diamond
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/mcp_agent/workflows/llm/augmented_llm_azure.py (2)
342-359: Propagate `strict` correctly — good. Add robust parsing/guardrails. If the last message isn't assistant text (e.g., tool-only turn) or the content isn't valid JSON, this will raise. Add a safe "last assistant with text" search and JSON error context.

```diff
-response = await self.generate(message=message, request_params=request_params)
-json_data = json.loads(response[-1].content)
-
-structured_response = response_model.model_validate(json_data)
-return structured_response
+response = await self.generate(message=message, request_params=request_params)
+# Find the last assistant message with non-empty text content
+last = next(
+    (r for r in reversed(response) if getattr(r, "role", None) == "assistant" and getattr(r, "content", None)),
+    None,
+)
+if not last or not last.content:
+    raise ValueError("No assistant text content found to parse for structured output.")
+try:
+    json_data = json.loads(last.content)
+except json.JSONDecodeError as e:
+    snippet = last.content[:200].replace("\n", "\\n")
+    raise ValueError(f"Assistant did not return valid JSON. First 200 chars: {snippet}") from e
+return response_model.model_validate(json_data)
```

457-479: Bug: `.get('role')` on Azure message objects will raise when tracing enabled. `request.payload["messages"]` contains SDK message objects, not dicts. Calling `.get('role')` on the last element will fail.

```diff
-latest_message_role = request.payload.get("messages", [{}])[-1].get("role")
+msgs = request.payload.get("messages", [])
+latest_message_role = getattr(msgs[-1], "role", None) if msgs else None
```
🧹 Nitpick comments (8)
src/mcp_agent/workflows/llm/augmented_llm.py (1)
598-652: Also surface `strict` in tracing and provider contracts. To make runs diagnosable and consistent across providers, record `strict` and, optionally, whether a `response_format` is present.

Add to `annotate_span_with_request_params`:

```diff
 if hasattr(request_params, "metadata") and request_params.metadata:
     record_attributes(span, request_params.metadata, "request_params.metadata")
+if hasattr(request_params, "strict"):
+    span.set_attribute("request_params.strict", bool(request_params.strict))
+    # Convenience boolean: did caller request structured output?
+    span.set_attribute(
+        "request_params.has_response_format",
+        bool(getattr(request_params, "metadata", {}) and request_params.metadata.get("response_format")),
+    )
```

Would you like me to open a follow-up to propagate `strict` in the non-Azure providers' `generate_structured` paths?

src/mcp_agent/config.py (2)
13-13: Avoid import-time hard dependency on httpx in config module.
`from httpx import URL` at import time will raise if httpx isn't installed, even when Anthropic isn't used. Use TYPE_CHECKING to gate the import:

```diff
+from typing import TYPE_CHECKING
+if TYPE_CHECKING:
+    from httpx import URL  # type: ignore
```

Then annotate with a forward ref (see next comment).
209-209: Prefer forward-ref type and alias parity.
- Use a forward ref to avoid runtime import: `str | "URL" | None`.
- Optional: add `validation_alias` for parity with other fields.

```diff
-base_url: str | URL | None = Field(default=None)
+base_url: str | "URL" | None = Field(
+    default=None,
+    validation_alias=AliasChoices("base_url", "ANTHROPIC_BASE_URL", "anthropic__base_url"),
+)
```

src/mcp_agent/workflows/llm/augmented_llm_azure.py (2)
480-503: Type/semantics mismatch: the response argument is a ChatCompletions, not a ResponseMessage. The method operates on `.choices[0].message`, which exists on `ChatCompletions`. Update the annotation (and docstring) to avoid confusion and future errors.

```diff
-def _annotate_span_for_completion_response(
-    self, span: trace.Span, response: ResponseMessage, turn: int
-) -> None:
+def _annotate_span_for_completion_response(
+    self, span: trace.Span, response: ChatCompletions, turn: int
+) -> None:
     """Annotate the span with the completion response as an event."""
```

Optionally set `event_name` from `response.choices[0].message.role` as you already do.

132-139: Nit: docstring mentions "Azure OpenAI 5" — likely meant "Azure OpenAI (e.g., GPT-4o-mini)". Update to avoid confusion.

```diff
-The default implementation uses Azure OpenAI 5 as the LLM.
+The default implementation uses Azure OpenAI (e.g., GPT-4o-mini) as the LLM.
```

src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)
475-491: Harden tool_use extraction and diagnostics. If parsing fails, we drop error context. Capture the last tool_use payload and include the model name in the exception. Also accept string/dict inputs defensively.

```diff
 # Extract tool_use input and validate
-for block in final.content:
+last_tool_input = None
+for block in final.content:
     if (
         getattr(block, "type", None) == "tool_use"
         and getattr(block, "name", "") == "return_structured_output"
     ):
         data = getattr(block, "input", None)
+        last_tool_input = data
         try:
             if isinstance(data, str):
                 return response_model.model_validate(json.loads(data))
             return response_model.model_validate(data)
         except Exception:
             # Fallthrough to error
             break
-raise ValueError(
-    "Failed to obtain structured output from Anthropic response"
-)
+raise ValueError(
+    f"Failed to obtain/validate structured output from Anthropic response (model={model_name}); last_tool_input={to_string(last_tool_input)}"
+)
```

264-272: Optional: surface strict intent. Gemini ignores "strict" today; if params.strict is set, append a short system hint like "Only output JSON per the response schema—no prose." Helps guard against extra text.

```diff
 config = types.GenerateContentConfig(
     max_output_tokens=params.maxTokens,
     temperature=params.temperature,
     stop_sequences=params.stopSequences or [],
-    system_instruction=self.instruction or params.systemPrompt,
+    system_instruction=(self.instruction or params.systemPrompt)
+    + (" Only output JSON per the response schema—no prose." if params.strict else ""),
 )
```

474-482: Optional: include schema title and ensure deterministic ordering. Some OpenAI models are sensitive to schema churn. Sorting properties and setting a stable "name" helps cache hits and reduces drift.

```diff
-schema = response_model.model_json_schema()
+schema = response_model.model_json_schema()
+if "properties" in schema and isinstance(schema["properties"], dict):
+    schema["properties"] = dict(sorted(schema["properties"].items()))
```
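Relatedly, the follow-up suggested above about propagating `strict` in non-Azure providers would, on the OpenAI path, roughly mean threading the request-level flag into the `response_format` payload instead of hardcoding it. A sketch under that assumption (variable names are illustrative, not the PR's exact code):

```python
schema = response_model.model_json_schema()
response_format = {
    "type": "json_schema",
    "json_schema": {
        "name": response_model.__name__,
        "schema": schema,
        # Propagate the request-level strict flag rather than a hardcoded value
        "strict": bool(getattr(params, "strict", False)),
    },
}
```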
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (7)
- pyproject.toml (0 hunks)
- src/mcp_agent/config.py (2 hunks)
- src/mcp_agent/workflows/llm/augmented_llm.py (1 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (2 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_azure.py (3 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_google.py (3 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_openai.py (4 hunks)
💤 Files with no reviewable changes (1)
- pyproject.toml
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
src/mcp_agent/workflows/llm/augmented_llm_google.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
🧬 Code graph analysis (4)
src/mcp_agent/workflows/llm/augmented_llm_azure.py (2)
src/mcp_agent/app.py (1)
logger(189-204)src/mcp_agent/logging/logger.py (2)
error(166-174)warning(156-164)
src/mcp_agent/workflows/llm/augmented_llm_google.py (2)
src/mcp_agent/workflows/llm/augmented_llm.py (6)
get_request_params(381-402)select_model(335-379)get(94-95)get(117-118)append(91-92)append(114-115)src/mcp_agent/workflows/llm/multipart_converter_google.py (2)
GoogleConverter(39-374)convert_mixed_messages_to_google(340-374)
src/mcp_agent/workflows/llm/augmented_llm_openai.py (4)
src/mcp_agent/utils/pydantic_type_serializer.py (1)
deserialize_model(946-957)src/mcp_agent/tracing/telemetry.py (1)
get_tracer(171-175)src/mcp_agent/workflows/llm/augmented_llm.py (5)
annotate_span_with_request_params(599-651)append(91-92)append(114-115)get(94-95)get(117-118)src/mcp_agent/workflows/llm/multipart_converter_openai.py (2)
OpenAIConverter(37-500)convert_mixed_messages_to_openai(467-500)
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (3)
src/mcp_agent/tracing/telemetry.py (1)
get_tracer(171-175)src/mcp_agent/workflows/llm/augmented_llm.py (5)
_annotate_span_for_generation_message(653-671)get_request_params(381-402)select_model(335-379)get(94-95)get(117-118)src/mcp_agent/workflows/llm/multipart_converter_anthropic.py (2)
AnthropicConverter(50-518)convert_mixed_messages_to_anthropic(485-518)
🪛 GitHub Actions: Pull Request Checks
src/mcp_agent/workflows/llm/augmented_llm_google.py
[error] 296-296: AttributeError: 'TestResponseModel' object has no attribute 'candidates'. Test path: tests/workflows/llm/test_augmented_llm_google.py::TestGoogleAugmentedLLM::test_generate_structured.
[error] 296-296: AttributeError: 'TestResponseModel' object has no attribute 'candidates'. Test path: tests/workflows/llm/test_augmented_llm_google.py::TestGoogleAugmentedLLM::test_generate_structured_with_mixed_message_types.
src/mcp_agent/workflows/llm/augmented_llm_openai.py
[error] 506-506: AttributeError: 'TestResponseModel' object has no attribute 'choices'. Test path: tests/workflows/llm/test_augmented_llm_openai.py::TestOpenAIAugmentedLLM::test_generate_structured.
[error] 506-506: AttributeError: 'TestResponseModel' object has no attribute 'choices'. Test path: tests/workflows/llm/test_augmented_llm_openai.py::TestOpenAIAugmentedLLM::test_generate_structured_with_mixed_message_types.
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
[error] 471-471: AttributeError: __aenter__ when using async with client.messages.stream(**args). Test path: tests/workflows/llm/test_augmented_llm_anthropic.py::TestAnthropicAugmentedLLM::test_generate_structured.
[error] 471-471: AttributeError: __aenter__ when using async with client.messages.stream(**args). Test path: tests/workflows/llm/test_augmented_llm_anthropic.py::TestAnthropicAugmentedLLM::test_generate_structured_with_mixed_message_types.
🔇 Additional comments (2)
src/mcp_agent/workflows/llm/augmented_llm.py (1)
171-175: Good addition: request-time toggle for strict JSON schema. The `strict` flag is clear and defaults safe.

src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)

441-448: Confirm schema compatibility with Anthropic tools. Pydantic v2 emits 2020-12 JSON Schema; Anthropic tools expect JSON Schema draft-07-like. If you hit validation issues, consider down-leveling or normalizing the schema before assignment.
Would you like a helper that normalizes Pydantic schema (e.g., drops $defs, converts nullable/anyOf) before using as input_schema?
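For illustration, a minimal sketch of what such a normalizer could look like (a hypothetical helper, not part of this PR; it inlines local `$defs` references and collapses `anyOf: [X, null]` pairs, which covers the common Pydantic v2 patterns):

```python
from typing import Any


def normalize_schema_for_anthropic(schema: dict[str, Any]) -> dict[str, Any]:
    """Best-effort down-leveling of a Pydantic v2 JSON Schema for use as a tool input_schema."""
    schema = dict(schema)  # avoid mutating the caller's schema
    defs = schema.pop("$defs", {})

    def resolve(node: Any) -> Any:
        if isinstance(node, dict):
            # Inline references to local $defs
            ref = node.get("$ref", "")
            if isinstance(ref, str) and ref.startswith("#/$defs/"):
                return resolve(defs.get(ref.split("/")[-1], {}))
            # Collapse anyOf: [X, {"type": "null"}] produced for Optional[X]
            any_of = node.get("anyOf")
            if isinstance(any_of, list) and len(any_of) == 2 and {"type": "null"} in any_of:
                non_null = next(s for s in any_of if s != {"type": "null"})
                merged = {k: resolve(v) for k, v in node.items() if k != "anyOf"}
                merged.update(resolve(non_null))
                return merged
            return {k: resolve(v) for k, v in node.items()}
        if isinstance(node, list):
            return [resolve(v) for v in node]
        return node

    return resolve(schema)
```

Usage would be something like `tool["input_schema"] = normalize_schema_for_anthropic(response_model.model_json_schema())`; deeply nested or recursive models may need more handling than this sketch covers.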
```python
# Call Anthropic directly (one-turn streaming for consistency)
base_url = None
if self.context and self.context.config and self.context.config.anthropic:
    base_url = self.context.config.anthropic.base_url
    api_key = self.context.config.anthropic.api_key
    client = AsyncAnthropic(api_key=api_key, base_url=base_url)
else:
    client = AsyncAnthropic()

async with client.messages.stream(**args) as stream:
    final = await stream.get_final_message()
```
🛠️ Refactor suggestion
Fix: replace async streaming context with existing executor path (SDK mismatch causes `__aenter__` error).
The Anthropic SDK in CI doesn't expose an async context manager for messages.stream(), leading to AttributeError: `__aenter__`. Also, constructing AsyncAnthropic here bypasses provider routing (bedrock/vertex) and our workflow_task/telemetry. Reuse AnthropicCompletionTasks.request_completion_task for parity with generate() and testability.
Apply:
- # Call Anthropic directly (one-turn streaming for consistency)
- base_url = None
- if self.context and self.context.config and self.context.config.anthropic:
- base_url = self.context.config.anthropic.base_url
- api_key = self.context.config.anthropic.api_key
- client = AsyncAnthropic(api_key=api_key, base_url=base_url)
- else:
- client = AsyncAnthropic()
-
- async with client.messages.stream(**args) as stream:
- final = await stream.get_final_message()
+ # Call Anthropic via the workflow task to honor provider routing and tracing
+ request = RequestCompletionRequest(
+ config=self.context.config.anthropic,
+ payload=args,
+ )
+ final: Message = await self.executor.execute(
+ AnthropicCompletionTasks.request_completion_task,
+ ensure_serializable(request),
+ )
🧰 Tools
🪛 GitHub Actions: Pull Request Checks
[error] 471-471: AttributeError: __aenter__ when using async with client.messages.stream(**args). Test path: tests/workflows/llm/test_augmented_llm_anthropic.py::TestAnthropicAugmentedLLM::test_generate_structured.
[error] 471-471: AttributeError: __aenter__ when using async with client.messages.stream(**args). Test path: tests/workflows/llm/test_augmented_llm_anthropic.py::TestAnthropicAugmentedLLM::test_generate_structured_with_mixed_message_types.
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/llm/augmented_llm_anthropic.py around lines 462 to
473, replace the direct AsyncAnthropic streaming context and construction (which
causes AttributeError: __aenter__ in CI and bypasses provider routing/telemetry)
with a call to the existing executor path: invoke
AnthropicCompletionTasks.request_completion_task (the same path used by
generate()) passing the assembled args and current workflow_task/telemetry
context so provider routing (bedrock/vertex), config-derived api/base_url, and
telemetry are preserved; await the task result and extract the final message
from that response instead of using client.messages.stream(), and remove direct
AsyncAnthropic instantiation.
```python
payload = request.payload.copy()
loop = asyncio.get_running_loop()

try:
    response = await loop.run_in_executor(
        None, functools.partial(azure_client.complete, **payload)
    )
except HttpResponseError as e:
    logger = logging.getLogger(__name__)

    if e.status_code != 400:
        logger.error(f"Azure API call failed: {e}")
        raise

    logger.warning(
        f"Initial Azure API call failed: {e}. Retrying with fallback parameters."
    )

    # Create a new payload with fallback values for commonly problematic parameters
    fallback_payload = {**payload, "max_tokens": None, "temperature": 1}

    try:
        response = await loop.run_in_executor(
            None, functools.partial(azure_client.complete, **fallback_payload)
        )
    except Exception as retry_error:
        # If retry also fails, raise a more informative error
        raise RuntimeError(
            f"Azure API call failed even with fallback parameters. "
            f"Original error: {e}. Retry error: {retry_error}"
        ) from retry_error
return response
```
🛠️ Refactor suggestion
Deep-copy request payload and use project logger for consistency.
- Shallow `dict.copy()` won't protect nested structures if the SDK mutates them.
- Use the project logger (`get_logger`) instead of stdlib logging to keep telemetry consistent.
- payload = request.payload.copy()
+ # Protect nested structures from mutation
+ import copy
+ payload = copy.deepcopy(request.payload)
@@
- except HttpResponseError as e:
- logger = logging.getLogger(__name__)
+ except HttpResponseError as e:
+ logger = get_logger(__name__)

Optional: consider exponential backoff for 429/5xx and a capped retry budget. I can draft it if you want.
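As a rough illustration of that idea (a sketch only; the helper name, retry budget, and the set of retryable status codes are assumptions, not part of this PR):

```python
import asyncio
import random

from azure.core.exceptions import HttpResponseError

RETRYABLE_STATUS = {429, 500, 502, 503, 504}


async def call_with_backoff(make_call, *, max_attempts: int = 4, base_delay: float = 0.5):
    """Retry an async callable on transient HTTP errors with capped exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except HttpResponseError as e:
            if e.status_code not in RETRYABLE_STATUS or attempt == max_attempts:
                raise
            delay = min(base_delay * (2 ** (attempt - 1)), 8.0)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
```

The existing `run_in_executor` call could then be wrapped in a small async closure and passed in as `make_call`.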
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/llm/augmented_llm_azure.py around lines 545 to 576,
replace the shallow payload copy and stdlib logger use: perform a deep copy of
request.payload (use copy.deepcopy) before mutating or passing it to
azure_client so nested structures aren’t accidentally modified, and switch
logging.getLogger(__name__) to the project get_logger() call to keep telemetry
consistent; keep the same retry/fallback logic but operate on the deep-copied
payload and ensure any new imports (copy) and the project logger are present at
the top of the file.
```python
api_response: types.GenerateContentResponse = await self.executor.execute(
    GoogleCompletionTasks.request_completion_task,
    RequestCompletionRequest(
        config=self.context.config.google,
        payload={
            "model": model,
            "contents": conversation,
            "config": config,
        },
    ),
)

# Extract JSON text from response
text = None
if api_response and api_response.candidates:
    cand = api_response.candidates[0]
    if cand.content and cand.content.parts:
        for part in cand.content.parts:
            if part.text:
                text = part.text
                break

if not text:
    raise ValueError("No structured response returned by Gemini")

data = json.loads(text)
return response_model.model_validate(data)
```
🛠️ Refactor suggestion
Make generate_structured tolerant to mocked returns (fix CI: 'TestResponseModel' has no candidates).
Tests stub the task to return a Pydantic model (or dict/JSON string). Guard before accessing candidates and validate directly when a structured object/string is returned.
- api_response: types.GenerateContentResponse = await self.executor.execute(
+ api_response = await self.executor.execute(
GoogleCompletionTasks.request_completion_task,
RequestCompletionRequest(
config=self.context.config.google,
payload={
"model": model,
"contents": conversation,
"config": config,
},
),
)
- # Extract JSON text from response
- text = None
+ # If tests/mocks return the structured object directly, accept it
+ if isinstance(api_response, BaseModel):
+ return response_model.model_validate(api_response.model_dump())
+ if isinstance(api_response, dict):
+ return response_model.model_validate(api_response)
+ if isinstance(api_response, str):
+ data = json.loads(api_response)
+ return response_model.model_validate(data)
+
+ # Extract JSON text from real API response
+ text: str | None = None
if api_response and api_response.candidates:
cand = api_response.candidates[0]
if cand.content and cand.content.parts:
for part in cand.content.parts:
if part.text:
text = part.text
break
🧰 Tools
🪛 GitHub Actions: Pull Request Checks
[error] 296-296: AttributeError: 'TestResponseModel' object has no attribute 'candidates'. Test path: tests/workflows/llm/test_augmented_llm_google.py::TestGoogleAugmentedLLM::test_generate_structured.
[error] 296-296: AttributeError: 'TestResponseModel' object has no attribute 'candidates'. Test path: tests/workflows/llm/test_augmented_llm_google.py::TestGoogleAugmentedLLM::test_generate_structured_with_mixed_message_types.
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/llm/augmented_llm_google.py around lines 282 to 309,
the current code assumes api_response.candidates exists and extracts parts,
which breaks tests that stub the task to return a Pydantic model, dict, or JSON
string; update the logic to first check if api_response has a usable
'candidates' sequence and extract text as before, otherwise if api_response is
already a dict or has a serializable structure use it directly, and if
api_response is a string attempt json.loads on it; after these guarded branches
call response_model.model_validate on the resulting dict/object and only raise
ValueError if no usable structured data can be obtained.
```python
except Exception:
    # Fallback to pydantic JSON parsing if already a JSON string-like
    return response_model.model_validate_json(content)
```
The error handling in this section has a potential issue with exception flow. Currently, if json.loads(content) succeeds but response_model.model_validate(data) fails, that validation exception will propagate uncaught.
Consider restructuring to handle parsing and validation errors separately:
```python
try:
    data = json.loads(content)
    return response_model.model_validate(data)
except json.JSONDecodeError:
    # Only fall back if JSON parsing specifically fails
    return response_model.model_validate_json(content)
except ValidationError:
    # Handle validation errors explicitly
    raise ValueError(f"Response data doesn't match expected schema: {content}")
```

This approach provides clearer error boundaries and more specific error messages for debugging.
```diff
-except Exception:
-    # Fallback to pydantic JSON parsing if already a JSON string-like
-    return response_model.model_validate_json(content)
+except json.JSONDecodeError:
+    # Fallback to pydantic JSON parsing if already a JSON string-like
+    return response_model.model_validate_json(content)
+except ValidationError:
+    # Handle validation errors explicitly
+    raise ValueError(f"Response data doesn't match expected schema: {content}")
```
Spotted by Diamond
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
tests/workflows/llm/test_augmented_llm_google.py (1)
1-1: Fix unused import to unblock CI (Ruff F401).
`patch` is unused and breaks the PR checks. Apply this diff:

```diff
-from unittest.mock import AsyncMock, MagicMock, patch
+from unittest.mock import AsyncMock, MagicMock
```

tests/workflows/llm/test_augmented_llm_openai.py (1)
2-2: Fix unused import to unblock CI (Ruff F401).
`patch` is unused and fails the Ruff check.

```diff
-from unittest.mock import AsyncMock, MagicMock, patch
+from unittest.mock import AsyncMock, MagicMock
```

src/mcp_agent/workflows/llm/augmented_llm_azure.py (1)
350-359: Propagate `strict` from merged params, avoid mutating the caller's `RequestParams`, and add robust JSON parsing. Currently this bypasses defaults via `get_request_params()` and mutates the caller object; it can drop a default `strict=True` set on the instance and surprises callers. Also add guards and a `model_validate_json` fallback.

```diff
-request_params = request_params or RequestParams()
-metadata = request_params.metadata or {}
-metadata["response_format"] = JsonSchemaFormat(
-    name=response_model.__name__,
-    description=response_model.__doc__,
-    schema=json_schema,
-    strict=request_params.strict,
-)
-request_params.metadata = metadata
-
-response = await self.generate(message=message, request_params=request_params)
-json_data = json.loads(response[-1].content)
-
-structured_response = response_model.model_validate(json_data)
-return structured_response
+params = self.get_request_params(request_params)
+metadata = dict(params.metadata or {})
+metadata["response_format"] = JsonSchemaFormat(
+    name=response_model.__name__,
+    description=getattr(response_model, "__doc__", None),
+    schema=json_schema,
+    strict=params.strict,
+)
+params.metadata = metadata
+
+responses = await self.generate(message=message, request_params=params)
+if not responses or responses[-1].content is None:
+    raise ValueError("No structured content returned by model")
+try:
+    data = json.loads(responses[-1].content)
+    return response_model.model_validate(data)
+except Exception:
+    # If already JSON string-like, let Pydantic parse directly
+    return response_model.model_validate_json(responses[-1].content)
```

Also applies to: 360-364
♻️ Duplicate comments (3)
src/mcp_agent/workflows/llm/augmented_llm_azure.py (1)
545-576: Deep-copy payload, use project logger, and don't override `temperature` in the 400 fallback.
- Shallow copy risks nested mutation by the SDK.
- Prefer `get_logger` for consistency.
- Changing `temperature` to 1 can materially alter behavior; keep the original value (prior feedback echoed here).

```diff
-payload = request.payload.copy()
+# Protect nested structures from mutation
+import copy
+payload = copy.deepcopy(request.payload)
@@
-except HttpResponseError as e:
-    logger = logging.getLogger(__name__)
+except HttpResponseError as e:
+    logger = get_logger(__name__)
@@
-# Create a new payload with fallback values for commonly problematic parameters
-fallback_payload = {**payload, "max_tokens": None, "temperature": 1}
+# Retry with conservative fallback (preserve caller's temperature)
+fallback_payload = {**payload, "max_tokens": None}
```

Add the missing import near the top if not present:

```diff
+import copy
```

src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (2)
Exceptionhides root causes and makes CI debugging hard. Catchjson.JSONDecodeErrorandpydantic.ValidationErrorexplicitly and log them.- import json + import json + from pydantic import ValidationError ... - except Exception: - # Fallthrough to error - break + except json.JSONDecodeError as e: + self.logger.error("Failed to parse JSON structured output", exc_info=e) + break + except ValidationError as e: + self.logger.error("Structured output failed model validation", exc_info=e) + break + except Exception as e: + self.logger.error("Unexpected error validating structured output", exc_info=e) + break
462-474: Don’t instantiate/streamAsyncAnthropicdirectly here—reuse the executor path to preserve provider routing (bedrock/vertex), telemetry, and avoid SDK context-manager mismatches.This bypasses
AnthropicCompletionTasks.request_completion_task, so config-driven routing, tracing, and our workflow_task semantics are skipped. Also, some Anthropic SDK versions don’t supportasync with client.messages.stream(...)causing__aenter__failures in CI. Use the existing executor likegenerate()does.- # Call Anthropic directly (one-turn streaming for consistency) - base_url = None - if self.context and self.context.config and self.context.config.anthropic: - base_url = self.context.config.anthropic.base_url - api_key = self.context.config.anthropic.api_key - client = AsyncAnthropic(api_key=api_key, base_url=base_url) - else: - client = AsyncAnthropic() - - async with client: - async with client.messages.stream(**args) as stream: - final = await stream.get_final_message() + # Call Anthropic via the workflow task to honor provider routing & tracing + request = RequestCompletionRequest( + config=self.context.config.anthropic, + payload=args, + ) + self._annotate_span_for_completion_request(span, request, 0) + final: Message = await self.executor.execute( + AnthropicCompletionTasks.request_completion_task, + ensure_serializable(request), + ) + self._annotate_span_for_completion_response(span, final, 0)If you prefer to keep direct SDK usage, at minimum gate by provider (avoid
AsyncAnthropicfor bedrock/vertex) and verify the installed SDK supports the async CM onmessages.stream().
🧹 Nitpick comments (6)
tests/workflows/llm/test_augmented_llm_google.py (1)
176-180: Update docstring to reflect native Gemini structured outputs (not Instructor).- Tests structured output generation using Instructor. + Tests structured output generation using Gemini native structured outputs.tests/workflows/llm/test_augmented_llm_openai.py (1)
179-179: Remove redundant localimport json; top-level import already present.- import json + # use module-level json importAlso applies to: 610-610
src/mcp_agent/workflows/llm/augmented_llm_azure.py (1)
548-576: Optional: add capped backoff for transient errors (429/5xx).Exponential backoff with jitter and a small retry budget will improve resilience without hammering the endpoint. I can draft a focused utility if helpful.
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)
440-456: ThreadRequestParams.strictinto the JSON Schema (disallow extras when strict).When
params.strictis true, setadditionalProperties: falseon the root schema so the model is steered to emit only declared fields.- schema = response_model.model_json_schema() + schema = response_model.model_json_schema() + # Tighten schema if strict requested + if getattr(params, "strict", False) and isinstance(schema, dict): + schema.setdefault("additionalProperties", False)tests/workflows/llm/test_augmented_llm_anthropic.py (2)
241-253: Streaming SDK patching tightly couples tests to the direct SDK path. Consider stubbing the executor instead.If you adopt the executor refactor above, update tests to patch
llm.executor.executeto return aMessagewith atool_useblock instead of patchingAsyncAnthropic.Example change:
- with patch( - "mcp_agent.workflows.llm.augmented_llm_anthropic.AsyncAnthropic" - ) as MockAsyncAnthropic: - ... - result = await AnthropicAugmentedLLM.generate_structured( - mock_llm, "Test query", TestResponseModel - ) + mock_llm.executor.execute = AsyncMock(return_value=mock_message) + result = await mock_llm.generate_structured("Test query", TestResponseModel)
255-257: Call the bound method on the instance for clarity.Minor readability improvement; current call works but is unconventional.
- result = await AnthropicAugmentedLLM.generate_structured( - mock_llm, "Test query", TestResponseModel - ) + result = await mock_llm.generate_structured("Test query", TestResponseModel)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py(2 hunks)src/mcp_agent/workflows/llm/augmented_llm_azure.py(4 hunks)tests/workflows/llm/test_augmented_llm_anthropic.py(3 hunks)tests/workflows/llm/test_augmented_llm_google.py(2 hunks)tests/workflows/llm/test_augmented_llm_openai.py(3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
🧬 Code graph analysis (3)
tests/workflows/llm/test_augmented_llm_openai.py (2)
tests/workflows/llm/test_augmented_llm_azure.py (1)
create_text_response(97-115)src/mcp_agent/workflows/llm/augmented_llm_openai.py (1)
generate_structured(432-515)
tests/workflows/llm/test_augmented_llm_google.py (1)
src/mcp_agent/workflows/llm/augmented_llm_google.py (1)
generate_structured(241-308)
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (3)
src/mcp_agent/tracing/telemetry.py (1)
get_tracer(171-175)src/mcp_agent/workflows/llm/augmented_llm.py (4)
get_request_params(381-402)select_model(335-379)get(94-95)get(117-118)src/mcp_agent/workflows/llm/multipart_converter_anthropic.py (2)
AnthropicConverter(50-518)convert_mixed_messages_to_anthropic(485-518)
🪛 GitHub Actions: Pull Request Checks
tests/workflows/llm/test_augmented_llm_openai.py
[error] 2-2: Ruff: F401: 'patch' imported but unused. (Command: 'ruff check').
tests/workflows/llm/test_augmented_llm_google.py
[error] 1-1: Ruff: F401: 'patch' imported but unused. (Command: 'ruff check').
tests/workflows/llm/test_augmented_llm_anthropic.py
[error] 216-216: Ruff: F401: 'json' imported but unused. (Command: 'ruff check').
[error] 808-808: Ruff: F401: 'json' imported but unused. (Command: 'ruff check').
🔇 Additional comments (5)
tests/workflows/llm/test_augmented_llm_google.py (2)
186-203: LGTM: native JSON structured-output test is correct.Good switch to returning a GenerateContentResponse with JSON text and validating into Pydantic.
Also applies to: 191-195, 196-203
770-784: LGTM: mixed message types structured-output test.The JSON payload + model validation path matches the new Gemini implementation.
Also applies to: 775-779, 780-784
tests/workflows/llm/test_augmented_llm_openai.py (2)
177-204: LGTM: native OpenAI structured-output test.Constructing JSON content and validating via Pydantic mirrors the production flow.
627-640: LGTM: mixed message types structured-output test.Payload and assertions align with the new OpenAI path.
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)
451-456: LGTM on forced tool choice for native structured output. Forcing `tool_choice` to `return_structured_output` aligns with the PR goal and avoids free-form text paths.
```python
for block in final.content:
    if (
        getattr(block, "type", None) == "tool_use"
        and getattr(block, "name", "") == "return_structured_output"
    ):
        data = getattr(block, "input", None)
        try:
            if isinstance(data, str):
                return response_model.model_validate(json.loads(data))
            return response_model.model_validate(data)
        except Exception:
            # Fallthrough to error
            break

raise ValueError(
    "Failed to obtain structured output from Anthropic response"
)
```
The error handling in this section could be improved for better diagnostics. Currently, the code uses getattr() with default values which can mask actual errors in the API response structure. If block.input is None or block.name doesn't match expectations, the code will silently continue to the generic ValueError at the end rather than providing specific information about what went wrong.
Consider adding more specific error handling to distinguish between different failure modes:
```python
for block in final.content:
    block_type = getattr(block, "type", None)
    if block_type != "tool_use":
        continue
    block_name = getattr(block, "name", None)
    if block_name != "return_structured_output":
        continue
    data = getattr(block, "input", None)
    if data is None:
        raise ValueError("Tool use block found but input data is missing")
    try:
        if isinstance(data, str):
            return response_model.model_validate(json.loads(data))
        return response_model.model_validate(data)
    except Exception as e:
        raise ValueError(f"Failed to validate response data: {str(e)}") from e
raise ValueError("No structured output tool use found in Anthropic response")
```
Spotted by Diamond
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
tests/workflows/llm/test_augmented_llm_google.py (1)
434-441: Raise exceptions in mocks via `side_effect`, not `return_value`
- In tests/workflows/llm/test_augmented_llm_{google,openai,bedrock,anthropic,azure}.py, replace `AsyncMock(return_value=Exception("API Error"))` with `AsyncMock(side_effect=Exception("API Error"))` so the mock actually raises the exception.
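A minimal standalone illustration of the difference (not taken from the test files themselves):

```python
import asyncio
from unittest.mock import AsyncMock

import pytest


async def demo():
    returns_exc = AsyncMock(return_value=Exception("API Error"))
    raises_exc = AsyncMock(side_effect=Exception("API Error"))

    result = await returns_exc()  # returns the Exception object; nothing is raised
    assert isinstance(result, Exception)

    with pytest.raises(Exception, match="API Error"):  # side_effect actually raises on call
        await raises_exc()


asyncio.run(demo())
```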
src/mcp_agent/mcp/mcp_aggregator.py (2)

858-866: Bug: server_name parameter is overwritten. Reassigning server_name to None ignores the caller's explicit target server. This breaks namespaced calls that rely on the parameter.

```diff
-server_name: str = None
-local_tool_name: str = None
+# Use provided server_name param directly
+local_tool_name: Optional[str] = None
```

Add a focused test to cover this path (see tests comment below).
1445-1449: Bug: _list_resources returns wrapper object, not list. For consistency with _list_tools/_list_prompts and likely MCP expectations, return the resources list.

```diff
-async def _list_resources(self):
+async def _list_resources(self) -> List[Resource]:
     """List available resources from the connected MCP servers."""
-    resources = await self.aggregator.list_resources()
-    return resources
+    result = await self.aggregator.list_resources()
+    return result.resources
```

src/mcp_agent/cli/cloud/commands/logger/tail/main.py (1)
176-181: Don't mask CLIError exit codes; handle them explicitly. Catching all Exceptions forces exit code 5 and breaks CLI contracts for scripted use. Handle CLIError separately and preserve its exit_code.

```diff
-except KeyboardInterrupt:
-    console.print("\n[yellow]Interrupted by user[/yellow]")
-    sys.exit(0)
-except Exception as e:
-    console.print(f"[red]Error: {e}[/red]")
-    raise typer.Exit(5)
+except KeyboardInterrupt:
+    console.print("\n[yellow]Interrupted by user[/yellow]")
+    raise typer.Exit(0)
+except CLIError as e:
+    console.print(f"[red]{e}[/red]")
+    raise typer.Exit(e.exit_code)
+except Exception as e:
+    console.print(f"[red]Unexpected error: {e}[/red]")
+    raise typer.Exit(5)
```
🧹 Nitpick comments (14)
tests/workflows/llm/test_augmented_llm_google.py (4)
176-181: Update stale docstring (no longer uses Instructor).The test now validates native JSON structured outputs; reflect that to avoid confusion.
- Tests structured output generation using Instructor. + Tests structured output generation using Gemini native structured outputs (application/json).
186-202: Good shift to native JSON; also assert JSON-mode config to prevent regressions.Add assertions that the request set
response_mime_typeand the schema as expected.result = await mock_llm.generate_structured("Test query", TestResponseModel) # Assertions assert isinstance(result, TestResponseModel) assert result.name == "Test" assert result.value == 42 + # Verify JSON-mode configuration was used + req = mock_llm.executor.execute.call_args[0][1] + cfg = req.payload["config"] + assert cfg.response_mime_type == "application/json" + assert cfg.response_schema in ( + TestResponseModel, + TestResponseModel.model_json_schema(), + )
186-190: Deduplicate inlineimport json; move to module scope.Inline imports are repeated; centralize at top for clarity.
- import jsonAdditionally add at the top of this file:
# At file top-level (near other imports) import jsonAlso applies to: 769-774
769-781: Mirror JSON-mode assertions for the mixed-message test.This ensures both code paths set the correct response config.
result = await mock_llm.generate_structured(messages, TestResponseModel) assert isinstance(result, TestResponseModel) assert result.name == "MixedTypes" assert result.value == 123 + # Verify JSON-mode configuration on the request + req = mock_llm.executor.execute.call_args[0][1] + cfg = req.payload["config"] + assert cfg.response_mime_type == "application/json" + assert cfg.response_schema in ( + TestResponseModel, + TestResponseModel.model_json_schema(), + )src/mcp_agent/mcp/mcp_aggregator.py (4)
352-359: Config check: OK, but warning level may be noisyThe fallback path is correct. Consider downgrading the log to info to avoid warning fatigue when running without per-server configs.
361-369: Ensure O(1) membership by normalizing allowed_tools to a set

If callers pass a list, membership is O(n) despite the comment. Normalize once.

Apply:

-            allowed_tools = self.context.server_registry.get_server_config(
-                server_name
-            ).allowed_tools
+            cfg = self.context.server_registry.get_server_config(server_name)
+            allowed_tools = (
+                set(cfg.allowed_tools)
+                if getattr(cfg, "allowed_tools", None) is not None
+                else None
+            )
553-555: Typo in span name ("get_capabilitites")

Fix spelling to keep traces searchable.

-            f"{self.__class__.__name__}.get_capabilitites"
+            f"{self.__class__.__name__}.get_capabilities"
311-317: Duplicate loading during create()

__aenter__() already initializes and loads servers; calling load_servers() again is redundant work.

-        logger.debug("Loading servers...")
-        await instance.load_servers()
-
-        logger.debug("MCPAggregator created and initialized.")
+        logger.debug("MCPAggregator created and initialized via __aenter__().")

tests/mcp/test_mcp_aggregator.py (1)
919-961: Filtering happy-path coverage: LGTM; minor nit

Good coverage for allowed set behavior. Tiny nit: mock_fetch_capabilities returns (None, ...); returning (server_name, ...) would more closely mirror production, though not functionally required.
src/mcp_agent/cli/cloud/commands/logger/tail/main.py (5)
316-318: Set a finite connect timeout for streaming.

Keep read timeout unlimited for SSE, but avoid hanging indefinitely on connect.

-        async with httpx.AsyncClient(timeout=None) as client:
+        # No read timeout for SSE; finite connect timeout for resiliency
+        async with httpx.AsyncClient(timeout=httpx.Timeout(connect=30.0, read=None)) as client:
451-454: YAML streaming: emit a single document per log, not a list.

Printing a list per event is awkward for consumers. Emit a mapping.

-        elif format == "yaml":
-            cleaned_entry = _clean_log_entry(log_entry)
-            print(yaml.dump([cleaned_entry], default_flow_style=False))
+        elif format == "yaml":
+            cleaned_entry = _clean_log_entry(log_entry)
+            print(yaml.safe_dump(cleaned_entry, default_flow_style=False))
478-486: Make _parse_log_level robust to non-string inputs.

Avoid AttributeError if level isn’t a string.

-def _parse_log_level(level: str) -> str:
+def _parse_log_level(level: Any) -> str:
     """Parse log level from API format to clean display format."""
-    if level.startswith("LOG_LEVEL_"):
+    if isinstance(level, str) and level.startswith("LOG_LEVEL_"):
         clean_level = level.replace("LOG_LEVEL_", "")
         if clean_level == "UNSPECIFIED":
             return "UNKNOWN"
         return clean_level
-    return level.upper()
+    return str(level).upper()
72-76: Leverage Typer choices to drop manual validation.

Define choices at the option level so invalid values are rejected before runtime; then you can remove the manual checks.

Example:

from typing import Literal

def tail_logs(
    ...,
    order_by: Optional[Literal["timestamp", "severity"]] = typer.Option(None, "--order-by", help="..."),
    format: Literal["text", "json", "yaml"] = typer.Option("text", "--format", help="..."),
):
    ...

Then the manual validations for --order-by and --format can be removed.
Also applies to: 139-143
33-34: Clarify argument help text for identifiers.

Be explicit about expected forms to reduce confusion.

-    help="Server ID, URL, or app configuration ID to retrieve logs for"
+    help="App ID (app_*), App Configuration ID (apcnf_*), or server URL to retrieve logs for"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
- src/mcp_agent/cli/cloud/commands/auth/whoami/main.py (0 hunks)
- src/mcp_agent/cli/cloud/commands/logger/__init__.py (1 hunks)
- src/mcp_agent/cli/cloud/commands/logger/configure/__init__.py (1 hunks)
- src/mcp_agent/cli/cloud/commands/logger/configure/main.py (4 hunks)
- src/mcp_agent/cli/cloud/commands/logger/tail/__init__.py (1 hunks)
- src/mcp_agent/cli/cloud/commands/logger/tail/main.py (7 hunks)
- src/mcp_agent/cli/cloud/commands/logger/utils.py (2 hunks)
- src/mcp_agent/cli/cloud/main.py (2 hunks)
- src/mcp_agent/mcp/mcp_aggregator.py (1 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_azure.py (4 hunks)
- tests/mcp/test_mcp_aggregator.py (7 hunks)
- tests/workflows/llm/test_augmented_llm_anthropic.py (3 hunks)
- tests/workflows/llm/test_augmented_llm_google.py (3 hunks)
- tests/workflows/llm/test_augmented_llm_openai.py (4 hunks)
💤 Files with no reviewable changes (1)
- src/mcp_agent/cli/cloud/commands/auth/whoami/main.py
✅ Files skipped from review due to trivial changes (6)
- src/mcp_agent/cli/cloud/main.py
- src/mcp_agent/cli/cloud/commands/logger/__init__.py
- src/mcp_agent/cli/cloud/commands/logger/tail/__init__.py
- src/mcp_agent/cli/cloud/commands/logger/configure/__init__.py
- src/mcp_agent/cli/cloud/commands/logger/utils.py
- src/mcp_agent/cli/cloud/commands/logger/configure/main.py
🚧 Files skipped from review as they are similar to previous changes (3)
- src/mcp_agent/workflows/llm/augmented_llm_azure.py
- tests/workflows/llm/test_augmented_llm_openai.py
- tests/workflows/llm/test_augmented_llm_anthropic.py
🧰 Additional context used
🧬 Code graph analysis (4)
src/mcp_agent/mcp/mcp_aggregator.py (2)
src/mcp_agent/app.py (3)
context (145-150), server_registry (157-158), logger (189-204)

tests/mcp/test_mcp_aggregator.py (1)
get_server_config(890-891)
tests/mcp/test_mcp_aggregator.py (1)
src/mcp_agent/mcp/mcp_aggregator.py (1)
load_server(329-458)
src/mcp_agent/cli/cloud/commands/logger/tail/main.py (4)
src/mcp_agent/cli/cloud/commands/logger/utils.py (2)
parse_app_identifier (12-30), resolve_server_url (33-100)

src/mcp_agent/cli/auth/main.py (1)

load_credentials (32-46)

src/mcp_agent/cli/exceptions.py (1)

CLIError (4-9)

src/mcp_agent/cli/auth/models.py (1)
UserCredentials(10-64)
tests/workflows/llm/test_augmented_llm_google.py (2)
tests/workflows/llm/test_augmented_llm_openai.py (1)
create_text_response (68-88)

src/mcp_agent/workflows/llm/augmented_llm_google.py (1)
generate_structured(241-308)
🔇 Additional comments (10)
tests/workflows/llm/test_augmented_llm_google.py (1)
1-1: Imports cleanup LGTM.

Dropping unused patch import is correct; the remaining mocks cover all uses.

src/mcp_agent/mcp/mcp_aggregator.py (2)
372-375: Filtering debug message: LGTM

Clear, actionable debug output when a tool is filtered.

1406-1419: Return type from _call_tool likely mismatches MCP spec

Returning result.content (list) instead of CallToolResult may diverge from MCP expectations; tests assert on content only. Please verify intended contract and adjust either code or tests accordingly.

If aligning to CallToolResult is desired:

-        result = await self.aggregator.call_tool(name=name, arguments=arguments)
-        return result.content
+        return await self.aggregator.call_tool(name=name, arguments=arguments)

tests/mcp/test_mcp_aggregator.py (7)

874-876: MockServerConfig ctor: LGTM

Simple, clear surface for allowed_tools.

881-910: Server registry test scaffold: LGTM

get_server_config shim and start_server context manager cover filtering and non-persistent paths well.

983-1018: No-filtering (None) path: LGTM

Confirms inclusion of all tools when no filter is provided.

1033-1063: Empty-allowlist path: LGTM

Asserts complete exclusion when allowlist is empty; matches warning branch in prod code.

1125-1220: Multi-server filtering matrix: LGTM

Validates per-server variance and namespaced map integrity.

1223-1277: Exact-match semantics: LGTM

Ensures no partial matches slip through.

317-379: Add test for explicit server_name path

The call_tool branch when passing an explicit server_name isn't covered by existing tests. Please verify that MCPAggregator.call_tool(name, …, server_name=…) respects the provided server without falling back to _parse_capability_name, and add a focused test to lock this down.
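A focused test for that path could look roughly like this; the constructor arguments and the _send_to_server helper below are placeholders, not actual names from the repo, and should be adapted to the existing scaffolding in tests/mcp/test_mcp_aggregator.py:

import pytest
from unittest.mock import AsyncMock

from mcp_agent.mcp.mcp_aggregator import MCPAggregator


@pytest.mark.asyncio
async def test_call_tool_uses_explicit_server_name():
    aggregator = MCPAggregator(server_names=["server_a", "server_b"])  # placeholder ctor
    # Stub out the per-server dispatch so the routed server is observable
    aggregator._send_to_server = AsyncMock(return_value="ok")

    await aggregator.call_tool(name="echo", arguments={"x": 1}, server_name="server_b")

    # The explicitly requested server must be used, not one parsed from the tool name
    _, kwargs = aggregator._send_to_server.call_args
    assert kwargs.get("server_name") == "server_b"

The point is only the final assertion; everything else is sketch-level setup.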
     parsed = urlparse(server_url)
     stream_url = f"{parsed.scheme}://{parsed.netloc}/logs"
     hostname = parsed.hostname or ""
-    deployment_id = hostname.split('.')[0] if '.' in hostname else hostname
+    deployment_id = hostname.split(".")[0] if "." in hostname else hostname
Deployment ID extraction is incorrect for URLs like https://app.mcpac.dev/abc123.
You derive deployment_id from the hostname (“app”), but the ID is in the first path segment (“abc123”). Use path first, fall back to subdomain if needed.
- parsed = urlparse(server_url)
- stream_url = f"{parsed.scheme}://{parsed.netloc}/logs"
- hostname = parsed.hostname or ""
- deployment_id = hostname.split(".")[0] if "." in hostname else hostname
+ parsed = urlparse(server_url)
+ stream_url = f"{parsed.scheme}://{parsed.netloc}/logs"
+ # Prefer deployment id from path (e.g., https://host/{deployment_id}) else fallback to subdomain
+ path_id = parsed.path.lstrip("/").split("/", 1)[0]
+ if path_id:
+ deployment_id = path_id
+ else:
+ hostname = parsed.hostname or ""
+ deployment_id = hostname.split(".")[0] if "." in hostname else hostname
🤖 Prompt for AI Agents
In src/mcp_agent/cli/cloud/commands/logger/tail/main.py around lines 290 to 294,
the code extracts deployment_id from the hostname but some URLs put the ID in
the first path segment (e.g. https://app.mcpac.dev/abc123); change the logic to
parse server_url.path first and if the first non-empty path segment exists use
that as deployment_id, otherwise fall back to using the hostname/subdomain as
before, and keep stream_url construction unchanged.
| buffer = "" | ||
| async for chunk in response.aiter_text(): | ||
| buffer += chunk | ||
| lines = buffer.split('\n') | ||
| lines = buffer.split("\n") | ||
|
|
||
| for line in lines[:-1]: | ||
| if line.startswith('data:'): | ||
| data_content = line.removeprefix('data:') | ||
| if line.startswith("data:"): | ||
| data_content = line.removeprefix("data:") | ||
|
|
||
| try: | ||
| log_data = json.loads(data_content) | ||
| if 'message' in log_data: | ||
| timestamp = log_data.get('time') | ||
|
|
||
| if "message" in log_data: | ||
| timestamp = log_data.get("time") | ||
| if timestamp: | ||
| formatted_timestamp = _convert_timestamp_to_local(timestamp) | ||
| formatted_timestamp = ( | ||
| _convert_timestamp_to_local(timestamp) | ||
| ) | ||
| else: | ||
| formatted_timestamp = datetime.now().isoformat() | ||
|
|
||
| log_entry = { | ||
| 'timestamp': formatted_timestamp, | ||
| 'message': log_data['message'], | ||
| 'level': log_data.get('level', 'INFO') | ||
| "timestamp": formatted_timestamp, | ||
| "message": log_data["message"], | ||
| "level": log_data.get("level", "INFO"), | ||
| } | ||
|
|
||
| if not grep_pattern or _matches_pattern(log_entry['message'], grep_pattern): | ||
|
|
||
| if not grep_pattern or _matches_pattern( | ||
| log_entry["message"], grep_pattern | ||
| ): | ||
| _display_log_entry(log_entry, format=format) | ||
|
|
||
| except json.JSONDecodeError: | ||
| # Skip malformed JSON | ||
| continue | ||
|
|
Fix SSE buffering to prevent duplicates and unbounded memory growth.

You never reset buffer to the trailing partial line, so previously processed lines are reprocessed on every chunk and buffer grows indefinitely.
You never reset buffer to the trailing partial line, so previously processed lines are reprocessed on every chunk and buffer grows indefinitely.
lines = buffer.split("\n")
-
- for line in lines[:-1]:
+ for line in lines[:-1]:
if line.startswith("data:"):
data_content = line.removeprefix("data:")
try:
log_data = json.loads(data_content)
if "message" in log_data:
@@
except json.JSONDecodeError:
# Skip malformed JSON
continue
+ # Keep only the last (possibly partial) line for the next chunk
+ buffer = lines[-1]
🤖 Prompt for AI Agents
In src/mcp_agent/cli/cloud/commands/logger/tail/main.py around lines 331-366,
the SSE chunk buffering never resets so already-processed lines are reprocessed
and buffer can grow without bound; fix it by, after iterating for line in
lines[:-1] and processing each, set buffer = lines[-1] (the trailing partial
line) so only the incomplete remainder is kept for the next chunk, and when the
stream finishes attempt to process any remaining buffered content (if it begins
with "data:") before exit; additionally enforce a reasonable max buffer size
(truncate or discard if exceeded) to prevent unbounded memory growth.
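A small sketch of that guard; the 1 MiB cap is an arbitrary illustrative limit and the helper name is made up:

MAX_SSE_BUFFER = 1024 * 1024  # cap on carried-over partial data (arbitrary)


def feed_sse_chunk(buffer: str, chunk: str) -> tuple[str, list[str]]:
    """Append a chunk and return (new_buffer, complete_lines), bounding the tail."""
    buffer += chunk
    lines = buffer.split("\n")
    complete, tail = lines[:-1], lines[-1]
    if len(tail) > MAX_SSE_BUFFER:
        # A single "line" this large is almost certainly junk; drop it
        tail = ""
    return tail, complete

Each complete line can then flow through the existing data:-prefixed JSON handling unchanged.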
                console.print(
                    f"[bright_black not bold]{timestamp}[/bright_black not bold] "
                    f"[{level_style}]{level:7}[/{level_style}] "
                    f"{message}"
                )
🛠️ Refactor suggestion
Invalid Rich markup: remove “not bold”.
Rich doesn’t support “not bold” in markup. This can raise a MarkupError. Just use bright_black.
- console.print(
- f"[bright_black not bold]{timestamp}[/bright_black not bold] "
- f"[{level_style}]{level:7}[/{level_style}] "
- f"{message}"
- )
+ console.print(
+ f"[bright_black]{timestamp}[/bright_black] "
+ f"[{level_style}]{level:7}[/{level_style}] "
+ f"{message}"
+ )
🤖 Prompt for AI Agents
In src/mcp_agent/cli/cloud/commands/logger/tail/main.py around lines 418-422 the
Rich markup uses an invalid token "not bold" which will raise a MarkupError;
remove the "not bold" qualifier and use "[bright_black]" (and matching
"[/bright_black]") instead, or pass style="bright_black" to console.print;
update the formatted string to use valid Rich markup so it becomes
"[bright_black]{timestamp}[/bright_black] ..." (or move the color to the
console.print style argument) without any "not bold" text.
5a0e354 to 04e8619
     )

     # Create a new payload with fallback values for commonly problematic parameters
     fallback_payload = {**payload, "max_tokens": None, "temperature": 1}
The Azure API fallback logic sets max_tokens: None, but Azure typically requires max_tokens to be a positive integer. This could cause the fallback request to also fail with a 400 error, defeating the purpose of the retry mechanism. Consider either removing max_tokens entirely from the fallback payload or setting it to a safe default value (like 1024) instead of None.
-    fallback_payload = {**payload, "max_tokens": None, "temperature": 1}
+    fallback_payload = {**payload, "max_tokens": 1024, "temperature": 1}
Spotted by Diamond
        data = json.loads(text)
        return response_model.model_validate(data)
The error handling for JSON parsing could be improved. Currently, if json.loads(text) fails due to malformed JSON from the API, the code will raise a generic ValueError with the message "No structured response returned by Gemini", which doesn't accurately describe the actual issue. Consider catching specific JSON exceptions (like json.JSONDecodeError) separately and providing more descriptive error messages to aid debugging. For example:
try:
    data = json.loads(text)
    return response_model.model_validate(data)
except json.JSONDecodeError:
    raise ValueError(f"Failed to parse JSON response from Gemini: {text[:100]}...")

This would make it clearer when the issue is with JSON parsing rather than a missing response.
Spotted by Diamond
        except Exception:
            # Fallback to pydantic JSON parsing if already a JSON string-like
            return response_model.model_validate_json(content)
The error handling logic here has a potential issue. If json.loads(content) fails because content isn't valid JSON, then falling back to model_validate_json(content) won't help since it also expects valid JSON input. This creates a misleading fallback path that doesn't actually handle different data formats. Consider either:
- Adding more specific exception handling to differentiate between parsing errors and validation errors
- Adding logging before the fallback to aid debugging
- Restructuring to first check if the content appears to be JSON before attempting to parse it
This would make the error handling more robust and the failure modes more predictable.
-        except Exception:
-            # Fallback to pydantic JSON parsing if already a JSON string-like
-            return response_model.model_validate_json(content)
+        except json.JSONDecodeError as e:
+            # Log the specific JSON parsing error
+            logger.debug(f"JSON parsing failed, attempting Pydantic validation: {e}")
+            try:
+                # Fallback to pydantic JSON parsing if already a JSON string-like
+                return response_model.model_validate_json(content)
+            except Exception as pydantic_error:
+                # If both methods fail, raise a more descriptive error
+                raise ValueError(
+                    f"Failed to parse content as JSON: {content[:100]}..."
+                ) from pydantic_error
Spotted by Diamond
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/mcp_agent/workflows/llm/augmented_llm_azure.py (2)
359-363: Harden JSON parsing and pick the correct message to parse

The last response may be a tool call or have empty content; raw json.loads can fail without context. Add guard rails and clearer errors.

-        response = await self.generate(message=message, request_params=request_params)
-        json_data = json.loads(response[-1].content)
-
-        structured_response = response_model.model_validate(json_data)
+        response = await self.generate(message=message, request_params=request_params)
+        # Find last assistant message with non-empty content
+        content_str = None
+        for msg in reversed(response):
+            if getattr(msg, "content", None):
+                content_str = msg.content
+                break
+        if not content_str:
+            raise ValueError(
+                "No textual assistant content found to parse as JSON for structured output."
+            )
+        try:
+            json_data = json.loads(content_str)
+        except json.JSONDecodeError as e:
+            raise ValueError(
+                f"Assistant content is not valid JSON for {response_model.__name__}: {e}"
+            ) from e
+        structured_response = response_model.model_validate(json_data)
         return structured_response

470-477: Bug: .get() on Azure message objects (not dicts) will raise

payload["messages"] elements are objects (SystemMessage/UserMessage/etc.), not dicts, so .get("role") will AttributeError during tracing.

-        event_name = f"completion.request.{turn}"
-        latest_message_role = request.payload.get("messages", [{}])[-1].get("role")
-
-        if latest_message_role:
-            event_name = f"gen_ai.{latest_message_role}.message"
+        event_name = f"completion.request.{turn}"
+        messages = (request.payload.get("messages") or [])
+        latest = messages[-1] if messages else None
+        role = getattr(latest, "role", None) if latest is not None else None
+        if role:
+            event_name = f"gen_ai.{role}.message"
♻️ Duplicate comments (2)
src/mcp_agent/workflows/llm/augmented_llm_azure.py (2)
544-544: Deep-copy payload to avoid mutating caller-owned structures

Shallow copy won’t protect nested dict/list entries if the SDK mutates them, especially across retry paths.

-        payload = request.payload.copy()
+        payload = copy.deepcopy(request.payload)

Add import:

# at top-level imports
import copy

547-575: Fallback should preserve requested temperature; only relax max_tokens

Changing temperature to 1 materially changes output style and determinism. Prior feedback already flagged this. Keep temperature unchanged and only set max_tokens=None (or remove it) for the retry.

-            # Create a new payload with fallback values for commonly problematic parameters
-            fallback_payload = {**payload, "max_tokens": None, "temperature": 1}
+            # Fallback: relax only max_tokens to let the service choose a safe default
+            fallback_payload = {**payload, "max_tokens": None}

Optional: preserve exception type and attach context instead of wrapping in RuntimeError.

-        except Exception as retry_error:
-            # If retry also fails, raise a more informative error
-            raise RuntimeError(
-                f"Azure API call failed even with fallback parameters. "
-                f"Original error: {e}. Retry error: {retry_error}"
-            ) from retry_error
+        except Exception as retry_error:
+            # Surface original HTTP error with added context
+            e.add_note(f"Retry with fallback failed: {retry_error}")
+            raise

Also consider an explicit retry/backoff path for 429/5xx with jitter and a small budget; I can draft it if desired.
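A sketch of that retry path, kept separate from the 400 fallback; the status-code set, attempt budget, and delays below are illustrative choices, not project policy:

import asyncio
import random

from azure.core.exceptions import HttpResponseError

RETRYABLE_STATUS = {429, 500, 502, 503, 504}


async def call_with_backoff(do_call, max_attempts: int = 4, base_delay: float = 0.5):
    """Retry transient Azure failures with exponential backoff and full jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await do_call()
        except HttpResponseError as e:
            if e.status_code not in RETRYABLE_STATUS or attempt == max_attempts:
                raise
            # Full jitter: sleep anywhere up to the exponentially growing cap
            await asyncio.sleep(random.uniform(0, base_delay * (2 ** (attempt - 1))))

The wrapped do_call would be a zero-argument coroutine that performs the existing executor call.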
🧹 Nitpick comments (6)
tests/workflows/llm/test_augmented_llm_google.py (2)
769-781: DRY the JSON structured-response setup and assert mixed-input request shape
- Factor the repeated JSON payload creation into a helper to reduce duplication.
- Add assertions that the mixed input list produced three contents entries and preserved roles.
Apply within this test:
@@
-        import json
-
-        json_content = json.dumps({"name": "MixedTypes", "value": 123})
-        response = self.create_text_response(json_content)
+        import json
+        response = self.create_text_response(json.dumps({"name": "MixedTypes", "value": 123}))
@@
         result = await mock_llm.generate_structured(messages, TestResponseModel)

         assert isinstance(result, TestResponseModel)
         assert result.name == "MixedTypes"
         assert result.value == 123
+        # Verify request contained all mixed message types
+        req = mock_llm.executor.execute.call_args[0][1]
+        contents = req.payload["contents"]
+        assert len(contents) == 3
+        assert contents[0].role == "user"

And introduce a small helper once to replace repeated JSON-response scaffolding (add anywhere in the class, e.g., after create_text_response):

    @staticmethod
    def create_structured_json_response(data: dict):
        from google.genai import types
        import json

        return TestGoogleAugmentedLLM.create_text_response(json.dumps(data))

186-201: Update test to reflect Gemini native structured outputs and verify request config

@@ -178,7 +178,7 @@ class TestGoogleAugmentedLLM(BaseTestLLM):
-        """
-        Tests structured output generation using Instructor.
-        """
+        """
+        Tests structured output generation using Gemini native structured outputs.
+        """
@@ -186,7 +186,6 @@ class TestGoogleAugmentedLLM(BaseTestLLM):
-        import json
         json_content = json.dumps({"name": "Test", "value": 42})
         response = self.create_text_response(json_content)
@@ -196,6 +195,12 @@ class TestGoogleAugmentedLLM(BaseTestLLM):
         assert result.value == 42
+
+        # verify native structured-output config on the request
+        _, req = mock_llm.executor.execute.call_args[0]
+        cfg = req.payload["config"]
+        assert cfg.response_mime_type == "application/json"
+        assert cfg.response_schema is not None

[nit]
tests/workflows/llm/test_augmented_llm_anthropic.py (2)
253-261: Add assertions to verify request payload to Anthropic

Strengthen the test by asserting messages.stream received the expected tool config and message shape.

     result = await AnthropicAugmentedLLM.generate_structured(
         mock_llm, "Test query", TestResponseModel
     )

     # Assertions
+    # Verify the stream call payload
+    _, kwargs = mock_client.messages.stream.call_args
+    assert kwargs["model"] == "claude-3-7-sonnet-latest"
+    assert kwargs["tool_choice"] == {
+        "type": "tool",
+        "name": "return_structured_output",
+    }
+    assert kwargs["tools"][0]["name"] == "return_structured_output"
+    assert kwargs["messages"][0]["role"] == "user"
+    assert kwargs["messages"][0]["content"] == "Test query"
     assert isinstance(result, TestResponseModel)
     assert result.name == "Test"
     assert result.value == 42
861-867: Also assert tool wiring and message normalization

Ensure mixed inputs are normalized and tool selection is forced as expected.

     result = await mock_llm.generate_structured(messages, TestResponseModel)

     # Assertions
+    # Verify stream kwargs
+    _, kwargs = mock_client.messages.stream.call_args
+    assert kwargs["tools"][0]["name"] == "return_structured_output"
+    assert kwargs["tool_choice"]["name"] == "return_structured_output"
+    assert len(kwargs["messages"]) == 3
+    assert any(m.get("role") == "user" for m in kwargs["messages"])
     assert isinstance(result, TestResponseModel)
     assert result.name == "MixedTypes"
     assert result.value == 123

src/mcp_agent/workflows/llm/augmented_llm_azure.py (2)
98-98: Propagate session_id into logger for trace correlation

Without session_id, events won’t correlate across components. Pass it when creating the logger.

-        self.logger = get_logger(f"{__name__}.{self.name}" if self.name else __name__)
+        session_id = getattr(self.context, "session_id", None)
+        self.logger = get_logger(
+            f"{__name__}.{self.name}" if self.name else __name__,
+            session_id=session_id,
+        )
479-501: Type hint mismatch: this function handles ChatCompletions, not ResponseMessage

You pass a ChatCompletions object at call sites. Update the annotation to improve static checks and future refactors.

-    def _annotate_span_for_completion_response(
-        self, span: trace.Span, response: ResponseMessage, turn: int
-    ) -> None:
+    def _annotate_span_for_completion_response(
+        self, span: trace.Span, response: ChatCompletions, turn: int
+    ) -> None:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/mcp_agent/workflows/llm/augmented_llm_azure.py (4 hunks)
- tests/workflows/llm/test_augmented_llm_anthropic.py (3 hunks)
- tests/workflows/llm/test_augmented_llm_google.py (3 hunks)
- tests/workflows/llm/test_augmented_llm_openai.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/workflows/llm/test_augmented_llm_openai.py
🧰 Additional context used
🧬 Code graph analysis (3)
tests/workflows/llm/test_augmented_llm_anthropic.py (2)
tests/workflows/llm/test_augmented_llm_openai.py (1)
default_usage (57-65)

src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)
generate_structured(407-492)
tests/workflows/llm/test_augmented_llm_google.py (2)
tests/workflows/llm/test_augmented_llm_openai.py (1)
create_text_response (68-88)

src/mcp_agent/workflows/llm/augmented_llm_google.py (1)
generate_structured(241-308)
src/mcp_agent/workflows/llm/augmented_llm_azure.py (2)
src/mcp_agent/app.py (1)
logger (189-204)

src/mcp_agent/logging/logger.py (3)

get_logger (387-415), error (166-174), warning (156-164)
🔇 Additional comments (11)
tests/workflows/llm/test_augmented_llm_google.py (1)
1-1: Import cleanup looks goodRemoving unused
patchkeeps test deps lean.tests/workflows/llm/test_augmented_llm_anthropic.py (7)
213-216: LGTM: docstring + patch importDocstring reflects native structured output;
patchimport is used and resolves prior F401.
222-228: LGTM: ToolUseBlock setup matches contract
name="return_structured_output"andinputpayload align with the implementation’s expectation.
230-238: LGTM: Mocked Message constructionMessage fields and
stop_reason="tool_use"look correct.
240-252: LGTM: AsyncAnthropic stream mocking is correctly wiredContext managers and
get_final_messageare patched appropriately.
806-807: LGTM: patch import is used

The local import is necessary for the subsequent mocking.
823-829: LGTM: ToolUseBlock for mixed-types caseMatches the handler’s expected tool name and payload.
848-860: LGTM: stream mocking mirrors first testAsync context and
get_final_messageare patched consistently.src/mcp_agent/workflows/llm/augmented_llm_azure.py (3)
5-5: LGTM: explicit Azure HTTP error importGood addition; it enables targeted handling for Azure SDK failures.
355-356: No action needed – strict parameter is supported
The project declares azure-ai-inference>=1.0.0b9 in pyproject.toml, and JsonSchemaFormat.strict was introduced in 1.0.0b7, so the parameter is available and will be honored.
367-368: Safe to drop**kwargsin Azure override
Confirmed no call sites or overrides ofAzureAugmentedLLM.convert_message_to_message_parampass keyword arguments (in both code and tests), so removing**kwargshere does not break existing callers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
tests/workflows/llm/test_augmented_llm_anthropic.py (1)
831-844: Good fix onUsageconstruction.Only supported token fields are used; avoids SDK
TypeError. This addresses the earlier review.
🧹 Nitpick comments (4)
tests/workflows/llm/test_augmented_llm_anthropic.py (4)
215-215: Movepatchimport to module top to avoid duplication.Keep imports centralized and reduce repetition across tests.
Apply this diff in the selected ranges:
- from unittest.mock import patch + # patch imported at module levelAlso add
patchto the top-level import:-from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patchAlso applies to: 806-806
254-256: Prefer instance method call over class method invocation.Call the bound coroutine directly for consistency with other tests.
- result = await AnthropicAugmentedLLM.generate_structured( - mock_llm, "Test query", TestResponseModel - ) + result = await mock_llm.generate_structured("Test query", TestResponseModel)
258-262: Assert request formation to lock in the native structured-output flow.Validate
AsyncAnthropicinit andmessages.streamkwargs (tools/tool_choice/messages).# Assertions assert isinstance(result, TestResponseModel) assert result.name == "Test" assert result.value == 42 + # Verify client init and request payload + client_kwargs = MockAsyncAnthropic.call_args.kwargs + assert client_kwargs["api_key"] == "test_key" + assert client_kwargs.get("base_url") is None + stream_kwargs = mock_client.messages.stream.call_args.kwargs + assert stream_kwargs["tool_choice"] == { + "type": "tool", + "name": "return_structured_output", + } + assert stream_kwargs["tools"][0]["name"] == "return_structured_output" + assert stream_kwargs["model"] == "claude-3-7-sonnet-latest" + assert stream_kwargs["messages"][0]["role"] == "user" + assert stream_kwargs["messages"][0]["content"] == "Test query"
859-865: Add minimal assertions for stream payload in mixed-types case.Ensures proper message conversion and tool forcing.
# Call generate_structured with mixed message types result = await mock_llm.generate_structured(messages, TestResponseModel) # Assertions assert isinstance(result, TestResponseModel) assert result.name == "MixedTypes" assert result.value == 123 + # Verify payload + stream_kwargs = mock_client.messages.stream.call_args.kwargs + assert stream_kwargs["tool_choice"]["name"] == "return_structured_output" + assert len(stream_kwargs["messages"]) == 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- tests/workflows/llm/test_augmented_llm_anthropic.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/workflows/llm/test_augmented_llm_anthropic.py (1)
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)
generate_structured(407-492)
🔇 Additional comments (3)
tests/workflows/llm/test_augmented_llm_anthropic.py (3)
222-238: Structured message construction looks correct.
ToolUseBlock name matches the forced tool and Message shape aligns with the Anthropic SDK.
823-829: Structured tool block LGTM.

Matches the forced-tool contract used by generate_structured.
846-858: Streaming mock wiring is sound.

Async context manager behavior is correctly simulated.
i.e. remove instructor dependency
Summary by CodeRabbit
New Features
Bug Fixes
Chores