"""
Test LLM and MCP
Based on the fastmcp library.
It also provides support for securing MCP calls with a JWT token.

This is the backend for the Streamlit MCP UI.
"""

import json
import asyncio
from typing import List, Dict, Any, Callable, Sequence, Optional

from fastmcp import Client as MCPClient
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage

# our code imports
from oci_jwt_client import OCIJWTClient
from oci_models import get_llm
from utils import get_console_logger
from config import IAM_BASE_URL, ENABLE_JWT_TOKEN
from config_private import SECRET_OCID
from mcp_servers_config import MCP_SERVERS_CONFIG

logger = get_console_logger()

# ---- Config ----
MAX_HISTORY = 10
MCP_URL = MCP_SERVERS_CONFIG["default"]["url"]
TIMEOUT = 60
# the scope for the JWT token
SCOPE = "urn:opc:idm:__myscopes__"
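# Note: MCP_SERVERS_CONFIG is assumed to map server names to their settings,
# roughly like (illustrative values only):
#   MCP_SERVERS_CONFIG = {"default": {"url": "http://localhost:9000/mcp"}}
# See mcp_servers_config.py for the actual definition.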

# you can tailor the SYSTEM prompt here, if needed
SYSTEM_PROMPT = """You are an AI assistant equipped with an MCP server and several tools.
Provide all the needed information with a detailed query when you use a tool.
If the collection name is not provided in the user's prompt,
use the collection BOOKS to get the additional information you need to answer.
"""


def default_jwt_supplier() -> Optional[str]:
    """
    Get a valid JWT token for calls to the MCP server.

    Returns None when JWT security is disabled.
    """
    if ENABLE_JWT_TOKEN:
        # Always return a FRESH token; do not include "Bearer " (FastMCP adds it)
        token, _, _ = OCIJWTClient(IAM_BASE_URL, SCOPE, SECRET_OCID).get_token()
    else:
        # JWT security disabled
        token = None
    return token


class AgentWithMCP:
    """
    LLM + MCP orchestrator.
    - Discovers tools from an MCP server (JWT-protected)
    - Binds tool JSON Schemas to the LLM
    - Executes tool calls emitted by the LLM and loops until completion

    This is a rather simple agent: it only does tool calling,
    but the tools are provided by the MCP server.
    The code introspects the MCP server to discover the tools; the LLM then
    decides which tool to call and which parameters to provide.
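
    Example (usage sketch):
        agent = await AgentWithMCP.create()
        answer = await agent.answer("your question", history=[])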
    """

    def __init__(
        self,
        mcp_url: str,
        jwt_supplier: Callable[[], Optional[str]],
        timeout: int,
        llm,
    ):
        self.mcp_url = mcp_url
        self.jwt_supplier = jwt_supplier
        self.timeout = timeout
        self.llm = llm
        self.model_with_tools = None
        # optional: cache tools to avoid re-listing every run
        self._tools_cache = None

    # ---------- helpers ----------

    @staticmethod
    def _tool_to_schema(t: object) -> dict:
        """
        Convert an MCP tool (name, description, inputSchema) to a JSON-Schema dict
        that LangChain's ChatCohere.bind_tools accepts (top-level schema).
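
        Example (illustrative values, not a real tool):
            a tool with name="search_books" and
            inputSchema={"type": "object", "properties": {"query": {"type": "string"}}}
            maps to
            {"title": "search_books", "description": "...", "type": "object",
             "properties": {"query": {"type": "string"}}}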
        """
        input_schema = (getattr(t, "inputSchema", None) or {}).copy()
        if input_schema.get("type") != "object":
            input_schema.setdefault("type", "object")
            input_schema.setdefault("properties", {})
        return {
            "title": getattr(t, "name", "tool"),
            "description": getattr(t, "description", "") or "",
            **input_schema,
        }

    async def _list_tools(self):
        """
        Fetch tools from the MCP server using FastMCP. Must be async.
        """
        jwt = self.jwt_supplier()

        logger.info("Listing tools from %s ...", self.mcp_url)

        # FastMCP requires async context + await for client ops.
        async with MCPClient(self.mcp_url, auth=jwt, timeout=self.timeout) as c:
            # returns Tool objects
            return await c.list_tools()

    async def _call_tool(self, name: str, args: Dict[str, Any]):
        """
        Execute a single MCP tool call.
        """
        jwt = self.jwt_supplier()
        logger.info("Calling MCP tool '%s' with args %s", name, args)
        async with MCPClient(self.mcp_url, auth=jwt, timeout=self.timeout) as c:
            return await c.call_tool(name, args or {})

    @classmethod
    async def create(
        cls,
        mcp_url: str = MCP_URL,
        jwt_supplier: Callable[[], Optional[str]] = default_jwt_supplier,
        timeout: int = TIMEOUT,
        model_id: str = "cohere.command-a-03-2025",
    ):
        """
        Async factory: fetch tools, bind them to the LLM, return a ready-to-use agent.
        Important: avoids doing awaits in __init__.
        """
        # get_llm should return a LangChain chat model supporting .bind_tools(...)
        llm = get_llm(model_id=model_id)
        # build the (not yet ready) instance, then discover and bind the tools
        self = cls(mcp_url, jwt_supplier, timeout, llm)

        tools = await self._list_tools()
        if not tools:
            logger.warning("No tools discovered at %s", mcp_url)
        self._tools_cache = tools

        schemas = [self._tool_to_schema(t) for t in tools]
        self.model_with_tools = self.llm.bind_tools(schemas)
        return self

    def _build_messages(
        self,
        history: Sequence[Dict[str, Any]],
        system_prompt: str,
        current_user_prompt: str,
        *,
        max_history: Optional[
            int
        ] = MAX_HISTORY,  # keep only the last N items; None = keep all
        exclude_last: bool = True,  # drop the very last history entry before building
    ) -> List[Any]:
        """
        Create: [SystemMessage(system_prompt), <trimmed history except last>,
                 HumanMessage(current_user_prompt)]
        History items are dicts like {"role": "user"|"assistant", "content": "..."}
        in chronological order.
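
        Example history (illustrative):
            [{"role": "user", "content": "Hi"},
             {"role": "assistant", "content": "Hello, how can I help?"}]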
        """
        # 1) Trim to the last `max_history` entries (if set)
        if max_history is not None and max_history > 0:
            working = list(history[-max_history:])
        else:
            working = list(history)

        # 2) Optionally remove the final entry from trimmed history
        if exclude_last and working:
            working = working[:-1]

        # 3) Build LangChain messages
        msgs: List[Any] = [SystemMessage(content=system_prompt)]
        for m in working:
            role = (m.get("role") or "").lower()
            content: Optional[str] = m.get("content")
            if not content:
                continue
            if role == "user":
                msgs.append(HumanMessage(content=content))
            elif role == "assistant":
                msgs.append(AIMessage(content=content))
            # ignore other/unknown roles (e.g., 'system', 'tool') in this simple variant

        # 4) Add the current user prompt
        msgs.append(HumanMessage(content=current_user_prompt))
        return msgs

    #
    # ---------- main loop ----------
    #
    async def answer(self, question: str, history: Optional[list] = None) -> str:
        """
        Run the LLM+MCP loop until the model stops calling tools.
        """
        messages = self._build_messages(
            # guard against a missing history (e.g., on the first turn)
            history=history or [],
            system_prompt=SYSTEM_PROMPT,
            current_user_prompt=question,
        )

        while True:
            ai: AIMessage = await self.model_with_tools.ainvoke(messages)

            tool_calls = getattr(ai, "tool_calls", None) or []
            if not tool_calls:
                # Final answer
                return ai.content

            messages.append(ai)  # keep the AI msg that requested tools

            # Execute tool calls and append a ToolMessage for each
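            # Each entry is a LangChain tool-call dict, e.g. (illustrative):
            #   {"name": "search_books", "args": {"query": "..."}, "id": "call_123"}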
            for tc in tool_calls:
                name = tc["name"]
                args = tc.get("args") or {}
                try:
                    # here we call the tool
                    result = await self._call_tool(name, args)
                    payload = (
                        getattr(result, "data", None)
                        or getattr(result, "content", None)
                        or str(result)
                    )
                    messages.append(
                        ToolMessage(
                            # default=str guards against non-JSON-serializable payloads
                            content=json.dumps(payload, default=str),
                            # must match the call id
                            tool_call_id=tc["id"],
                            name=name,
                        )
                    )
                except Exception as e:
                    messages.append(
                        ToolMessage(
                            content=json.dumps({"error": str(e)}),
                            tool_call_id=tc["id"],
                            name=name,
                        )
                    )


# ---- Example CLI usage ----
# this code is meant for the CLI, not Streamlit. See ui_mcp_agent.py
if __name__ == "__main__":
    QUESTION = "Tell me about Luigi Saetta. I need his e-mail address also."
    agent = asyncio.run(AgentWithMCP.create())
    print(asyncio.run(agent.answer(QUESTION)))
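    # Note: the two asyncio.run() calls above create two separate event loops;
    # this works here because each MCP client is opened per call.
    # A single-loop variant (sketch):
    #   async def main():
    #       agent = await AgentWithMCP.create()
    #       print(await agent.answer(QUESTION))
    #   asyncio.run(main())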