diff --git a/ai/gen-ai-agents/custom-rag-agent/README.md b/ai/gen-ai-agents/custom-rag-agent/README.md
index a4dc34c28..addd4e753 100644
--- a/ai/gen-ai-agents/custom-rag-agent/README.md
+++ b/ai/gen-ai-agents/custom-rag-agent/README.md
@@ -4,7 +4,8 @@
 This repository contains the code for the development of a **custom RAG Agent**, based on **OCI Generative AI**, **Oracle 23AI** Vector Store and **LangGraph**

 **Author**: L. Saetta
-**Last updated**: 09/09/2025
+
+**Last updated**: 11/09/2025

 ## Design and implementation
 * The agent is implemented using **LangGraph**
@@ -25,14 +26,17 @@
 For example, links to the documentation chunks are displayed before the final answer.

 ### MCP support: (07/2025)
 I have added an implementation of an **MCP** server that exposes the Semantic Search feature.
-Security can be handled in two ways:
+* added a [demo LLM with MCP](./ui_mcp_agent.py) showing how to integrate a generic MCP server in a chatbot using an LLM.
+
+**Security** can be handled in two ways:
 * custom: generate the **JWT token** using the library **PyJWT**
 * **OCI**: generate the JWT token using **OCI IAM**

 ## Status
-It is **WIP**.
+It is always and proudly **WIP**.

 ## References
+For more information:
 * [Integration with OCI APM](https://luigi-saetta.medium.com/enhancing-observability-in-rag-solutions-with-oracle-cloud-6f93b2675f40)

 ## Advantages of the Agentic approach
@@ -45,5 +49,5 @@
 For example, to ensure that final responses do not disclose Personally Identifiable Information (PII).
 * use Python 3.11
 * use the requirements.txt
 * create your config_private.py using the template provided
-* for MCP server: create a confidential application in OCI IAM
+* for MCP server: create a confidential application in **OCI IAM** to handle JWT tokens.
diff --git a/ai/gen-ai-agents/custom-rag-agent/config_private.py b/ai/gen-ai-agents/custom-rag-agent/config_private.py
new file mode 100644
index 000000000..1be47da96
--- /dev/null
+++ b/ai/gen-ai-agents/custom-rag-agent/config_private.py
@@ -0,0 +1,44 @@
+"""
+Private config
+"""
+
+# Oracle Vector Store
+# VECTOR_DB_USER = "TEST_VECTOR"
+# VECTOR_DB_PWD = "<db-password>"
+# VECTOR_WALLET_PWD = "<wallet-password>"
+# VECTOR_DSN = "aidb_medium"
+# VECTOR_WALLET_DIR = "/Users/lsaetta/Progetti/ai_assistant3/WALLET_VECTOR"
+
+# switched to ATP to avoid an invalid LOB locator
+VECTOR_DB_USER = "AIUSER"
+VECTOR_DB_PWD = "<db-password>"
+
+# point to the environment to test the NVIDIA llama3.2 embed model
+# VECTOR_DB_USER = "NVIDIAUSER"
+# VECTOR_DB_PWD = "<db-password>"
+VECTOR_WALLET_PWD = "<wallet-password>"
+VECTOR_DSN = "aiatp01_medium"
+VECTOR_WALLET_DIR = "/Users/lsaetta/Progetti/custom_rag_agent/wallet_atp"
+
+CONNECT_ARGS = {
+    "user": VECTOR_DB_USER,
+    "password": VECTOR_DB_PWD,
+    "dsn": VECTOR_DSN,
+    "config_dir": VECTOR_WALLET_DIR,
+    "wallet_location": VECTOR_WALLET_DIR,
+    "wallet_password": VECTOR_WALLET_PWD,
+}
+
+# integration with APM
+APM_PUBLIC_KEY = "6OXZ45BTT5AHD5KYICGOMLXXAZYTTLGT"
+
+# to add JWT to the MCP server
+JWT_SECRET = "<jwt-secret>"
+# using HS256 with this shared secret in the demo keeps things simple.
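+# For reference, a minimal sketch (illustrative, not this repo's server code)
+# of how the custom token could be minted with PyJWT using this secret:
+#   import jwt, time
+#   token = jwt.encode(
+#       {"sub": "mcp-client", "exp": int(time.time()) + 3600},
+#       JWT_SECRET,
+#       algorithm="HS256",
+#   )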
+# In production, switch to RS256 and use a key pair.
+JWT_ALGORITHM = "HS256"
+
+# if using IAM to generate the JWT token
+OCI_CLIENT_ID = "b51225fce0374a759f615ed264ddd268"
+# the OCID of the secret in the vault
+SECRET_OCID = "ocid1.vaultsecret.oc1.eu-frankfurt-1.amaaaaaa2xxap7yalre4qru4asevgtxlmn7hwh27awnzmdcrnmsfqu7cia7a"
diff --git a/ai/gen-ai-agents/custom-rag-agent/llm_with_mcp.py b/ai/gen-ai-agents/custom-rag-agent/llm_with_mcp.py
new file mode 100644
index 000000000..9ab8862e9
--- /dev/null
+++ b/ai/gen-ai-agents/custom-rag-agent/llm_with_mcp.py
@@ -0,0 +1,255 @@
+"""
+Test LLM and MCP.
+Based on the fastmcp library.
+It also provides support for securing MCP calls, using a JWT token.
+
+This is the backend for the Streamlit MCP UI.
+"""
+
+import json
+import asyncio
+from typing import List, Dict, Any, Callable, Sequence, Optional
+
+from fastmcp import Client as MCPClient
+from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
+
+# our code imports
+from oci_jwt_client import OCIJWTClient
+from oci_models import get_llm
+from utils import get_console_logger
+from config import IAM_BASE_URL, ENABLE_JWT_TOKEN
+from config_private import SECRET_OCID
+from mcp_servers_config import MCP_SERVERS_CONFIG
+
+logger = get_console_logger()
+
+# ---- Config ----
+MAX_HISTORY = 10
+MCP_URL = MCP_SERVERS_CONFIG["default"]["url"]
+TIMEOUT = 60
+# the scope for the JWT token
+SCOPE = "urn:opc:idm:__myscopes__"
+
+# you can tailor the SYSTEM prompt here, if needed
+SYSTEM_PROMPT = """You are an AI assistant equipped with an MCP server and several tools.
+Provide all the needed information with a detailed query when you use a tool.
+If the collection name is not provided in the user's prompt,
+use the collection BOOKS to get the additional information you need to answer.
+"""
+
+
+def default_jwt_supplier() -> Optional[str]:
+    """
+    Get a valid JWT token for the call to the MCP server.
+    """
+    if ENABLE_JWT_TOKEN:
+        # Always return a FRESH token; do not include "Bearer " (FastMCP adds it)
+        token, _, _ = OCIJWTClient(IAM_BASE_URL, SCOPE, SECRET_OCID).get_token()
+    else:
+        # JWT security disabled
+        token = None
+    return token
+
+
+class AgentWithMCP:
+    """
+    LLM + MCP orchestrator.
+    - Discovers tools from an MCP server (JWT-protected)
+    - Binds tool JSON Schemas to the LLM
+    - Executes tool calls emitted by the LLM and loops until completion
+
+    This is a rather simple agent: it only does tool calling,
+    but the tools are provided by the MCP server.
+    The code introspects the MCP server and decides which tool to call
+    and which parameters to provide.
+    """
+
+    def __init__(
+        self,
+        mcp_url: str,
+        jwt_supplier: Callable[[], Optional[str]],
+        timeout: int,
+        llm,
+    ):
+        self.mcp_url = mcp_url
+        self.jwt_supplier = jwt_supplier
+        self.timeout = timeout
+        self.llm = llm
+        self.model_with_tools = None
+        # optional: cache tools to avoid re-listing on every run
+        self._tools_cache = None
+
+    # ---------- helpers ----------
+
+    @staticmethod
+    def _tool_to_schema(t: object) -> dict:
+        """
+        Convert an MCP tool (name, description, inputSchema) to a JSON-Schema dict
+        that LangChain's ChatCohere.bind_tools accepts (top-level schema).
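+
+        For example, a (hypothetical) tool with name "semantic_search" and
+        inputSchema {"type": "object", "properties": {"query": {"type": "string"}}}
+        is converted to:
+        {"title": "semantic_search", "description": "...", "type": "object",
+         "properties": {"query": {"type": "string"}}}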
+ """ + input_schema = (getattr(t, "inputSchema", None) or {}).copy() + if input_schema.get("type") != "object": + input_schema.setdefault("type", "object") + input_schema.setdefault("properties", {}) + return { + "title": getattr(t, "name", "tool"), + "description": getattr(t, "description", "") or "", + **input_schema, + } + + async def _list_tools(self): + """ + Fetch tools from the MCP server using FastMCP. Must be async. + """ + jwt = self.jwt_supplier() + + logger.info("Listing tools from %s ...", self.mcp_url) + + # FastMCP requires async context + await for client ops. + async with MCPClient(self.mcp_url, auth=jwt, timeout=self.timeout) as c: + # returns Tool objects + return await c.list_tools() + + async def _call_tool(self, name: str, args: Dict[str, Any]): + """ + Execute a single MCP tool call. + """ + jwt = self.jwt_supplier() + logger.info("Calling MCP tool '%s' with args %s", name, args) + async with MCPClient(self.mcp_url, auth=jwt, timeout=self.timeout) as c: + return await c.call_tool(name, args or {}) + + @classmethod + async def create( + cls, + mcp_url: str = MCP_URL, + jwt_supplier: Callable[[], str] = default_jwt_supplier, + timeout: int = TIMEOUT, + model_id: str = "cohere.command-a-03-2025", + ): + """ + Async factory: fetch tools, bind them to the LLM, return a ready-to-use agent. + Important: Avoids doing awaits in __init__. + """ + # should return a LangChain Chat model supporting .bind_tools(...) + llm = get_llm(model_id=model_id) + # after, we call init() + self = cls(mcp_url, jwt_supplier, timeout, llm) + + tools = await self._list_tools() + if not tools: + logger.warning("No tools discovered at %s", mcp_url) + self._tools_cache = tools + + schemas = [self._tool_to_schema(t) for t in tools] + self.model_with_tools = self.llm.bind_tools(schemas) + return self + + def _build_messages( + self, + history: Sequence[Dict[str, Any]], + system_prompt: str, + current_user_prompt: str, + *, + max_history: Optional[ + int + ] = MAX_HISTORY, # keep only the last N items; None = keep all + exclude_last: bool = True, # drop the very last history entry before building + ) -> List[Any]: + """ + Create: [SystemMessage(system_prompt), , + HumanMessage(current_user_prompt)] + History items are dicts like {"role": "user"|"assistant", "content": "..."} + in chronological order. + """ + # 1) Trim to the last `max_history` entries (if set) + if max_history is not None and max_history > 0: + working = list(history[-max_history:]) + else: + working = list(history) + + # 2) Optionally remove the final entry from trimmed history + if exclude_last and working: + working = working[:-1] + + # 3) Build LangChain messages + msgs: List[Any] = [SystemMessage(content=system_prompt)] + for m in working: + role = (m.get("role") or "").lower() + content: Optional[str] = m.get("content") + if not content: + continue + if role == "user": + msgs.append(HumanMessage(content=content)) + elif role == "assistant": + msgs.append(AIMessage(content=content)) + # ignore other/unknown roles (e.g., 'system', 'tool') in this simple variant + + # 4) Add the current user prompt + msgs.append(HumanMessage(content=current_user_prompt)) + return msgs + + # + # ---------- main loop ---------- + # + async def answer(self, question: str, history: list = None) -> str: + """ + Run the LLM+MCP loop until the model stops calling tools. 
+ """ + messages = self._build_messages( + history=history, + system_prompt=SYSTEM_PROMPT, + current_user_prompt=question, + ) + + # List[Any] = [ + # SystemMessage(content=SYSTEM_PROMPT), + # HumanMessage(content=question), + # ] + + while True: + ai: AIMessage = await self.model_with_tools.ainvoke(messages) + + tool_calls = getattr(ai, "tool_calls", None) or [] + if not tool_calls: + # Final answer + return ai.content + + messages.append(ai) # keep the AI msg that requested tools + + # Execute tool calls and append ToolMessage for each + for tc in tool_calls: + name = tc["name"] + args = tc.get("args") or {} + try: + # here we call the tool + result = await self._call_tool(name, args) + payload = ( + getattr(result, "data", None) + or getattr(result, "content", None) + or str(result) + ) + messages.append( + ToolMessage( + content=json.dumps(payload), + # must match the call id + tool_call_id=tc["id"], + name=name, + ) + ) + except Exception as e: + messages.append( + ToolMessage( + content=json.dumps({"error": str(e)}), + tool_call_id=tc["id"], + name=name, + ) + ) + + +# ---- Example CLI usage ---- +# this code is good for CLI, not Streamlit. See ui_mcp_agent.py +if __name__ == "__main__": + QUESTION = "Tell me about Luigi Saetta. I need his e-mail address also." + agent = asyncio.run(AgentWithMCP.create()) + print(asyncio.run(agent.answer(QUESTION))) diff --git a/ai/gen-ai-agents/custom-rag-agent/mcp_explorer.py b/ai/gen-ai-agents/custom-rag-agent/mcp_explorer.py index 2dcf22643..8d80b4cfd 100644 --- a/ai/gen-ai-agents/custom-rag-agent/mcp_explorer.py +++ b/ai/gen-ai-agents/custom-rag-agent/mcp_explorer.py @@ -12,6 +12,7 @@ from utils import get_console_logger from config import DEBUG, ENABLE_JWT_TOKEN, IAM_BASE_URL from config_private import SECRET_OCID +from mcp_servers_config import MCP_SERVERS_CONFIG # the scope for the JWT token SCOPE = "urn:opc:idm:__myscopes__" @@ -22,7 +23,7 @@ st.title("🚀 MCP Tool Explorer") # Config -DEFAULT_URL = "http://localhost:9000/mcp/" +DEFAULT_URL = MCP_SERVERS_CONFIG["default"]["url"] server_url = st.text_input("URL MCP:", DEFAULT_URL) TIMEOUT = 30 @@ -35,7 +36,7 @@ async def fetch_tools(): """ - This function call the MCP sevrer to get list and descriptions of tools + This function call the MCP server to get list and descriptions of tools """ if ENABLE_JWT_TOKEN: # this is a client to OCI IAM to get the JWT token diff --git a/ai/gen-ai-agents/custom-rag-agent/mcp_semantic_search_with_iam.py b/ai/gen-ai-agents/custom-rag-agent/mcp_semantic_search_with_iam.py index 0c75e7ad6..f1783f34f 100644 --- a/ai/gen-ai-agents/custom-rag-agent/mcp_semantic_search_with_iam.py +++ b/ai/gen-ai-agents/custom-rag-agent/mcp_semantic_search_with_iam.py @@ -109,7 +109,7 @@ def semantic_search( Perform a semantic search based on the provided query. Args: query (str): The search query. - top_k (int): The number of top results to return. + top_k (int): The number of top results to return. Must be at least 5. collection_name (str): The name of the collection (DB table) to search in. Returns: dict: a dictionary containing the relevant documents. 
diff --git a/ai/gen-ai-agents/custom-rag-agent/mcp_servers_config.py b/ai/gen-ai-agents/custom-rag-agent/mcp_servers_config.py
new file mode 100644
index 000000000..dfeb2bbfc
--- /dev/null
+++ b/ai/gen-ai-agents/custom-rag-agent/mcp_servers_config.py
@@ -0,0 +1,14 @@
+"""
+MCP servers config
+"""
+
+MCP_SERVERS_CONFIG = {
+    "default": {
+        "transport": "streamable_http",
+        "url": "http://localhost:9000/mcp/",
+    },
+    "oci-semantic-search": {
+        "transport": "streamable_http",
+        "url": "http://localhost:9000/mcp/",
+    },
+}
diff --git a/ai/gen-ai-agents/custom-rag-agent/requirements.txt b/ai/gen-ai-agents/custom-rag-agent/requirements.txt
index bb97b6976..ed8aebc1e 100644
--- a/ai/gen-ai-agents/custom-rag-agent/requirements.txt
+++ b/ai/gen-ai-agents/custom-rag-agent/requirements.txt
@@ -89,6 +89,7 @@
 lab==8.4
 langchain==0.3.26
 langchain-community==0.3.26
 langchain-core==0.3.66
+langchain-mcp-adapters==0.1.9
 langchain-text-splitters==0.3.8
 langchain-unstructured==0.1.6
 langdetect==1.0.9
@@ -141,7 +142,7 @@
 platformdirs==4.3.6
 prometheus_client==0.21.1
 prompt_toolkit==3.0.50
 propcache==0.3.0
-protobuf==5.29.5
+protobuf==5.29.3
 psutil==7.0.0
 ptyprocess==0.7.0
 pure_eval==0.2.3
@@ -191,7 +192,7 @@
 soupsieve==2.6
 SQLAlchemy==2.0.38
 sse-starlette==2.3.6
 stack-data==0.6.3
-starlette==0.47.2
+starlette==0.46.2
 streamlit==1.43.0
 sympy==1.14.0
 tabulate==0.9.0
@@ -201,7 +202,7 @@
 tinycss2==1.4.0
 tokenize_rt==6.1.0
 toml==0.10.2
 tomlkit==0.13.2
-tornado==6.5
+tornado==6.4.2
 tqdm==4.67.1
 traitlets==5.14.3
 txt2tags==3.9
@@ -209,7 +210,7 @@
 typer==0.16.0
 types-python-dateutil==2.9.0.20241206
 typing-inspect==0.9.0
 typing-inspection==0.4.1
-typing_extensions==4.12.2
+typing_extensions==4.15.0
 tzdata==2025.1
 unstructured==0.18.2
 unstructured-client==0.37.2
diff --git a/ai/gen-ai-agents/custom-rag-agent/ui_mcp_agent.py b/ai/gen-ai-agents/custom-rag-agent/ui_mcp_agent.py
new file mode 100644
index 000000000..072e80d59
--- /dev/null
+++ b/ai/gen-ai-agents/custom-rag-agent/ui_mcp_agent.py
@@ -0,0 +1,95 @@
+"""
+Streamlit UI for MCP servers
+"""
+
+import asyncio
+import streamlit as st
+from mcp_servers_config import MCP_SERVERS_CONFIG
+
+# this module contains the backend; its test code is for the console only
+from llm_with_mcp import AgentWithMCP, default_jwt_supplier
+
+# ---------- Page setup ----------
+st.set_page_config(page_title="MCP UI", page_icon="🛠️", layout="wide")
+st.title("🛠️ LLM powered by MCP")
+
+# ---------- Sidebar: connection settings ----------
+with st.sidebar:
+    st.header("Connection")
+    mcp_url = st.text_input("MCP URL", value=MCP_SERVERS_CONFIG["default"]["url"])
+    model_id = st.selectbox("Model", ["cohere.command-a-03-2025"], index=0)
+    timeout = st.number_input(
+        "Timeout (s)", min_value=5, max_value=300, value=60, step=5
+    )
+
+    st.caption("JWT will be fetched on each call via default_jwt_supplier()")
+    connect = st.button("🔌 Connect / Reload tools", use_container_width=True)
+
+# ---------- Session state ----------
+if "agent" not in st.session_state:
+    st.session_state.agent = None
+if "chat" not in st.session_state:
+    # list of {"role": "user"|"assistant", "content": str}
+    st.session_state.chat = []
+
+# ---------- Connect / reload ----------
+if connect:
+    with st.spinner("Connecting to MCP server and loading tools…"):
+        try:
+            # Create an agent (async factory) and cache it in session_state
+            st.session_state.agent = asyncio.run(
+                AgentWithMCP.create(
+                    mcp_url=mcp_url,
+                    # returns a fresh raw JWT
+                    jwt_supplier=default_jwt_supplier,
+                    timeout=timeout,
+                    model_id=model_id,
+                )
+            )
+
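+            # the tool list is cached in the agent; click the button again to refresh it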
+            st.success("Connected. Tools loaded.")
+        except Exception as e:
+            st.session_state.agent = None
+            st.error(f"Failed to connect: {e}")
+
+# ---------- Chat history (display) ----------
+for msg in st.session_state.chat:
+    with st.chat_message("user" if msg["role"] == "user" else "assistant"):
+        st.write(msg["content"])
+
+# ---------- Input box ----------
+prompt = st.chat_input("Ask your question…")
+
+if prompt:
+    # Show the user message immediately
+    st.session_state.chat.append({"role": "user", "content": prompt})
+    with st.chat_message("user"):
+        st.write(prompt)
+
+    if st.session_state.agent is None:
+        st.warning(
+            "Not connected. Click ‘Connect / Reload tools’ in the sidebar first."
+        )
+    else:
+        with st.chat_message("assistant"):
+            with st.spinner("Thinking with MCP tools…"):
+                try:
+                    answer = asyncio.run(
+                        # we also pass the history (chat)
+                        st.session_state.agent.answer(prompt, st.session_state.chat)
+                    )
+                except Exception as e:
+                    answer = f"Error: {e}"
+                st.write(answer)
+        st.session_state.chat.append({"role": "assistant", "content": answer})
+
+# ---------- Optional: small debug panel at the bottom ----------
+with st.expander("🔎 Debug / State"):
+    st.json(
+        {
+            "connected": st.session_state.agent is not None,
+            "messages_in_memory": len(st.session_state.chat),
+            "mcp_url": mcp_url,
+            "model_id": model_id,
+            "timeout": timeout,
+        }
+    )
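As a usage note: the UI above is launched with `streamlit run ui_mcp_agent.py`, while the backend can also be driven from a plain console script. A minimal sketch, assuming the MCP server is running and config_private.py is filled in (the question string is illustrative):

```python
# Console-only sketch mirroring the __main__ block of llm_with_mcp.py.
import asyncio

from llm_with_mcp import AgentWithMCP

# build the agent (discovers MCP tools and binds them to the LLM)
agent = asyncio.run(AgentWithMCP.create())
# ask a question; the history defaults to empty
print(asyncio.run(agent.answer("Which documents are in the BOOKS collection?")))
```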