diff --git a/src/backend/af/magentic_agents/common/lifecycle.py b/src/backend/af/magentic_agents/common/lifecycle.py index fb23f3fb..00648206 100644 --- a/src/backend/af/magentic_agents/common/lifecycle.py +++ b/src/backend/af/magentic_agents/common/lifecycle.py @@ -21,7 +21,7 @@ ToolMode, ToolProtocol, ) -from agent_framework import HostedMCPTool +from agent_framework import MCPStreamableHTTPTool from af.magentic_agents.models.agent_models import MCPConfig from af.config.agent_registry import agent_registry @@ -86,17 +86,18 @@ async def _after_open(self) -> None: """Subclasses must build self._agent here.""" raise NotImplementedError - def _prepare_mcp_tool(self) -> None: + async def _prepare_mcp_tool(self) -> None: - """Translate MCPConfig to a HostedMCPTool (agent_framework construct).""" + """Translate MCPConfig to an MCPStreamableHTTPTool (agent_framework construct).""" if not self.mcp_cfg: return try: - self.mcp_tool = HostedMCPTool( + mcp_tool = MCPStreamableHTTPTool( name=self.mcp_cfg.name, description=self.mcp_cfg.description, - server_label=self.mcp_cfg.name.replace(" ", "_"), - url="", # URL will be resolved via MCPConfig in HostedMCPTool + url=self.mcp_cfg.url ) + await self._stack.enter_async_context(mcp_tool) + self.mcp_tool = mcp_tool # Store for later use except Exception: # noqa: BLE001 self.mcp_tool = None @@ -145,7 +146,7 @@ async def open(self) -> "AzureAgentBase": await self._stack.enter_async_context(self.client) # Prepare MCP - self._prepare_mcp_tool() + await self._prepare_mcp_tool() # Let subclass build agent client await self._after_open() diff --git a/src/backend/af/magentic_agents/foundry_agent.py b/src/backend/af/magentic_agents/foundry_agent.py index 6cde844e..a8d70bd8 100644 --- a/src/backend/af/magentic_agents/foundry_agent.py +++ b/src/backend/af/magentic_agents/foundry_agent.py @@ -3,33 +3,18 @@ import logging from typing import List, Optional -from azure.ai.agents.models import ( - Agent, - AzureAISearchTool, - CodeInterpreterToolDefinition, -) - from agent_framework import ( + ChatAgent, ChatMessage, Role, - 
ChatOptions, - HostedMCPTool, - AggregateContextProvider, - ChatAgent, - ChatClientProtocol, - ChatMessageStoreProtocol, - ContextProvider, - Middleware, - ToolMode, - ToolProtocol, + HostedFileSearchTool, + HostedVectorStoreContent, + HostedCodeInterpreterTool, ) from af.magentic_agents.common.lifecycle import AzureAgentBase from af.magentic_agents.models.agent_models import MCPConfig, SearchConfig from af.config.agent_registry import agent_registry -# Broad exception flag -# pylint: disable=w0718 - class FoundryAgentTemplate(AzureAgentBase): """Agent that uses Azure AI Search (RAG) and optional MCP tool via agent_framework.""" @@ -44,253 +29,106 @@ def __init__( mcp_config: MCPConfig | None = None, search_config: SearchConfig | None = None, ) -> None: - super().__init__(mcp=mcp_config) + super().__init__(mcp=mcp_config, model_deployment_name=model_deployment_name) self.agent_name = agent_name self.agent_description = agent_description self.agent_instructions = agent_instructions - self.model_deployment_name = model_deployment_name self.enable_code_interpreter = enable_code_interpreter - self.mcp = mcp_config self.search = search_config - - self._search_connection = None self.logger = logging.getLogger(__name__) - if self.model_deployment_name in {"o3", "o4-mini"}: - raise ValueError( - "Foundry agents do not support reasoning models in this implementation." - ) - # ------------------------- # Tool construction helpers # ------------------------- - async def _make_azure_search_tool(self) -> Optional[AzureAISearchTool]: - """Create Azure AI Search tool (RAG capability).""" - if not ( - self.client - and self.search - and self.search.connection_name - and self.search.index_name - ): - self.logger.info( - "Azure AI Search tool not enabled (missing config or client)." 
- ) + async def _make_file_search_tool(self) -> Optional[HostedFileSearchTool]: + """Create File Search tool (RAG capability) using vector stores.""" + if not self.search or not self.search.vector_store_id: + self.logger.info("File search tool not enabled (missing vector_store_id).") return None try: - self._search_connection = await self.client.connections.get( - name=self.search.connection_name - ) - self.logger.info( - "Found Azure AI Search connection: %s", self._search_connection.id - ) - - return AzureAISearchTool( - index_connection_id=self._search_connection.id, - index_name=self.search.index_name, + # HostedFileSearchTool uses vector stores, not direct Azure AI Search indexes + file_search_tool = HostedFileSearchTool( + inputs=[HostedVectorStoreContent(vector_store_id=self.search.vector_store_id)], + max_results=self.search.max_results if hasattr(self.search, 'max_results') else None, + description="Search through indexed documents" ) + self.logger.info("Created HostedFileSearchTool with vector store: %s", self.search.vector_store_id) + return file_search_tool except Exception as ex: - self.logger.error( - "Azure AI Search tool creation failed: %s | connection=%s | index=%s", - ex, - getattr(self.search, "connection_name", None), - getattr(self.search, "index_name", None), - ) + self.logger.error("File search tool creation failed: %s", ex) return None - async def _collect_tools_and_resources(self) -> tuple[List, dict]: - """Collect tool definitions + tool_resources for agent definition creation.""" + async def _collect_tools(self) -> List: + """Collect tool definitions for ChatAgent.""" tools: List = [] - tool_resources: dict = {} - # Search tool - if self.search and self.search.connection_name and self.search.index_name: - search_tool = await self._make_azure_search_tool() + # File Search tool (RAG) + if self.search: + search_tool = await self._make_file_search_tool() if search_tool: - tools.extend(search_tool.definitions) - tool_resources = 
search_tool.resources - self.logger.info( - "Added %d Azure AI Search tool definitions.", - len(search_tool.definitions), - ) - else: - self.logger.warning("Azure AI Search tool not configured properly.") + tools.append(search_tool) + self.logger.info("Added File Search tool.") # Code Interpreter if self.enable_code_interpreter: try: - tools.append(CodeInterpreterToolDefinition()) - self.logger.info("Added Code Interpreter tool definition.") - except ImportError as ie: - self.logger.error("Code Interpreter dependency missing: %s", ie) + code_tool = HostedCodeInterpreterTool() + tools.append(code_tool) + self.logger.info("Added Code Interpreter tool.") + except Exception as ie: + self.logger.error("Code Interpreter tool creation failed: %s", ie) + + # MCP Tool (from base class) + if self.mcp_tool: + tools.append(self.mcp_tool) + self.logger.info("Added MCP tool: %s", self.mcp_tool.name) - self.logger.info("Total tool definitions collected: %d", len(tools)) - return tools, tool_resources + self.logger.info("Total tools collected: %d", len(tools)) + return tools # ------------------------- # Agent lifecycle override # ------------------------- async def _after_open(self) -> None: - # Instantiate persistent AzureAIAgentClient bound to existing agent_id + """Initialize ChatAgent after connections are established.""" try: - # AzureAIAgentClient( - # project_client=self.client, - # agent_id=str(definition.id), - # agent_name=self.agent_name, - # ) - tools, tool_resources = await self._collect_tools_and_resources() + tools = await self._collect_tools() + self._agent = ChatAgent( chat_client=self.client, - instructions=self.agent_description + " " + self.agent_instructions, + instructions=self.agent_instructions, name=self.agent_name, description=self.agent_description, tools=tools if tools else None, tool_choice="auto" if tools else "none", - allow_multiple_tool_calls=True, temperature=0.7, + model_id=self.model_deployment_name, ) - + + self.logger.info("Initialized 
ChatAgent '%s'", self.agent_name) except Exception as ex: - self.logger.error("Failed to initialize AzureAIAgentClient: %s", ex) + self.logger.error("Failed to initialize ChatAgent: %s", ex) raise # Register agent globally try: agent_registry.register_agent(self) - self.logger.info( - "Registered agent '%s' in global registry.", self.agent_name - ) + self.logger.info("Registered agent '%s' in global registry.", self.agent_name) except Exception as reg_ex: - self.logger.warning( - "Could not register agent '%s': %s", self.agent_name, reg_ex - ) - - # ------------------------- - # Definition compatibility - # ------------------------- - async def _check_connection_compatibility(self, existing_definition: Agent) -> bool: - """Verify existing Azure AI Search connection matches current config.""" - try: - if not (self.search and self.search.connection_name): - self.logger.info("No search config provided; assuming compatibility.") - return True - - tool_resources = getattr(existing_definition, "tool_resources", None) - if not tool_resources: - self.logger.info( - "Existing agent has no tool resources; incompatible with search requirement." - ) - return False - - azure_search = tool_resources.get("azure_ai_search", {}) - indexes = azure_search.get("indexes", []) - if not indexes: - self.logger.info( - "Existing agent has no Azure AI Search indexes; incompatible." - ) - return False - - existing_conn_id = indexes[0].get("index_connection_id") - if not existing_conn_id: - self.logger.info( - "Existing agent missing index_connection_id; incompatible." 
- ) - return False - - current_connection = await self.client.connections.get( - name=self.search.connection_name - ) - same = existing_conn_id == current_connection.id - if same: - self.logger.info("Search connection compatible: %s", existing_conn_id) - else: - self.logger.info( - "Search connection mismatch: existing=%s current=%s", - existing_conn_id, - current_connection.id, - ) - return same - except Exception as ex: - self.logger.error("Error during connection compatibility check: %s", ex) - return False - - async def _get_azure_ai_agent_definition(self, agent_name: str) -> Agent | None: - """Return existing agent definition by name or None.""" - try: - async for agent in self.client.agents.list_agents(): - if agent.name == agent_name: - self.logger.info( - "Found existing agent '%s' (id=%s).", agent_name, agent.id - ) - return await self.client.agents.get_agent(agent.id) - return None - except Exception as e: - if "ResourceNotFound" in str(e) or "404" in str(e): - self.logger.info("Agent '%s' not found; will create new.", agent_name) - else: - self.logger.warning( - "Unexpected error listing agent '%s': %s; will attempt creation.", - agent_name, - e, - ) - return None - - # ------------------------- - # Diagnostics helper - # ------------------------- - async def fetch_run_details(self, thread_id: str, run_id: str) -> None: - """Log run diagnostics for a failed run.""" - try: - run = await self.client.agents.runs.get(thread=thread_id, run=run_id) - self.logger.error( - "Run failure | status=%s | id=%s | last_error=%s | usage=%s", - getattr(run, "status", None), - run_id, - getattr(run, "last_error", None), - getattr(run, "usage", None), - ) - except Exception as ex: - self.logger.error( - "Failed fetching run details (thread=%s run=%s): %s", - thread_id, - run_id, - ex, - ) + self.logger.warning("Could not register agent '%s': %s", self.agent_name, reg_ex) # ------------------------- # Invocation (streaming) # ------------------------- async def invoke(self, 
prompt: str): - """ - Stream model output for a prompt. - - Yields ChatResponseUpdate objects: - - update.text for incremental text - - update.contents for tool calls / usage events - """ + """Stream model output for a prompt.""" if not self._agent: - raise RuntimeError("Agent client not initialized; call open() first.") + raise RuntimeError("Agent not initialized; call open() first.") messages = [ChatMessage(role=Role.USER, text=prompt)] - - tools = [] - # Use mcp_tool prepared in AzureAgentBase - if self.mcp_tool and isinstance(self.mcp_tool, HostedMCPTool): - tools.append(self.mcp_tool) - - chat_options = ChatOptions( - model_id=self.model_deployment_name, - tools=tools if tools else None, - tool_choice="auto" if tools else "none", - allow_multiple_tool_calls=True, - temperature=0.7, - ) - - async for update in self._agent.run_stream( - messages=messages, - # chat_options=chat_options, - # instructions=self.agent_instructions, - ): + + async for update in self._agent.run_stream(messages=messages): yield update @@ -305,7 +143,7 @@ async def create_foundry_agent( mcp_config: MCPConfig | None, search_config: SearchConfig | None, ) -> FoundryAgentTemplate: - """Factory to create and open a FoundryAgentTemplate (agent_framework version).""" + """Factory to create and open a FoundryAgentTemplate.""" agent = FoundryAgentTemplate( agent_name=agent_name, agent_description=agent_description, @@ -316,4 +154,4 @@ async def create_foundry_agent( search_config=search_config, ) await agent.open() - return agent + return agent \ No newline at end of file diff --git a/src/backend/af/magentic_agents/magentic_agent_factory.py b/src/backend/af/magentic_agents/magentic_agent_factory.py index c74aef8c..d6a36ee4 100644 --- a/src/backend/af/magentic_agents/magentic_agent_factory.py +++ b/src/backend/af/magentic_agents/magentic_agent_factory.py @@ -120,7 +120,6 @@ async def create_agent_from_config(self, user_id: str, agent_obj: SimpleNamespac model_deployment_name=deployment_name, 
enable_code_interpreter=getattr(agent_obj, "coding_tools", False), mcp_config=mcp_config, - # bing_config=bing_config, search_config=search_config, ) diff --git a/src/backend/af/magentic_agents/reasoning_agent.py b/src/backend/af/magentic_agents/reasoning_agent.py index a2f97c78..64d7275e 100644 --- a/src/backend/af/magentic_agents/reasoning_agent.py +++ b/src/backend/af/magentic_agents/reasoning_agent.py @@ -1,70 +1,34 @@ -import asyncio +"""Reasoning agent using agent_framework with direct Azure AI Search integration.""" + import logging -import uuid -from dataclasses import dataclass -from typing import AsyncIterator, List, Optional -# from agent_framework.azure import AzureAIAgentClient -from agent_framework_azure_ai import AzureAIAgentClient +import os +from typing import Optional + from agent_framework import ( ChatMessage, - ChatOptions, - ChatResponseUpdate, - HostedMCPTool, Role, - AggregateContextProvider, - ChatAgent, - ChatClientProtocol, - ChatMessageStoreProtocol, - ContextProvider, - Middleware, - ToolMode, - ToolProtocol, ) +from agent_framework_azure_ai import AzureAIAgentClient from azure.identity.aio import DefaultAzureCredential -from azure.ai.projects.aio import AIProjectClient -from azure.search.documents import SearchClient -from azure.core.credentials import AzureKeyCredential +from af.magentic_agents.common.lifecycle import MCPEnabledBase from af.magentic_agents.models.agent_models import MCPConfig, SearchConfig +from af.magentic_agents.reasoning_search import ReasoningSearch, create_reasoning_search from af.config.agent_registry import agent_registry logger = logging.getLogger(__name__) -# ----------------------------- -# Lightweight search helper -# ----------------------------- -@dataclass -class _SearchContext: - client: SearchClient - top_k: int - - def fetch(self, query: str) -> List[str]: - docs: List[str] = [] - try: - results = self.client.search( - search_text=query, - query_type="simple", - select=["content"], - top=self.top_k, - 
) - for r in results: - try: - docs.append(str(r["content"])) - except Exception: # noqa: BLE001 - continue - except Exception as ex: # noqa: BLE001 - logger.debug("Search fetch error: %s", ex) - return docs - - -class ReasoningAgentTemplate: +class ReasoningAgentTemplate(MCPEnabledBase): """ - agent_framework-based reasoning agent (replaces SK ChatCompletionAgent). - Class name preserved for backward compatibility. - - + Reasoning agent using agent_framework with direct Azure AI Search integration. + + This agent: + - Uses reasoning models (o1, o3-mini, etc.) + - Augments prompts with search results from Azure AI Search + - Supports optional MCP tools + - Does NOT use Azure AI Agent service (direct client connection) """ def __init__( @@ -73,192 +37,344 @@ def __init__( agent_description: str, agent_instructions: str, model_deployment_name: str, - azure_openai_endpoint: str, # kept name for compatibility; now Azure AI Project endpoint + azure_ai_project_endpoint: str, search_config: SearchConfig | None = None, mcp_config: MCPConfig | None = None, max_search_docs: int = 3, ) -> None: + """Initialize reasoning agent. 
+ + Args: + agent_name: Name of the agent + agent_description: Description of the agent's purpose + agent_instructions: System instructions for the agent + model_deployment_name: Reasoning model deployment (e.g., "o1", "o3-mini") + azure_ai_project_endpoint: Azure AI Project endpoint URL + search_config: Optional search configuration for Azure AI Search + mcp_config: Optional MCP server configuration + max_search_docs: Maximum number of search documents to retrieve + """ + super().__init__(mcp=mcp_config) self.agent_name = agent_name self.agent_description = agent_description self.base_instructions = agent_instructions self.model_deployment_name = model_deployment_name - self.project_endpoint = azure_openai_endpoint # reused meaning + self.project_endpoint = azure_ai_project_endpoint self.search_config = search_config - self.mcp_config = mcp_config self.max_search_docs = max_search_docs + + # Azure resources + self._credential: Optional[DefaultAzureCredential] = None + self._client: Optional[AzureAIAgentClient] = None + + # Search integration + self.reasoning_search: Optional[ReasoningSearch] = None + + self.logger = logging.getLogger(__name__) + + # Validate reasoning model + if self.model_deployment_name not in {"o1", "o1-mini", "o1-preview", "o3-mini"}: + self.logger.warning( + "Model '%s' may not support reasoning features. 
" + "Recommended models: o1, o1-mini, o3-mini", + self.model_deployment_name + ) - # Azure + client resources - self._credential: DefaultAzureCredential | None = None - self._project_client: AIProjectClient | None = None - self._client: AzureAIAgentClient | None = None - - # Optional search - self._search_ctx: _SearchContext | None = None - - self._opened = False + async def _after_open(self) -> None: + """Initialize Azure client and search after base setup.""" + try: + # Initialize Azure credential + self._credential = DefaultAzureCredential() + if self._stack: + await self._stack.enter_async_context(self._credential) + + # Create AzureAIAgentClient for direct model access + self._client = AzureAIAgentClient( + project_endpoint=self.project_endpoint, + model_deployment_name=self.model_deployment_name, + async_credential=self._credential, + ) + if self._stack: + await self._stack.enter_async_context(self._client) - # ------------- Lifecycle ------------- - async def open(self) -> "ReasoningAgentTemplate": - if self._opened: - return self + self.logger.info( + "Initialized AzureAIAgentClient for model '%s'", + self.model_deployment_name + ) - self._credential = DefaultAzureCredential() - self._project_client = AIProjectClient( - endpoint=self.project_endpoint, - credential=self._credential, - ) + # Initialize search capabilities if configured + if self.search_config: + self.reasoning_search = await create_reasoning_search(self.search_config) + if self.reasoning_search.is_available(): + self.logger.info( + "Initialized Azure AI Search with index '%s'", + self.search_config.index_name + ) + else: + self.logger.warning("Azure AI Search initialization failed or incomplete config") + + # Initialize MCP tools (called after stack is ready) + await self._prepare_mcp_tool() + + if self.mcp_tool: + self.logger.info( + "MCP tool '%s' ready with %d functions", + self.mcp_tool.name, + len(self.mcp_tool.functions) if hasattr(self.mcp_tool, 'functions') else 0 + ) - # Create 
AzureAIAgentClient (ephemeral agent will be created on first run) - self._client = AzureAIAgentClient( - project_client=self._project_client, - agent_id=None, - agent_name=self.agent_name, - model_deployment_name=self.model_deployment_name, - ) + # Store client reference (for compatibility with base class delegation) + self._agent = self._client - # Optional search setup - if self.search_config and all( - [ - self.search_config.endpoint, - self.search_config.index_name, - self.search_config.api_key, - ] - ): + # Register agent globally try: - sc = SearchClient( - endpoint=self.search_config.endpoint, - index_name=self.search_config.index_name, - credential=AzureKeyCredential(self.search_config.api_key), - ) - self._search_ctx = _SearchContext(client=sc, top_k=self.max_search_docs) - logger.info( - "ReasoningAgentTemplate: search index '%s' configured.", - self.search_config.index_name, + agent_registry.register_agent(self) + self.logger.info("Registered agent '%s' in global registry.", self.agent_name) + except Exception as reg_ex: + self.logger.warning( + "Could not register agent '%s': %s", + self.agent_name, + reg_ex ) - except Exception as ex: # noqa: BLE001 - logger.warning("ReasoningAgentTemplate: search initialization failed: %s", ex) - # Registry - try: - agent_registry.register_agent(self) - except Exception: # noqa: BLE001 - pass - - self._opened = True - return self + except Exception as ex: + self.logger.error("Failed to initialize ReasoningAgentTemplate: %s", ex) + raise async def close(self) -> None: - if not self._opened: - return - try: - if self._client: - await self._client.close() - except Exception: # noqa: BLE001 - pass - try: - if self._credential: - await self._credential.close() - except Exception: # noqa: BLE001 - pass + """Close all resources.""" try: - agent_registry.unregister_agent(self) - except Exception: # noqa: BLE001 - pass + # Close reasoning search + if self.reasoning_search: + await self.reasoning_search.close() + 
self.reasoning_search = None - self._client = None - self._project_client = None - self._credential = None - self._search_ctx = None - self._opened = False - - async def __aenter__(self) -> "ReasoningAgentTemplate": - return await self.open() + # Unregister from registry + try: + agent_registry.unregister_agent(self) + except Exception: # noqa: BLE001 + pass - async def __aexit__(self, exc_type, exc, tb) -> None: - await self.close() + finally: + await super().close() + self._client = None + self._credential = None - # ------------- Public Invocation ------------- - async def invoke(self, message: str) -> AsyncIterator[ChatResponseUpdate]: + async def _augment_with_search(self, prompt: str) -> str: """ - Mirrors old streaming interface: - Yields ChatResponseUpdate objects (instead of SK ChatMessageContent). - Consumers expecting SK types should translate here. + Augment the prompt with relevant search results. + + Args: + prompt: Original user prompt + + Returns: + Augmented instructions including search results """ - async for update in self._invoke_stream_internal(message): - yield update + instructions = self.base_instructions + + if not self.reasoning_search or not self.reasoning_search.is_available(): + return instructions - # ------------- Internal streaming logic ------------- - async def _invoke_stream_internal(self, prompt: str) -> AsyncIterator[ChatResponseUpdate]: - if not self._opened or not self._client: - raise RuntimeError("Agent not opened. 
Call open().") + if not prompt.strip(): + return instructions - # Build instructions with optional search - instructions = self.base_instructions - if self._search_ctx and prompt.strip(): - docs = await self._fetch_docs_async(prompt) + try: + # Fetch relevant documents + docs = await self.reasoning_search.search_documents( + query=prompt, + limit=self.max_search_docs + ) + if docs: - joined = "\n\n".join(f"[Doc {i+1}] {d}" for i, d in enumerate(docs)) + # Format documents for inclusion + doc_context = "\n\n".join( + f"[Document {i+1}]\n{doc}" + for i, doc in enumerate(docs) + ) + + # Append to instructions instructions = ( - f"{instructions}\n\nRelevant reference documents:\n{joined}\n\n" - "Use them only if they help answer the question." + f"{instructions}\n\n" + f"**Relevant Reference Documents:**\n{doc_context}\n\n" + f"Use the above documents only if they help answer the user's question. " + f"Do not mention the documents unless directly relevant." ) + + self.logger.debug( + "Augmented prompt with %d search documents", + len(docs) + ) + except Exception as ex: + self.logger.warning("Search augmentation failed: %s", ex) + + return instructions + def _prepare_tools(self) -> list: + """ + Prepare tools for reasoning model invocation. + + Returns: + List of tools (currently only MCP tools supported for reasoning models) + """ tools = [] - if self.mcp_config: - tools.append( - HostedMCPTool( - name=self.mcp_config.name, - description=self.mcp_config.description, - server_label=self.mcp_config.name.replace(" ", "_"), - url=self.mcp_config.url - ) - ) + + if self.mcp_tool: + tools.append(self.mcp_tool) + self.logger.debug("Added MCP tool '%s' to tools list", self.mcp_tool.name) + + return tools + + async def invoke(self, prompt: str): + """ + Stream model output for a prompt with optional search augmentation. 
+ + For reasoning models, this will include: + - Reasoning content (thinking process) + - Final answer content + + Args: + prompt: User prompt/question + + Yields: + ChatResponseUpdate objects with incremental updates + """ + if not self._client: + raise RuntimeError("Agent not initialized; call open() first.") - chat_options = ChatOptions( - model_id=self.model_deployment_name, - tools=tools if tools else None, - tool_choice="auto" if tools else "none", - temperature=0.7, - allow_multiple_tool_calls=True, - ) + # Augment instructions with search results if available + instructions = await self._augment_with_search(prompt) + # Build message messages = [ChatMessage(role=Role.USER, text=prompt)] - async for update in self._client.get_streaming_response( - messages=messages, - chat_options=chat_options, - instructions=instructions, - ): - yield update + # Prepare tools + tools = self._prepare_tools() - async def _fetch_docs_async(self, query: str) -> List[str]: - if not self._search_ctx: - return [] - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, lambda: self._search_ctx.fetch(query)) + try: + # Stream response from reasoning model + async for update in self._client.get_streaming_response( + messages=messages, + instructions=instructions, + tools=tools if tools else None, + tool_choice="auto" if tools else "none", + temperature=1.0, # Reasoning models use fixed temperature + ): + yield update + + except Exception as ex: + self.logger.error("Error during reasoning agent invocation: %s", ex) + raise + + async def invoke_non_streaming(self, prompt: str): + """ + Get complete response (non-streaming) with search augmentation. 
+ + Args: + prompt: User prompt/question + + Returns: + ChatResponse with complete response + """ + if not self._client: + raise RuntimeError("Agent not initialized; call open() first.") + + # Augment instructions with search results + instructions = await self._augment_with_search(prompt) + + # Build message + messages = [ChatMessage(role=Role.USER, text=prompt)] + + # Prepare tools + tools = self._prepare_tools() + + try: + # Get response from reasoning model + response = await self._client.get_response( + messages=messages, + instructions=instructions, + tools=tools if tools else None, + tool_choice="auto" if tools else "none", + temperature=1.0, + ) + return response + + except Exception as ex: + self.logger.error("Error during reasoning agent invocation: %s", ex) + raise - # ------------- Convenience ------------- @property - def client(self) -> AzureAIAgentClient | None: + def client(self) -> Optional[AzureAIAgentClient]: + """Access to underlying client for compatibility.""" return self._client -# Factory (name preserved) +# ------------------------- +# Factory +# ------------------------- async def create_reasoning_agent( agent_name: str, agent_description: str, agent_instructions: str, model_deployment_name: str, - azure_openai_endpoint: str, + azure_ai_project_endpoint: str | None = None, search_config: SearchConfig | None = None, mcp_config: MCPConfig | None = None, ) -> ReasoningAgentTemplate: + """ + Factory to create and open a ReasoningAgentTemplate. 
+ + Args: + agent_name: Name of the agent + agent_description: Description of the agent's purpose + agent_instructions: System instructions for the agent + model_deployment_name: Reasoning model deployment (e.g., "o1", "o3-mini") + azure_ai_project_endpoint: Azure AI Project endpoint (defaults to env var) + search_config: Optional Azure AI Search configuration + mcp_config: Optional MCP server configuration + + Returns: + Initialized and opened ReasoningAgentTemplate instance + + Example: + ```python + from af.magentic_agents.models.agent_models import SearchConfig, MCPConfig + + # With search augmentation and MCP tools + agent = await create_reasoning_agent( + agent_name="ReasoningAgent", + agent_description="Agent that uses reasoning models with RAG", + agent_instructions="You are a helpful reasoning assistant.", + model_deployment_name="o1", + search_config=SearchConfig( + endpoint="https://my-search.search.windows.net", + index_name="my-index", + api_key="...", + ), + mcp_config=MCPConfig( + url="https://my-mcp-server.com", + name="HR Tools", + description="HR data access tools" + ), + ) + + async with agent: + async for update in agent.invoke("Explain quantum entanglement"): + print(update.text, end="") + ``` + """ + # Get endpoint from env if not provided + endpoint = azure_ai_project_endpoint or os.getenv("AZURE_AI_PROJECT_ENDPOINT") + if not endpoint: + raise RuntimeError( + "AZURE_AI_PROJECT_ENDPOINT must be provided or set as environment variable" + ) + agent = ReasoningAgentTemplate( agent_name=agent_name, agent_description=agent_description, agent_instructions=agent_instructions, model_deployment_name=model_deployment_name, - azure_openai_endpoint=azure_openai_endpoint, + azure_ai_project_endpoint=endpoint, search_config=search_config, mcp_config=mcp_config, ) diff --git a/src/backend/af/magentic_agents/reasoning_search.py b/src/backend/af/magentic_agents/reasoning_search.py index 20ea5983..12a00527 100644 --- 
a/src/backend/af/magentic_agents/reasoning_search.py +++ b/src/backend/af/magentic_agents/reasoning_search.py @@ -1,5 +1,5 @@ """ -Azure AI Search integration for reasoning agents (no agent framework dependency). +Azure AI Search integration for reasoning agents (framework-agnostic). This module provides: - ReasoningSearch: lightweight wrapper around Azure Cognitive Search (Azure AI Search) @@ -8,11 +8,13 @@ Design goals: - Fast to call from other async agent components - Graceful degradation if configuration is incomplete +- Framework-agnostic (works with any agent framework) """ from __future__ import annotations import asyncio +import logging from concurrent.futures import ThreadPoolExecutor from typing import List, Optional @@ -21,9 +23,15 @@ from af.magentic_agents.models.agent_models import SearchConfig +logger = logging.getLogger(__name__) + + class ReasoningSearch: """ Handles Azure AI Search (Cognitive Search) queries for retrieval / RAG augmentation. + + This class is framework-agnostic and can be used with any agent framework + including agent_framework, Semantic Kernel, LangChain, etc. """ def __init__( @@ -32,6 +40,12 @@ def __init__( *, max_executor_workers: int = 4, ) -> None: + """Initialize ReasoningSearch. + + Args: + search_config: Azure AI Search configuration + max_executor_workers: Max threads for search executor pool + """ self.search_config = search_config self.search_client: Optional[SearchClient] = None self._executor: Optional[ThreadPoolExecutor] = None @@ -41,6 +55,7 @@ def __init__( async def initialize(self) -> bool: """ Initialize the search client. Safe to call multiple times. + Returns: bool: True if initialized, False if config missing or failed. 
""" @@ -54,6 +69,7 @@ async def initialize(self) -> bool: or not self.search_config.api_key ): # Incomplete config => treat as disabled + logger.debug("Search config incomplete, search will be disabled") return False try: @@ -68,9 +84,14 @@ async def initialize(self) -> bool: thread_name_prefix="reasoning_search", ) self._initialized = True + logger.info( + "ReasoningSearch initialized with index '%s'", + self.search_config.index_name + ) return True - except Exception: + except Exception as ex: # Swallow initialization errors (callers can check is_available) + logger.warning("ReasoningSearch initialization failed: %s", ex) self.search_client = None self._initialized = False return False @@ -85,72 +106,107 @@ async def search_documents(self, query: str, limit: int = 3) -> List[str]: Args: query: Natural language or keyword query. - limit: Max number of documents. + limit: Max number of documents (1-50). Returns: List of strings (each a document content snippet). Empty if none or unavailable. """ if not self.is_available(): + logger.debug("Search not available, returning empty results") + return [] + + if not query.strip(): + logger.debug("Empty query, returning empty results") return [] limit = max(1, min(limit, 50)) # basic safety bounds loop = asyncio.get_running_loop() try: - return await loop.run_in_executor( + results = await loop.run_in_executor( self._executor, lambda: self._run_search_sync(query=query, limit=limit), ) - except Exception: + logger.debug("Search returned %d documents for query", len(results)) + return results + except Exception as ex: + logger.warning("Search execution failed: %s", ex) return [] - async def search_raw(self, query: str, limit: int = 3): + async def search_raw(self, query: str, limit: int = 3) -> List: """ Raw search returning native SDK result iterator (materialized to list). Provided for more advanced callers needing metadata. + Args: + query: Natural language or keyword query. + limit: Max number of documents (1-50). 
+ Returns: list of raw SDK result objects (dict-like). """ if not self.is_available(): return [] + if not query.strip(): + return [] + limit = max(1, min(limit, 50)) loop = asyncio.get_running_loop() try: return await loop.run_in_executor( - self._executor, lambda: self._run_search_sync(query, limit, raw=True) + self._executor, + lambda: self._run_search_sync(query, limit, raw=True) ) - except Exception: + except Exception as ex: + logger.warning("Raw search execution failed: %s", ex) return [] - def _run_search_sync(self, query: str, limit: int, raw: bool = False): + def _run_search_sync(self, query: str, limit: int, raw: bool = False) -> List: """ Internal synchronous search (executed inside ThreadPoolExecutor). + + Args: + query: Search query + limit: Max results + raw: If True, return raw result objects; if False, extract 'content' field + + Returns: + List of content strings or raw result objects """ if not self.search_client: - return [] if not raw else [] + return [] - results_iter = self.search_client.search( - search_text=query, - query_type="simple", - select=["content"], - top=limit, - ) + try: + results_iter = self.search_client.search( + search_text=query, + query_type="simple", + select=["content"], + top=limit, + ) - contents: List[str] = [] - raw_items: List = [] - for item in results_iter: - try: - if raw: - raw_items.append(item) - else: - contents.append(f"{item['content']}") - except Exception: - continue + contents: List[str] = [] + raw_items: List = [] + + for item in results_iter: + try: + if raw: + raw_items.append(item) + else: + # Extract content field + content = item.get('content', '') + if content: + contents.append(str(content)) + except (KeyError, TypeError) as ex: + logger.debug("Error extracting content from result: %s", ex) + continue - return raw_items if raw else contents + return raw_items if raw else contents + + except Exception as ex: + logger.warning("Search query execution failed: %s", ex) + return [] async def close(self) 
-> None: """ @@ -161,9 +217,10 @@ async def close(self) -> None: self._executor = None self.search_client = None self._initialized = False + logger.debug("ReasoningSearch closed") -# Factory (keeps old name, but no 'af' parameter needed anymore) +# Factory async def create_reasoning_search( search_config: Optional[SearchConfig], ) -> ReasoningSearch: @@ -175,6 +232,26 @@ async def create_reasoning_search( Returns: Initialized ReasoningSearch (is_available() indicates readiness). + + Example: + ```python + from af.magentic_agents.models.agent_models import SearchConfig + + search = await create_reasoning_search( + SearchConfig( + endpoint="https://my-search.search.windows.net", + index_name="documents", + api_key="...", + ) + ) + + if search.is_available(): + docs = await search.search_documents("quantum entanglement", limit=5) + for doc in docs: + print(doc) + + await search.close() + ``` """ search = ReasoningSearch(search_config) await search.initialize() diff --git a/src/backend/af/orchestration/orchestration_manager.py b/src/backend/af/orchestration/orchestration_manager.py index 96830b0e..78b87562 100644 --- a/src/backend/af/orchestration/orchestration_manager.py +++ b/src/backend/af/orchestration/orchestration_manager.py @@ -6,18 +6,14 @@ from typing import List, Optional, Callable, Awaitable # agent_framework imports -from agent_framework.azure import AzureOpenAIChatClient - -from agent_framework import ChatMessage, ChatOptions -from agent_framework._workflows import MagenticBuilder -from agent_framework._workflows._magentic import AgentRunResponseUpdate # type: ignore +from agent_framework_azure_ai import AzureAIAgentClient +from agent_framework import ChatMessage, ChatOptions, WorkflowOutputEvent, AgentRunResponseUpdate, MagenticBuilder from common.config.app_config import config from common.models.messages_af import TeamConfiguration -# Existing (legacy) callbacks expecting SK content; we'll adapt to them. -# If you've created af-native callbacks (e.g. 
response_handlers_af) import those instead. +# Existing (legacy) callbacks from af.callbacks.response_handlers import ( agent_response_callback, streaming_agent_response_callback, @@ -46,9 +42,8 @@ def _user_aware_agent_callback( """Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature.""" async def _cb(agent_id: str, message: ChatMessage): - # Reuse existing callback expecting (ChatMessageContent, user_id). We pass text directly. try: - agent_response_callback(message, user_id) # existing callback is sync + agent_response_callback(agent_id, message, user_id) # Fixed: added agent_id except Exception as e: # noqa: BLE001 logging.getLogger(__name__).error( "agent_response_callback error: %s", e @@ -63,18 +58,8 @@ def _user_aware_streaming_callback( """Adapts streaming updates to existing streaming handler.""" async def _cb(agent_id: str, update: AgentRunResponseUpdate, is_final: bool): - # Build a minimal shim object with text/content for legacy handler if needed. - # Your converted streaming handlers (response_handlers_af) should replace this eventual shim. 
- class _Shim: # noqa: D401 - def __init__(self, agent_id: str, update: AgentRunResponseUpdate): - self.agent_id = agent_id - self.text = getattr(update, "text", None) - self.contents = getattr(update, "contents", None) - self.role = getattr(update, "role", None) - - shim = _Shim(agent_id, update) try: - await streaming_agent_response_callback(shim, is_final, user_id) + await streaming_agent_response_callback(agent_id, update, is_final, user_id) # Fixed: removed shim except Exception as e: # noqa: BLE001 logging.getLogger(__name__).error( "streaming_agent_response_callback error: %s", e @@ -91,42 +76,56 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None): Initialize a Magentic workflow with: - Provided agents (participants) - HumanApprovalMagenticManager as orchestrator manager - - AzureOpenAIChatClient as the underlying chat client + - AzureAIAgentClient as the underlying chat client + + This mirrors the old Semantic Kernel orchestration setup: + - Uses same deployment, endpoint, and credentials + - Applies same execution settings (temperature, max_tokens) + - Maintains same human approval workflow """ if not user_id: raise ValueError("user_id is required to initialize orchestration") + # Get credential from config (same as old version) credential = config.get_azure_credential(client_id=config.AZURE_CLIENT_ID) - def get_token(): - token = credential.get_token("https://cognitiveservices.azure.com/.default") - return token.token - - # Create Azure chat client (agent_framework style) - relying on environment or explicit kwargs. 
+ # Create Azure AI Agent client for orchestration using config + # This replaces AzureChatCompletion from SK try: - chat_client = AzureOpenAIChatClient( - endpoint=config.AZURE_OPENAI_ENDPOINT, - deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME, - ad_token_provider=get_token, + chat_client = AzureAIAgentClient( + project_endpoint=config.AZURE_AI_PROJECT_ENDPOINT, + model_deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME, + async_credential=credential, ) - except Exception as e: # noqa: BLE001 - logging.getLogger(__name__).error( - "chat_client error: %s", e - ) + + cls.logger.info( + "Created AzureAIAgentClient for orchestration with model '%s' at endpoint '%s'", + config.AZURE_OPENAI_DEPLOYMENT_NAME, + config.AZURE_AI_PROJECT_ENDPOINT + ) + except Exception as e: + cls.logger.error("Failed to create AzureAIAgentClient: %s", e) raise - # HumanApprovalMagenticManager needs the chat_client passed as 'chat_client' in its constructor signature (it subclasses StandardMagenticManager) + + # Create HumanApprovalMagenticManager with the chat client + # Execution settings (temperature=0.1, max_tokens=4000) are configured via + # orchestration_config.create_execution_settings() which matches old SK version try: manager = HumanApprovalMagenticManager( user_id=user_id, chat_client=chat_client, - instructions=None, # optionally supply orchestrator system instructions + instructions=None, # Orchestrator system instructions (optional) max_round_count=orchestration_config.max_rounds, ) - except Exception as e: # noqa: BLE001 - logging.getLogger(__name__).error( - "manager error: %s", e - ) + cls.logger.info( + "Created HumanApprovalMagenticManager for user '%s' with max_rounds=%d", + user_id, + orchestration_config.max_rounds + ) + except Exception as e: + cls.logger.error("Failed to create manager: %s", e) raise + # Build participant map: use each agent's name as key participants = {} for ag in agents: @@ -134,6 +133,7 @@ def get_token(): if not name: name = 
f"agent_{len(participants)+1}" participants[name] = ag + cls.logger.debug("Added participant '%s'", name) # Assemble workflow builder = ( @@ -142,18 +142,12 @@ def get_token(): .with_standard_manager(manager=manager) ) - # Register callbacks (non-streaming manager orchestration events). We'll enable streaming agent deltas via unified mode if desired later. - # Provide direct agent + streaming callbacks (legacy adapter form). - # The builder currently surfaces unified callback OR agent callbacks; we use agent callbacks here. - # NOTE: If you want unified events instead, use builder.on_event(..., mode=MagenticCallbackMode.STREAMING). - # We'll just store callbacks by augmenting manager after build via internal surfaces. + # Build workflow workflow = builder.build() + cls.logger.info("Built Magentic workflow with %d participants", len(participants)) - # Wire agent response callbacks onto executor layer - # The built workflow exposes internal orchestrator/executor attributes; we rely on exported API for adding callbacks if present. + # Wire agent response callbacks onto orchestrator try: - # Attributes available: workflow._orchestrator._agent_response_callback, etc. - # Set them if not already configured (defensive). 
orchestrator = getattr(workflow, "_orchestrator", None) if orchestrator: if getattr(orchestrator, "_agent_response_callback", None) is None: @@ -171,7 +165,8 @@ def get_token(): "_streaming_agent_response_callback", cls._user_aware_streaming_callback(user_id), ) - except Exception as e: # noqa: BLE001 + cls.logger.debug("Attached callbacks to workflow orchestrator") + except Exception as e: cls.logger.warning( "Could not attach callbacks to workflow orchestrator: %s", e ) @@ -193,23 +188,25 @@ async def get_current_or_new_orchestration( current = orchestration_config.get_current_orchestration(user_id) if current is None or team_switched: if current is not None and team_switched: - # Close prior agents (skip ProxyAgent if desired) + cls.logger.info("Team switched, closing previous agents for user '%s'", user_id) + # Close prior agents (same logic as old version) for agent in getattr(current, "_participants", {}).values(): - if ( - getattr(agent, "agent_name", getattr(agent, "name", "")) - != "ProxyAgent" - ): + agent_name = getattr(agent, "agent_name", getattr(agent, "name", "")) + if agent_name != "ProxyAgent": close_coro = getattr(agent, "close", None) if callable(close_coro): try: await close_coro() - except Exception as e: # noqa: BLE001 + cls.logger.debug("Closed agent '%s'", agent_name) + except Exception as e: cls.logger.error("Error closing agent: %s", e) factory = MagenticAgentFactory() agents = await factory.get_agents( user_id=user_id, team_config_input=team_config ) + cls.logger.info("Created %d agents for user '%s'", len(agents), user_id) + orchestration_config.orchestrations[user_id] = await cls.init_orchestration( agents, user_id ) @@ -221,41 +218,55 @@ async def get_current_or_new_orchestration( async def run_orchestration(self, user_id: str, input_task) -> None: """ Execute the Magentic workflow for the provided user and task description. 
+ + This mirrors the old SK orchestration: + - Uses same execution settings (temperature=0.1, max_tokens=4000) + - Maintains same approval workflow + - Sends same WebSocket updates """ job_id = str(uuid.uuid4()) orchestration_config.set_approval_pending(job_id) + self.logger.info("Starting orchestration job '%s' for user '%s'", job_id, user_id) workflow = orchestration_config.get_current_orchestration(user_id) if workflow is None: raise ValueError("Orchestration not initialized for user.") - # Ensure manager tracks user_id + # Ensure manager tracks user_id (same as old version) try: manager = getattr(workflow, "_manager", None) if manager and hasattr(manager, "current_user_id"): manager.current_user_id = user_id - except Exception as e: # noqa: BLE001 + self.logger.debug("Set user_id on manager = %s", user_id) + except Exception as e: self.logger.error("Error setting user_id on manager: %s", e) - # Build a MagenticContext-like starting message; the workflow interface likely exposes invoke(task=...) + # Build task from input (same as old version) task_text = getattr(input_task, "description", str(input_task)) - - # Provide chat options (temperature mapping from original execution_settings) - chat_options = ChatOptions( - temperature=0.1, - max_output_tokens=4000, - ) + self.logger.debug("Task: %s", task_text) try: - # Invoke orchestrator; API may be workflow.invoke(task=..., chat_options=...) 
- result_msg: ChatMessage = await workflow.invoke( - task=task_text, chat_options=chat_options - ) - - final_text = result_msg.text if result_msg else "" - self.logger.info("Final result:\n%s", final_text) + # Execute workflow using run_stream with task as positional parameter + # The execution settings are configured in the manager/client + final_output: str | None = None + + self.logger.info("Starting workflow execution...") + async for event in workflow.run_stream(task_text): + # Check if this is the final output event + if isinstance(event, WorkflowOutputEvent): + final_output = str(event.data) + self.logger.debug("Received workflow output event") + + # Extract final result + final_text = final_output if final_output else "" + + # Log results (same format as old version) + self.logger.info("\nAgent responses:") + self.logger.info("Orchestration completed. Final result length: %d chars", len(final_text)) + self.logger.info("\nFinal result:\n%s", final_text) self.logger.info("=" * 50) + # Send final result via WebSocket (same as old version) await connection_config.send_status_update_async( { "type": WebsocketMessageType.FINAL_RESULT_MESSAGE, @@ -268,6 +279,30 @@ async def run_orchestration(self, user_id: str, input_task) -> None: user_id, message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE, ) - self.logger.info("Final result sent via WebSocket to user %s", user_id) - except Exception as e: # noqa: BLE001 - self.logger.error("Unexpected orchestration error: %s", e) + self.logger.info("Final result sent via WebSocket to user '%s'", user_id) + + except Exception as e: + # Error handling (enhanced from old version) + self.logger.error("Unexpected orchestration error: %s", e, exc_info=True) + self.logger.error("Error type: %s", type(e).__name__) + if hasattr(e, "__dict__"): + self.logger.error("Error attributes: %s", e.__dict__) + self.logger.info("=" * 50) + + # Send error status to user + try: + await connection_config.send_status_update_async( + { + "type": 
WebsocketMessageType.FINAL_RESULT_MESSAGE, + "data": { + "content": f"Error during orchestration: {str(e)}", + "status": "error", + "timestamp": asyncio.get_event_loop().time(), + }, + }, + user_id, + message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE, + ) + except Exception as send_error: + self.logger.error("Failed to send error status: %s", send_error) + raise \ No newline at end of file