|
| 1 | +from typing import Any, Callable, Dict, List, Optional |
| 2 | + |
| 3 | +from mcp.server.fastmcp import FastMCP |
| 4 | + |
| 5 | +# Initialize the MCP server |
| 6 | +mcp = FastMCP("Self-Extending MCP Server") |
| 7 | + |
| 8 | + |
| 9 | +# Tool registry to track dynamically added tools |
| 10 | +class ToolRegistry: |
| 11 | + def __init__(self): |
| 12 | + self.tools = {} # name -> function |
| 13 | + self.metadata = {} # name -> metadata |
| 14 | + |
| 15 | + def register(self, name: str, func: Callable, description: str): |
| 16 | + """Register a new tool in the registry.""" |
| 17 | + self.tools[name] = func |
| 18 | + self.metadata[name] = {"name": name, "description": description} |
| 19 | + |
| 20 | + def get_tool(self, name: str) -> Optional[Callable]: |
| 21 | + """Get a tool by name.""" |
| 22 | + return self.tools.get(name) |
| 23 | + |
| 24 | + def get_metadata(self, name: str) -> Optional[Dict]: |
| 25 | + """Get tool metadata by name.""" |
| 26 | + return self.metadata.get(name) |
| 27 | + |
| 28 | + def list_tools(self) -> List[Dict]: |
| 29 | + """List all registered tools.""" |
| 30 | + return [self.metadata[name] for name in sorted(self.tools.keys())] |
| 31 | + |
| 32 | + def has_tool(self, name: str) -> bool: |
| 33 | + """Check if a tool exists.""" |
| 34 | + return name in self.tools |
| 35 | + |
| 36 | + |
| 37 | +# Create the registry |
| 38 | +registry = ToolRegistry() |
| 39 | + |
| 40 | + |
| 41 | +# Core functionality: List available tools |
| 42 | +@mcp.tool() |
| 43 | +def get_tools() -> Dict[str, Any]: |
| 44 | + """Get a list of all available tools in the MCP server. |
| 45 | +
|
| 46 | + Returns: |
| 47 | + Dictionary with list of available tools and their metadata |
| 48 | + """ |
| 49 | + try: |
| 50 | + tools = registry.list_tools() |
| 51 | + |
| 52 | + # Add the built-in tools |
| 53 | + built_in_tools = ["get_tools", "add_tool", "call_tool"] |
| 54 | + |
| 55 | + # Combine built-in tools with dynamic tools |
| 56 | + all_tools = [{"name": tool, "built_in": True} for tool in built_in_tools] |
| 57 | + for tool in tools: |
| 58 | + all_tools.append( |
| 59 | + { |
| 60 | + "name": tool["name"], |
| 61 | + "description": tool["description"], |
| 62 | + "built_in": False, |
| 63 | + } |
| 64 | + ) |
| 65 | + |
| 66 | + return {"status": "success", "tools": all_tools} |
| 67 | + except Exception as e: |
| 68 | + return {"status": "error", "message": str(e)} |
| 69 | + |
| 70 | + |
| 71 | +# Core functionality: Add a new tool |
| 72 | +@mcp.tool() |
| 73 | +def add_tool(name: str, code: str, description: str) -> Dict[str, Any]: |
| 74 | + """Add a new tool to the MCP server. |
| 75 | +
|
| 76 | + Args: |
| 77 | + name: Name of the tool |
| 78 | + code: Python code implementing the tool function |
| 79 | + description: Description of what the tool does |
| 80 | +
|
| 81 | + Returns: |
| 82 | + Dictionary with operation status |
| 83 | + """ |
| 84 | + try: |
| 85 | + # Check if tool already exists |
| 86 | + if registry.has_tool(name) or hasattr(mcp, name): |
| 87 | + return {"status": "error", "message": f"Tool '{name}' already exists"} |
| 88 | + |
| 89 | + # Validate the code |
| 90 | + try: |
| 91 | + # Add the tool function to the global namespace |
| 92 | + namespace = {} |
| 93 | + exec(code, namespace) |
| 94 | + |
| 95 | + # Get the function |
| 96 | + if name not in namespace: |
| 97 | + return { |
| 98 | + "status": "error", |
| 99 | + "message": f"Function '{name}' not found in the provided code", |
| 100 | + } |
| 101 | + |
| 102 | + func = namespace[name] |
| 103 | + |
| 104 | + # Check if it's a function |
| 105 | + if not callable(func): |
| 106 | + return { |
| 107 | + "status": "error", |
| 108 | + "message": f"'{name}' is not a callable function", |
| 109 | + } |
| 110 | + |
| 111 | + # Register the tool with our registry |
| 112 | + registry.register(name, func, description) |
| 113 | + |
| 114 | + return {"status": "success", "message": f"Tool '{name}' added successfully"} |
| 115 | + |
| 116 | + except SyntaxError as e: |
| 117 | + return { |
| 118 | + "status": "error", |
| 119 | + "message": f"Syntax error in tool code: {str(e)}", |
| 120 | + } |
| 121 | + except Exception as e: |
| 122 | + return {"status": "error", "message": f"Error creating tool: {str(e)}"} |
| 123 | + |
| 124 | + except Exception as e: |
| 125 | + return {"status": "error", "message": str(e)} |
| 126 | + |
| 127 | + |
| 128 | +# Core functionality: Call a tool |
| 129 | +@mcp.tool() |
| 130 | +def call_tool(name: str, args: Dict[str, Any]) -> Dict[str, Any]: |
| 131 | + """Call a registered tool with the given arguments. |
| 132 | +
|
| 133 | + Args: |
| 134 | + name: Name of the tool to call |
| 135 | + args: Dictionary of arguments to pass to the tool |
| 136 | +
|
| 137 | + Returns: |
| 138 | + Dictionary with the tool's response |
| 139 | + """ |
| 140 | + try: |
| 141 | + # Check if it's a built-in tool |
| 142 | + if name in ["get_tools", "add_tool", "call_tool"]: |
| 143 | + return { |
| 144 | + "status": "error", |
| 145 | + "message": f"Cannot call built-in tool '{name}' using call_tool", |
| 146 | + } |
| 147 | + |
| 148 | + # Get the tool |
| 149 | + tool = registry.get_tool(name) |
| 150 | + |
| 151 | + if not tool: |
| 152 | + return {"status": "error", "message": f"Tool '{name}' not found"} |
| 153 | + |
| 154 | + # Call the tool with the provided arguments |
| 155 | + try: |
| 156 | + result = tool(**args) |
| 157 | + return result |
| 158 | + except TypeError as e: |
| 159 | + # Likely an argument mismatch |
| 160 | + return { |
| 161 | + "status": "error", |
| 162 | + "message": f"Argument error calling tool '{name}': {str(e)}", |
| 163 | + } |
| 164 | + except Exception as e: |
| 165 | + return { |
| 166 | + "status": "error", |
| 167 | + "message": f"Error calling tool '{name}': {str(e)}", |
| 168 | + } |
| 169 | + |
| 170 | + except Exception as e: |
| 171 | + return {"status": "error", "message": str(e)} |
| 172 | + |
| 173 | +# Run the server when the script is executed |
| 174 | +if __name__ == "__main__": |
| 175 | + mcp.run() |
0 commit comments