diff --git a/ai_platform_engineering/multi_agents/platform_engineer/deep_agent.py b/ai_platform_engineering/multi_agents/platform_engineer/deep_agent.py
index 1a5185ece..e77cbdc96 100644
--- a/ai_platform_engineering/multi_agents/platform_engineer/deep_agent.py
+++ b/ai_platform_engineering/multi_agents/platform_engineer/deep_agent.py
@@ -9,15 +9,18 @@
 import time
 import httpx
 import traceback
+import json
+
 from langchain_core.messages import AIMessage
+from langchain_core.tools import tool
 from langgraph.graph.state import CompiledStateGraph
 from langgraph.checkpoint.memory import InMemorySaver
 from cnoe_agent_utils import LLMFactory
 from langchain_mcp_adapters.client import MultiServerMCPClient
 from typing import Optional, Dict, Any, List
-
 from ai_platform_engineering.multi_agents.platform_engineer import platform_registry
+from ai_platform_engineering.multi_agents.platform_engineer.response_format import PlatformEngineerResponse
 from ai_platform_engineering.multi_agents.platform_engineer.prompts import agent_prompts, generate_system_prompt
 from ai_platform_engineering.multi_agents.tools import (
     reflect_on_output,
@@ -48,12 +51,24 @@
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)

+# Explicit ResponseFormat tool so Bedrock sees it in the tool list
+@tool("ResponseFormat", args_schema=PlatformEngineerResponse)
+def response_format_tool(**kwargs):
+    """Return the final structured response serialized as JSON."""
+    try:
+        return json.dumps(kwargs)
+    except Exception:
+        return str(kwargs)
+
 # RAG Configuration
-ENABLE_RAG = os.getenv("ENABLE_RAG", "false").lower() in ("true", "1", "yes")
+ENABLE_RAG = os.getenv("ENABLE_RAG", "false").lower() == "true"
 RAG_SERVER_URL = os.getenv("RAG_SERVER_URL", "http://localhost:9446").strip("/")
 RAG_CONNECTIVITY_RETRIES = 5
 RAG_CONNECTIVITY_WAIT_SECONDS = 10

+# Structured Response Configuration
+# When enabled, the LLM uses the ResponseFormat tool for final answers instead of the [FINAL ANSWER] marker
+USE_STRUCTURED_RESPONSE = os.getenv("USE_STRUCTURED_RESPONSE", "false").lower() == "true"
+
 class AIPlatformEngineerMAS:
     def __init__(self):
         # Use existing platform_registry and enable dynamic monitoring
@@ -248,7 +263,7 @@ def _build_graph(self) -> None:
             logger.error(f"Error during RAG setup: {e}")
             self.rag_enabled = False

-        system_prompt = generate_system_prompt(current_agents, self.rag_config)
+        system_prompt = generate_system_prompt(current_agents, self.rag_config, USE_STRUCTURED_RESPONSE)

         logger.info(f"📝 Generated system prompt: {len(system_prompt)} chars")

@@ -281,6 +296,13 @@ def _build_graph(self) -> None:
             list_files,  # list_files("/tmp/repo", pattern="*.yaml")
         ]

+        # Add ResponseFormat tool only when structured response mode is enabled
+        if USE_STRUCTURED_RESPONSE:
+            all_tools.append(response_format_tool)
+            logger.info("✅ Structured response mode enabled - added ResponseFormat tool")
+        else:
+            logger.info("❌ Structured response mode disabled - ResponseFormat tool not added; using [FINAL ANSWER] marker from prompt config")
+
         # Add RAG tools if initially loaded
         if self.rag_tools:
             all_tools.extend(self.rag_tools)
@@ -303,14 +325,36 @@ def _build_graph(self) -> None:

         logger.info("🎨 Creating deep agent with system prompt")

-        deep_agent = async_create_deep_agent(
-            tools=all_tools,  # A2A tools + RAG tools + reflect_on_output for validation
-            instructions=system_prompt,  # System prompt enforces TODO-based execution workflow
-            subagents=subagents,  # CustomSubAgents for proper task() delegation
-            model=base_model,
-            # response_format=PlatformEngineerResponse # Removed: Causes embedded JSON in streaming output
-            # Sub-agent DataParts (like Jarvis forms) still work - they're forwarded independently
-        )
+        # Response format instruction tells the LLM how to use the ResponseFormat tool
+        # Only used when USE_STRUCTURED_RESPONSE is enabled
+        if USE_STRUCTURED_RESPONSE:
+            response_format_instruction = (
+                "CRITICAL: You MUST call the ResponseFormat tool for EVERY response - including greetings, simple questions, and informational queries. "
+                "This is NON-NEGOTIABLE. Never output the final answer as plain text. "
+                "When you are ready to provide ANY answer (simple or complex), call ResponseFormat directly. "
+                "Do NOT output the answer as text before calling the tool - put it ONLY in the tool's 'content' field. "
+                "Normal streaming (tool calls, planning, intermediate outputs) is fine, but the FINAL answer must go through ResponseFormat. "
+                "Place the final user-facing answer (clean markdown, no thinking/preamble) in the 'content' field. "
+                "Set 'is_task_complete' to true when done, false otherwise. "
+                "Set 'require_user_input' to true only when you need more information from the user."
+            )
+        else:
+            # Unstructured mode: rely on [FINAL ANSWER] marker in prompt config
+            response_format_instruction = None
+
+        # Build deep agent kwargs - only include response_format when structured mode is enabled
+        deep_agent_kwargs = {
+            "tools": all_tools,  # A2A tools + RAG tools + reflect_on_output for validation
+            "instructions": system_prompt,  # System prompt enforces TODO-based execution workflow
+            "subagents": subagents,  # CustomSubAgents for proper task() delegation
+            "model": base_model,
+        }
+
+        # Add response_format only when structured response mode is enabled
+        if USE_STRUCTURED_RESPONSE and response_format_instruction:
+            deep_agent_kwargs["response_format"] = (response_format_instruction, PlatformEngineerResponse)
+
+        deep_agent = async_create_deep_agent(**deep_agent_kwargs)

         # Check if LANGGRAPH_DEV is defined in the environment
         if os.getenv("LANGGRAPH_DEV"):
diff --git a/ai_platform_engineering/multi_agents/platform_engineer/prompts.py b/ai_platform_engineering/multi_agents/platform_engineer/prompts.py
index 61d64361b..af8be6ce5 100644
--- a/ai_platform_engineering/multi_agents/platform_engineer/prompts.py
+++ b/ai_platform_engineering/multi_agents/platform_engineer/prompts.py
@@ -9,6 +9,25 @@
 import logging

 logger = logging.getLogger(__name__)

+# [FINAL ANSWER] marker section - only included when structured response is disabled
+# This tells the LLM to use the marker-based approach for final answers
+FINAL_ANSWER_MARKER_SECTION = """
+**OUTPUT FORMAT - MANDATORY [FINAL ANSWER] Marker:**
+- EVERY response to the user MUST start with `[FINAL ANSWER]` on its own line
+- This marker separates your internal thinking/planning from the user-facing answer
+- Content BEFORE `[FINAL ANSWER]` = hidden (thinking, tool calls, search messages)
+- Content AFTER `[FINAL ANSWER]` = shown to user (the actual answer)
+- Example format:
+  ```
+  I'll search the knowledge base...
+  🔍 search...
+  [FINAL ANSWER]
+  ## Your Actual Answer Here
+  The information you requested is...
+  ```
+- NEVER include "I'll search...", "Let me...", "🔍 search..." AFTER the marker
+"""
+
 # ============================================================================
 # Load YAML config
 def load_prompt_config(path="prompt_config.yaml"):
@@ -74,12 +93,14 @@ def load_prompt_config(path="prompt_config.yaml"):
 # This allows CustomSubAgents to be created with proper react agent graphs

 # Generate system prompt dynamically based on tools and their tasks
-def generate_system_prompt(agents: Dict[str, Any], rag_config: Optional[Dict[str, Any]] = None):
+def generate_system_prompt(agents: Dict[str, Any], rag_config: Optional[Dict[str, Any]] = None, use_structured_response: bool = False):
     """
     Generate system prompt with static RAG tools.

     Args:
         agents: Dictionary of available agents with their descriptions
+        rag_config: Optional RAG configuration
+        use_structured_response: If True, exclude [FINAL ANSWER] marker section (use tool-based structured response instead)

     Returns:
         System prompt string
@@ -142,10 +163,20 @@ def generate_system_prompt(agents: Dict[str, Any], rag_config: Optional[Dict[str
     logger.debug(f"System Prompt Template: {yaml_template}")
     logger.debug(f"Tool Instructions: {tool_instructions_str}")

+    # Conditionally include [FINAL ANSWER] marker section based on structured response mode
+    # When structured response is enabled, we use the ResponseFormat tool instead of markers
+    final_answer_instructions = "" if use_structured_response else FINAL_ANSWER_MARKER_SECTION
+
+    if use_structured_response:
+        logger.info("Structured response mode enabled - excluding [FINAL ANSWER] marker section from prompt")
+    else:
+        logger.info("Unstructured response mode - including [FINAL ANSWER] marker section in prompt")
+
     if yaml_template:
         return yaml_template.format(
             rag_instructions=rag_instructions,
-            tool_instructions=tool_instructions_str
+            tool_instructions=tool_instructions_str,
+            final_answer_instructions=final_answer_instructions
         )
     else:
         return f"""
@@ -155,6 +186,7 @@ def generate_system_prompt(agents: Dict[str, Any], rag_config: Optional[Dict[str
 - Only respond to requests related to the integrated tools. Always call the appropriate agent or tool.
 - When responding, use markdown format. Make sure all URLs are presented as clickable links.
+{final_answer_instructions}
 {tool_instructions_str}
 """
diff --git a/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py b/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py
index 3bec409ad..1bf3c0225 100644
--- a/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py
+++ b/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py
@@ -5,6 +5,7 @@
 import json
 import logging
 import os
+import re
 from collections.abc import AsyncIterable
 from typing import Any

@@ -17,6 +18,7 @@
 )
 from ai_platform_engineering.multi_agents.platform_engineer.deep_agent import (
     AIPlatformEngineerMAS,
+    USE_STRUCTURED_RESPONSE,
 )
 from ai_platform_engineering.multi_agents.platform_engineer.prompts import (
     system_prompt
@@ -30,8 +32,9 @@
 )
 from langchain_core.messages import AIMessage, AIMessageChunk, ToolMessage

-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-logger = logging.getLogger(__name__)
+_log_level = os.getenv("LOG_LEVEL", "INFO").upper()
+logging.basicConfig(level=getattr(logging, _log_level, logging.INFO), format='%(asctime)s - %(levelname)s - %(message)s')
+

 class AIPlatformEngineerA2ABinding:
     """
@@ -264,6 +267,11 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
         accumulated_ai_content = []
         final_ai_message = None

+        # Track ResponseFormat tool call for structured response mode
+        response_format_content = None
+        response_format_args = None  # Complete tool args when captured
+        response_format_streaming = False  # True when we're streaming ResponseFormat args
+
         # Track sub-agent responses for fallback if synthesis fails
         # Format: {tool_name: response_content}
         accumulated_subagent_responses = {}
@@ -331,10 +339,40 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
             if has_tool_calls:
                 logging.debug(f"Message with tool_calls detected: type={type(message).__name__}, tool_calls={message.tool_calls}")

+            # BEDROCK STREAMING: Accumulate tool args from tool_call_chunks
+            # Bedrock streams tool args as partial JSON strings in tool_call_chunks[].args
+            if hasattr(message, "tool_call_chunks") and message.tool_call_chunks:
+                for chunk in message.tool_call_chunks:
+                    chunk_name = chunk.get("name", "")
+                    chunk_args = chunk.get("args", "")
+
+                    # If this chunk has the tool name, track that we're in a ResponseFormat call
+                    if chunk_name and chunk_name.lower() in ('responseformat', 'platformengineerresponse'):
+                        response_format_streaming = True
+                        if response_format_args is None:
+                            response_format_args = {"_partial_json": ""}
+                        logging.info("🎯 BEDROCK: ResponseFormat tool streaming started")
+
+                    # Accumulate args string (Bedrock streams JSON incrementally)
+                    # Once we're streaming ResponseFormat, accumulate all args chunks
+                    if chunk_args and response_format_streaming:
+                        if response_format_args is None:
+                            response_format_args = {"_partial_json": ""}
+                        if "_partial_json" not in response_format_args:
+                            response_format_args["_partial_json"] = ""
+                        response_format_args["_partial_json"] += chunk_args
+
             # Stream LLM tokens (includes execution plans and responses)
             if isinstance(message, AIMessageChunk):
+                # BEDROCK DEBUG: Check additional_kwargs for tool use data
+                if hasattr(message, "additional_kwargs") and message.additional_kwargs:
+                    add_kwargs = message.additional_kwargs
+                    if "tool_use" in add_kwargs or "toolUse" in add_kwargs:
+                        tool_use_data = add_kwargs.get("tool_use") or add_kwargs.get("toolUse")
+                        logging.info(f"🔍 BEDROCK: Found tool_use in additional_kwargs: {str(tool_use_data)[:500]}")
+
                 # Check if this chunk has tool_calls (tool invocation)
-                if hasattr(message, "tool_calls") and message.tool_calls:
+                if has_tool_calls:
                     # This is a tool call chunk - emit tool start notifications
                     for tool_call in message.tool_calls:
                         tool_name = tool_call.get("name", "")
@@ -345,19 +383,79 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                         logging.debug(f"Tool call started (from AIMessageChunk): {tool_name}")

+                        # Agent returned the final structured response
+                        if tool_name.lower() in ('responseformat', 'platformengineerresponse'):
+                            tool_args = tool_call.get("args", {})
+
+                            # CRITICAL: Always accumulate/update tool args as they stream in
+                            # Bedrock streams tool args incrementally, so we keep the latest version
+                            if tool_args:
+                                # Merge new args into existing (in case they're incremental)
+                                if response_format_args is None:
+                                    response_format_args = {}
+                                response_format_args.update(tool_args)
+                                logging.info(f"📝 AIMessageChunk: Updated ResponseFormat args, keys={list(response_format_args.keys())}")
+
+                                # Extract 'content' field which contains the actual response
+                                structured_content = tool_args.get("content", "") or tool_args.get("message", "") or tool_args.get("response", "")
+                                if structured_content:
+                                    response_format_content = structured_content
+                                    logging.info(f"📝 SUPERVISOR AIMessageChunk: Captured ResponseFormat content: {len(response_format_content)} chars")
+
+                                    # When structured response mode is enabled, yield completion event directly
+                                    if USE_STRUCTURED_RESPONSE:
+                                        logging.info("🎯 Structured response mode: yielding completion from AIMessageChunk ResponseFormat tool")
+                                        yield {
+                                            "is_task_complete": tool_args.get("is_task_complete", True),
+                                            "require_user_input": tool_args.get("require_user_input", False),
+                                            "content": structured_content,
+                                            "metadata": tool_args.get("metadata"),
+                                            "from_response_format_tool": True
+                                        }
+                                        continue  # Skip tool notification, already handled
+                            else:
+                                # Args are empty or content not yet available - in structured response mode,
+                                # skip and wait for complete args (they'll be captured in response_format_args)
+                                if USE_STRUCTURED_RESPONSE:
+                                    logging.debug(f"📝 Structured response mode: ResponseFormat content empty, waiting (accumulated args keys: {list(response_format_args.keys()) if response_format_args else 'none'})")
+                                    continue  # Skip notification, wait for complete tool call
+
                         # Stream tool start notification to client with metadata
-                        tool_name_formatted = tool_name.title()
-                        yield {
-                            "is_task_complete": False,
-                            "require_user_input": False,
-                            "content": f"🔧 Supervisor: Calling Agent {tool_name_formatted}...\n",
-                            "tool_call": {
-                                "name": tool_name,
-                                "status": "started",
-                                "type": "notification"
+                        # But ONLY if we haven't already yielded the completion
+                        if not (USE_STRUCTURED_RESPONSE and response_format_content):
+                            tool_name_formatted = tool_name.title()
+                            yield {
+                                "is_task_complete": False,
+                                "require_user_input": False,
+                                "content": f"🔧 Supervisor: Calling Agent {tool_name_formatted}...\n",
+                                "tool_call": {
+                                    "name": tool_name,
+                                    "status": "started",
+                                    "type": "notification"
+                                }
                             }
-                        }
-                    # Don't process content for tool call chunks
+
+                    # CRITICAL: Before skipping content processing, check for tool_use blocks in content
+                    # Bedrock puts complete tool args in content[].input, not in tool_calls[].args during streaming
+                    msg_content = message.content
+                    if isinstance(msg_content, list):
+                        for item in msg_content:
+                            if isinstance(item, dict) and item.get('type') == 'tool_use':
+                                tool_name = item.get('name', '')
+                                tool_input = item.get('input', {})
+                                if tool_name.lower() in ('responseformat', 'platformengineerresponse') and tool_input:
+                                    logging.info(f"🎯 AIMessageChunk: Found tool_use in content! tool={tool_name}, input_keys={list(tool_input.keys())}")
+                                    response_format_args = tool_input
+                                    response_format_content = tool_input.get("content", "") or tool_input.get("message", "")
+                                    if response_format_content and USE_STRUCTURED_RESPONSE:
+                                        logging.info(f"🎯 AIMessageChunk: Yielding completion from tool_use block ({len(response_format_content)} chars)")
+                                        yield {
+                                            "is_task_complete": tool_input.get("is_task_complete", True),
+                                            "require_user_input": tool_input.get("require_user_input", False),
+                                            "content": response_format_content,
+                                            "metadata": tool_input.get("metadata"),
+                                            "from_response_format_tool": True
+                                        }
                     continue

                 content = message.content
@@ -366,7 +464,28 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                 if isinstance(content, list):
                     text_parts = []
                     for item in content:
                         if isinstance(item, dict):
-                            text_parts.append(item.get('text', ''))
+                            # CRITICAL: Check for tool_use blocks in content (Bedrock format)
+                            # This is where Bedrock puts the complete tool args!
+                            item_type = item.get('type', '')
+                            if item_type == 'tool_use':
+                                tool_name = item.get('name', '')
+                                tool_input = item.get('input', {})
+                                if tool_name.lower() in ('responseformat', 'platformengineerresponse') and tool_input:
+                                    logging.info(f"🎯 BEDROCK: Found tool_use in content! tool={tool_name}, input_keys={list(tool_input.keys())}")
+                                    response_format_args = tool_input
+                                    response_format_content = tool_input.get("content", "") or tool_input.get("message", "")
+                                    if response_format_content and USE_STRUCTURED_RESPONSE:
+                                        logging.info(f"🎯 BEDROCK: Yielding completion from content tool_use block ({len(response_format_content)} chars)")
+                                        yield {
+                                            "is_task_complete": tool_input.get("is_task_complete", True),
+                                            "require_user_input": tool_input.get("require_user_input", False),
+                                            "content": response_format_content,
+                                            "metadata": tool_input.get("metadata"),
+                                            "from_response_format_tool": True
+                                        }
+                                continue  # Skip normal content processing
+                            else:
+                                text_parts.append(item.get('text', ''))
                         elif isinstance(item, str):
                             text_parts.append(item)
                         else:
@@ -410,7 +529,7 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                 }

             # Handle AIMessage with tool calls (tool start indicators)
-            elif isinstance(message, AIMessage) and hasattr(message, "tool_calls") and message.tool_calls:
+            elif isinstance(message, AIMessage) and has_tool_calls:
                 for tool_call in message.tool_calls:
                     tool_name = tool_call.get("name", "")
                     tool_call_id = tool_call.get("id", "")
@@ -427,6 +546,50 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                     logging.info(f"Tool call started: {tool_name}")

+                    # CRITICAL: Capture ResponseFormat content from AIMessage
+                    # This is the DETERMINISTIC way to get the final response
+                    # ResponseFormat tool contains the structured final output
+                    # Note: Tool is defined as @tool("ResponseFormat") but Bedrock returns the schema name "PlatformEngineerResponse"
+                    if tool_name.lower() in ('responseformat', 'platformengineerresponse'):
+                        tool_args = tool_call.get("args", {})
+                        logging.info(f"🎯 AIMessage ResponseFormat detected! tool_name={tool_name}, args_keys={list(tool_args.keys()) if tool_args else 'empty'}")
+                        # Extract 'content' or 'message' field which contains the actual response
+                        structured_content = tool_args.get("content", "") or tool_args.get("message", "") or tool_args.get("response", "")
+                        if structured_content:
+                            response_format_content = structured_content
+                            response_format_args = tool_args  # Save complete args for final yield
+                            logging.info(f"🎯 AIMessage ResponseFormat: Captured content ({len(response_format_content)} chars)")
+                            logging.info(f"🎯 AIMessage ResponseFormat content preview: {response_format_content[:300]}")
+
+                            # When structured response mode is enabled, yield completion event directly
+                            if USE_STRUCTURED_RESPONSE:
+                                logging.info("🎯 Structured response mode: yielding completion from AIMessage ResponseFormat tool")
+                                yield {
+                                    "is_task_complete": tool_args.get("is_task_complete", True),
+                                    "require_user_input": tool_args.get("require_user_input", False),
+                                    "content": structured_content,
+                                    "metadata": tool_args.get("metadata"),
+                                    "from_response_format_tool": True
+                                }
+                                continue  # Skip tool notification, already handled
+                        else:
+                            # Fallback: try to get any string value from args
+                            for key, val in tool_args.items():
+                                if isinstance(val, str) and len(val) > 10:
+                                    response_format_content = val
+                                    response_format_args = tool_args  # Save args
+                                    logging.info(f"📝 SUPERVISOR: Captured ResponseFormat '{key}' field: {len(response_format_content)} chars")
+                                    break
+                            if not response_format_content and tool_args:
+                                try:
+                                    response_format_content = json.dumps(tool_args)
+                                    response_format_args = tool_args  # Save args
+                                    logging.info(f"📝 SUPERVISOR: Captured ResponseFormat args as JSON: {len(response_format_content)} chars")
+                                except Exception:
+                                    response_format_content = str(tool_args)
+                                    response_format_args = tool_args  # Save args
+
                     # Stream tool start notification to client with metadata
                     tool_name_formatted = tool_name.title()
                     yield {
@@ -466,7 +629,7 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                 pending_tool_calls.pop(tool_call_id)
                 logging.debug(f"Resolved tool call: {tool_call_id} -> {tool_name}")

-            logging.debug(f"Tool call completed: {tool_name} (content: {len(tool_content)} chars)")
+            logging.debug(f"ToolMessage received: tool_name={tool_name}, content_len={len(tool_content)}")

             # Track sub-agent responses for fallback if synthesis fails
             # Only track significant responses (sub-agent tools like 'task', agent names)
@@ -485,6 +648,38 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                 'graph_raw_query_data', 'graph_raw_query_ontology'
             }

+            # CRITICAL: Handle ResponseFormat tool in structured response mode
+            # The tool returns JSON with the structured response fields
+            if USE_STRUCTURED_RESPONSE and tool_name.lower() in ('responseformat', 'platformengineerresponse'):
+                logging.info(f"ResponseFormat ToolMessage detected: tool_name={tool_name}, content_len={len(tool_content)}")
+                logging.debug(f"ResponseFormat tool_content preview: {tool_content[:500] if tool_content else 'EMPTY'}")
+                try:
+                    # Parse the JSON result from the tool
+                    tool_result = json.loads(tool_content) if tool_content else {}
+                    logging.debug(f"Parsed tool_result keys: {list(tool_result.keys())}")
+                    structured_content = tool_result.get("content", "") or tool_result.get("message", "") or tool_result.get("response", "")
+                    logging.debug(f"structured_content len={len(structured_content) if structured_content else 0}")
+                    if structured_content:
+                        # Save for final yield fallback
+                        response_format_args = tool_result
+                        response_format_content = structured_content
+                        is_task_complete_val = tool_result.get("is_task_complete", True)
+                        require_user_input_val = tool_result.get("require_user_input", False)
+                        logging.info(f"ResponseFormat tool: yielding completion (is_task_complete={is_task_complete_val}, require_user_input={require_user_input_val})")
+                        yield {
+                            "is_task_complete": is_task_complete_val,
+                            "require_user_input": require_user_input_val,
+                            "content": structured_content,
+                            "metadata": tool_result.get("metadata"),
+                            "from_response_format_tool": True
+                        }
+                        continue  # Skip normal tool completion handling
+                    else:
+                        logging.warning(f"ResponseFormat tool result has no content: {tool_result}")
+                except json.JSONDecodeError as e:
+                    logging.warning(f"Failed to parse ResponseFormat result as JSON: {e}, content was: {tool_content[:200] if tool_content else 'EMPTY'}")
+                    # Fall through to normal handling
+
             # Special handling for write_todos: execution plan vs status updates
             if tool_name == "write_todos" and tool_content and tool_content.strip():
                 if not self._execution_plan_sent:
@@ -889,12 +1084,29 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                 and getattr(message, "tool_calls", None)
                 and len(message.tool_calls) > 0
             ):
-                logging.debug("Detected AIMessage with tool calls, yielding")
-                yield {
-                    "is_task_complete": False,
-                    "require_user_input": False,
-                    "content": "",
-                }
+                # Check for ResponseFormat tool in fallback stream
+                for tool_call in message.tool_calls:
+                    tool_name = tool_call.get("name", "")
+                    if tool_name.lower() in ('responseformat', 'platformengineerresponse'):
+                        tool_args = tool_call.get("args", {})
+                        structured_content = tool_args.get("content", "") or tool_args.get("message", "") or tool_args.get("response", "")
+                        if structured_content and USE_STRUCTURED_RESPONSE:
+                            logging.info("🎯 Fallback stream: Structured response mode - yielding completion from ResponseFormat tool")
+                            yield {
+                                "is_task_complete": tool_args.get("is_task_complete", True),
+                                "require_user_input": tool_args.get("require_user_input", False),
+                                "content": structured_content,
+                                "metadata": tool_args.get("metadata"),
+                                "from_response_format_tool": True
+                            }
+                            break
+                else:
+                    logging.debug("Detected AIMessage with tool calls, yielding")
+                    yield {
+                        "is_task_complete": False,
+                        "require_user_input": False,
+                        "content": "",
+                    }
             elif isinstance(message, ToolMessage):
                 # Stream ToolMessage content (includes formatted TODO lists)
                 tool_content = message.content if hasattr(message, 'content') else ""
@@ -955,10 +1167,39 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
                 final_ai_message = message

         # After EITHER primary or fallback streaming completes, parse the final response to extract is_task_complete
-        logging.info(f"🔍 POST-STREAM PARSING: final_ai_message={final_ai_message is not None}, accumulated_chunks={len(accumulated_ai_content)}")
-
-        # Try to use final_ai_message first, otherwise use accumulated content
-        if final_ai_message:
+        logging.info(f"🔍 POST-STREAM PARSING: final_ai_message={final_ai_message is not None}, accumulated_chunks={len(accumulated_ai_content)}, response_format_args={response_format_args is not None}")
+
+        # PRIORITY 1: If we captured ResponseFormat tool args during streaming, use them directly
+        # This is the most reliable way to get structured response in structured response mode
+        if USE_STRUCTURED_RESPONSE and response_format_args:
+            logging.info(f"🎯 POST-STREAM: Using captured ResponseFormat tool args for completion, keys={list(response_format_args.keys())}")
+
+            # Handle partial JSON that was accumulated from tool_call_chunks (Bedrock streaming)
+            if "_partial_json" in response_format_args and response_format_args["_partial_json"]:
+                partial_str = response_format_args["_partial_json"]
+                logging.info(f"🎯 POST-STREAM: Parsing accumulated tool_call_chunks JSON ({len(partial_str)} chars)")
+                logging.debug(f"🎯 POST-STREAM: Partial JSON preview: {partial_str[:500]}...")
+                try:
+                    parsed = json.loads(partial_str)
+                    if isinstance(parsed, dict):
+                        response_format_args.update(parsed)
+                        del response_format_args["_partial_json"]  # Clean up
+                        logging.info(f"🎯 POST-STREAM: Parsed partial JSON successfully! keys={list(response_format_args.keys())}")
+                except json.JSONDecodeError as e:
+                    logging.warning(f"🎯 POST-STREAM: Failed to parse partial JSON: {e}, content: {partial_str[:200]}...")
+
+            structured_content = response_format_args.get("content", "") or response_format_args.get("message", "") or response_format_args.get("response", "")
+            final_response = {
+                'is_task_complete': response_format_args.get("is_task_complete", True),
+                'require_user_input': response_format_args.get("require_user_input", False),
+                'content': structured_content,
+                'metadata': response_format_args.get("metadata"),
+                'from_response_format_tool': True
+            }
+            logging.info(f"🎯 POST-STREAM: ResponseFormat response: is_task_complete={final_response['is_task_complete']}, content_len={len(structured_content) if structured_content else 0}")
+        # PRIORITY 2: Try to use final_ai_message first, otherwise use accumulated content
+        elif final_ai_message:
             logging.info("✅ Using final AIMessage for structured response parsing")
             # Extract content from AIMessage
             final_content = final_ai_message.content if hasattr(final_ai_message, 'content') else str(final_ai_message)
@@ -1003,9 +1244,17 @@ async def stream(self, query, context_id, trace_id=None) -> AsyncIterable[dict[s
             # Sending it again in the final response would cause duplication.
             #
             # Solution: Clear 'content' from final_response when in streaming mode.
+            # EXCEPTION 1: If content came from ResponseFormat tool, it's the REAL structured content
+            # EXCEPTION 2: If is_task_complete=True (from [FINAL ANSWER] marker), the content is the final answer
+            # In both cases the content should NOT be cleared (the accumulated chunks were just the LLM "thinking" text)
             if accumulated_ai_content and len(accumulated_ai_content) > 1:
-                logging.info(f"⏭️ Clearing content from final response - already streamed {len(accumulated_ai_content)} chunks")
-                final_response['content'] = ''
+                if final_response.get('from_response_format_tool'):
+                    logging.info(f"✅ Keeping content from ResponseFormat tool (not clearing despite {len(accumulated_ai_content)} streamed chunks)")
+                elif final_response.get('is_task_complete'):
+                    logging.info(f"✅ Keeping content - task is complete with [FINAL ANSWER] (not clearing despite {len(accumulated_ai_content)} streamed chunks)")
+                else:
+                    logging.info(f"⏭️ Clearing content from final response - already streamed {len(accumulated_ai_content)} chunks")
+                    final_response['content'] = ''

         logging.info(f"🚀 YIELDING FINAL RESPONSE: is_task_complete={final_response.get('is_task_complete')}, require_user_input={final_response.get('require_user_input')}, content_length={len(final_response.get('content', ''))}")
         yield final_response
@@ -1085,7 +1334,19 @@ def handle_structured_response(self, ai_message):
         content = ai_message if isinstance(ai_message, str) else str(ai_message)

         # Log the raw content for debugging
-        logging.info(f"Raw LLM content (fallback handling): {repr(content)}")
+
+        # [FINAL ANSWER] marker check (used when USE_STRUCTURED_RESPONSE=false): the marker indicates
+        # the task is complete and the content after it is the final answer.
+        # Currently disabled here; kept for reference:
+        # final_answer_marker = "[FINAL ANSWER]"
+        # if final_answer_marker in content:
+        #     marker_pos = content.find(final_answer_marker)
+        #     final_content = content[marker_pos + len(final_answer_marker):].strip()
+        #     logging.info(f"✅ Found [FINAL ANSWER] marker at position {marker_pos}, extracted {len(final_content)} chars")
+        #     return {
+        #         'is_task_complete': True,
+        #         'require_user_input': False,
+        #         'content': final_content,
+        #     }

         # Strip markdown code block formatting if present
         if content.startswith('```json') and content.endswith('```'):
@@ -1095,7 +1356,6 @@ def handle_structured_response(self, ai_message):
             content = content[3:-3].strip()  # Remove ``` at start and end
             logging.info("Stripped ``` formatting")

-        logging.info(f"Content after stripping: {repr(content)}")

         # If content doesn't look like JSON, treat it as a working text update
         if not (content.startswith('{') or content.startswith('[')):
diff --git a/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent_executor.py b/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent_executor.py
index 00b69c224..12e18cc29 100644
--- a/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent_executor.py
+++ b/ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent_executor.py
@@ -353,11 +353,17 @@ async def _handle_sub_agent_artifact(self, event: dict, state: StreamState,
     async def _handle_task_complete(self, event: dict, state: StreamState, content: str, task: A2ATask, event_queue: EventQueue):
         """Handle task completion event."""
-        final_content, is_datapart = self._get_final_content(state)
-
-        # Fall back to event content if nothing accumulated
-        if not final_content and not is_datapart:
+        # If event came from ResponseFormat tool (structured response mode), use content directly since it's already clean
+        if event.get('from_response_format_tool'):
+            logger.info("Using content directly from ResponseFormat tool (structured response mode)")
             final_content = content
+            is_datapart = False
+        else:
+            final_content, is_datapart = self._get_final_content(state)
+
+            # Fall back to event content if nothing accumulated
+            if not final_content and not is_datapart:
+                final_content = content

         # Create appropriate artifact
         if is_datapart:
diff --git a/charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml b/charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml
index 7f6b7d4cd..caaced6fa 100644
--- a/charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml
+++ b/charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml
@@ -37,20 +37,7 @@ system_prompt_template: |
   - NEVER reveal workspace operations (clear_workspace, write_workspace_file, list_workspace_files) or their results
   - Only present final synthesized results - never show intermediate workspace state or tool internals

-  **OUTPUT FORMAT - MANDATORY [FINAL ANSWER] Marker:**
-  - EVERY response to the user MUST start with `[FINAL ANSWER]` on its own line
-  - This marker separates your internal thinking/planning from the user-facing answer
-  - Content BEFORE `[FINAL ANSWER]` = hidden (thinking, tool calls, search messages)
-  - Content AFTER `[FINAL ANSWER]` = shown to user (the actual answer)
-  - Example format:
-    ```
-    I'll search the knowledge base...
-    🔍 search...
-    [FINAL ANSWER]
-    ## Your Actual Answer Here
-    The information you requested is...
-    ```
-  - NEVER include "I'll search...", "Let me...", "🔍 search..." AFTER the marker
+  {final_answer_instructions}

   ## AVAILABLE TOOLS
@@ -207,7 +194,6 @@ system_prompt_template: |
   - If some agents failed: still synthesize successful results and clearly mark failures with ❌
   - Add source footer: `_Sources: Agent1, Agent2_` (only include successful agents)
   - Mark final TODO as completed
-  - **MANDATORY - [FINAL ANSWER] Marker**: Start your final synthesized response with `[FINAL ANSWER]` on its own line. This marker separates thinking/planning from the user-facing answer. Everything AFTER this marker is shown to the user; everything BEFORE is hidden.

   **CRITICAL - Report Generation**: When user requests "generate report" or similar, DO NOT truncate - include ALL data, details, tables, and information. Reports must be complete and comprehensive.
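
Reviewer note on the Bedrock streaming quirk this patch works around: Bedrock delivers tool arguments as partial JSON string fragments in `tool_call_chunks[].args`, which is why `stream()` accumulates them under a `_partial_json` key and only parses them after streaming ends. A minimal, self-contained sketch of that accumulate-then-parse pattern, with hypothetical chunk data and simplified variable names (not code from this PR):

```python
import json

# Hypothetical Bedrock-style stream: the tool name arrives once, then the
# JSON arguments arrive as partial string fragments across several chunks.
chunks = [
    {"name": "ResponseFormat", "args": ""},
    {"name": "", "args": '{"content": "## Done\\n'},
    {"name": "", "args": 'All pods are healthy.", "is_task_complete": true, '},
    {"name": "", "args": '"require_user_input": false}'},
]

streaming_response_format = False
partial_json = ""

for chunk in chunks:
    # The chunk carrying the tool name marks the start of the ResponseFormat call
    if chunk["name"].lower() in ("responseformat", "platformengineerresponse"):
        streaming_response_format = True
    # After that, every args fragment belongs to the same tool call
    if streaming_response_format and chunk["args"]:
        partial_json += chunk["args"]

# The accumulated string is only valid JSON once the stream has ended
try:
    args = json.loads(partial_json)
except json.JSONDecodeError:
    args = {}  # stream was cut short; fall back to the other capture paths

print(args.get("content"))           # "## Done\nAll pods are healthy."
print(args.get("is_task_complete"))  # True
```

Until the final fragment arrives, `json.loads` fails on the accumulated string, which is why the patch defers parsing to the post-stream phase rather than attempting it per chunk.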