diff --git a/agents/agent.py b/agents/agent.py
index 2b89334..dfe0ba1 100644
--- a/agents/agent.py
+++ b/agents/agent.py
@@ -69,6 +69,9 @@
 sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'auth_server'))
 from cognito_utils import generate_token
 
+# Import the httpx patch context manager
+from httpx_patch import httpx_mount_path_patch
+
 # Configure logging with basicConfig
 logging.basicConfig(
     level=logging.INFO,  # Set the log level to INFO
@@ -352,7 +355,7 @@ async def invoke_mcp_tool(mcp_registry_url: str, server_name: str, tool_name: st
 
     # Create the server URL by joining the base URL with the server name and sse path
     server_url = urljoin(base_url + '/', f"{server_name}/sse")
-    print(f"Server URL: {server_url}")
+    logger.info(f"invoke_mcp_tool, Server URL: {server_url}")
 
     # Prepare headers based on authentication method
     headers = {
@@ -379,23 +384,24 @@ async def invoke_mcp_tool(mcp_registry_url: str, server_name: str, tool_name: st
     }
 
     try:
-        # Create an MCP SSE client and call the tool with authentication headers
-        #print(f"Connecting to MCP server: {server_url}, headers: {redacted_headers}")
-        logger.info(f"Connecting to MCP server: {server_url}, headers: {redacted_headers}")
-        async with mcp.client.sse.sse_client(server_url, headers=headers) as (read, write):
-            async with mcp.ClientSession(read, write, sampling_callback=None) as session:
-                # Initialize the connection
-                await session.initialize()
-
-                # Call the specified tool with the provided arguments
-                result = await session.call_tool(tool_name, arguments=arguments)
-
-                # Format the result as a string
-                response = ""
-                for r in result.content:
-                    response += r.text + "\n"
-
-                return response.strip()
+        # Use context manager to apply httpx monkey patch and create MCP client
+        async with httpx_mount_path_patch(server_url):
+            # Create an MCP SSE client and call the tool with authentication headers
+            logger.info(f"invoke_mcp_tool, Connecting to MCP server: {server_url}, headers: {redacted_headers}")
+            async with mcp.client.sse.sse_client(server_url, headers=headers) as (read, write):
+                async with mcp.ClientSession(read, write, sampling_callback=None) as session:
+                    # Initialize the connection
+                    await session.initialize()
+
+                    # Call the specified tool with the provided arguments
+                    result = await session.call_tool(tool_name, arguments=arguments)
+
+                    # Format the result as a string
+                    response = ""
+                    for r in result.content:
+                        response += r.text + "\n"
+
+                    return response.strip()
     except Exception as e:
         return f"Error invoking MCP tool: {str(e)}"
 
@@ -408,59 +414,6 @@ def redact_sensitive_value(value: str, show_chars: int = 4) -> str:
         return "*" * len(value) if value else ""
     return value[:show_chars] + "*" * (len(value) - show_chars)
 
-def normalize_sse_endpoint_url_for_request(url_str: str, original_sse_url: str) -> str:
-    """
-    Normalize URLs in HTTP requests by preserving mount paths for non-mounted servers.
-
-    This function only applies fixes when the request is for the same server as the original SSE URL.
-    It should NOT modify requests to different servers (like currenttime, fininfo, etc.)
- - Example: - - Original SSE: http://localhost/mcpgw2/sse - - Request to same server: http://localhost/messages/?session_id=123 -> http://localhost/mcpgw2/messages/?session_id=123 - - Request to different server: http://localhost/currenttime/messages/?session_id=123 -> unchanged (already correct) - """ - if '/messages/' not in url_str: - return url_str - - # Parse the original SSE URL to extract the base path - from urllib.parse import urlparse - parsed_original = urlparse(original_sse_url) - parsed_current = urlparse(url_str) - - # Only apply fixes if this is the same host/port as the original SSE URL - if parsed_current.netloc != parsed_original.netloc: - return url_str - - original_path = parsed_original.path - - # Remove /sse from the original path to get the base mount path - if original_path.endswith('/sse'): - base_mount_path = original_path[:-4] # Remove '/sse' - else: - base_mount_path = original_path - - # Only apply the fix if: - # 1. There is a base mount path (non-empty) - # 2. The current path is exactly /messages/... (indicating it's missing the mount path) - # 3. The current path doesn't already contain a mount path - if (base_mount_path and - parsed_current.path.startswith('/messages/') and - not parsed_current.path.startswith(base_mount_path)): - - # The mount path is missing, we need to add it back - # Reconstruct the URL with the mount path - new_path = base_mount_path + parsed_current.path - fixed_url = f"{parsed_current.scheme}://{parsed_current.netloc}{new_path}" - if parsed_current.query: - fixed_url += f"?{parsed_current.query}" - if parsed_current.fragment: - fixed_url += f"#{parsed_current.fragment}" - - logger.debug(f"Fixed mount path in request URL: {url_str} -> {fixed_url}") - return fixed_url - - return url_str def load_system_prompt(): """ @@ -656,117 +609,91 @@ async def main(): redacted_headers[k] = v logger.info(f"Using authentication headers: {redacted_headers}") - # Apply monkey patch to fix mount path issues in httpx requests - # This fixes the issue where non-mounted servers with default paths lose their mount path - # in POST requests to /messages/ endpoints - original_request = httpx.AsyncClient.request - - async def patched_request(self, method, url, **kwargs): - # Fix mount path issues in requests - if isinstance(url, str) and '/messages/' in url: - url = normalize_sse_endpoint_url_for_request(url, server_url) - elif hasattr(url, '__str__') and '/messages/' in str(url): - url = normalize_sse_endpoint_url_for_request(str(url), server_url) - return await original_request(self, method, url, **kwargs) - - # Apply the patch - httpx.AsyncClient.request = patched_request - logger.info("Applied httpx monkey patch to fix mount path issues") - - try: - # Initialize MCP client with the server configuration and authentication headers - client = MultiServerMCPClient( - { - "mcp_registry": { - "url": server_url, - "transport": "sse", - "headers": auth_headers + # Use context manager to apply httpx monkey patch + async with httpx_mount_path_patch(server_url): + # Initialize MCP client with the server configuration and authentication headers + client = MultiServerMCPClient( + { + "mcp_registry": { + "url": server_url, + "transport": "sse", + "headers": auth_headers + } } - } - ) - logger.info("Connected to MCP server successfully with authentication, server_url: " + server_url) - - # Get available tools from MCP and display them - mcp_tools = await client.get_tools() - logger.info(f"Available MCP tools: {[tool.name for tool in mcp_tools]}") - - # Add the 
calculator and invoke_mcp_tool to the tools array - # The invoke_mcp_tool function already supports authentication parameters - all_tools = [calculator, invoke_mcp_tool] + mcp_tools - logger.info(f"All available tools: {[tool.name if hasattr(tool, 'name') else tool.__name__ for tool in all_tools]}") - - # Create the agent with the model and all tools - agent = create_react_agent( - model, - all_tools - ) - - # Load and format the system prompt with the current time and MCP registry URL - system_prompt_template = load_system_prompt() - - # Prepare authentication parameters for system prompt - if args.use_session_cookie: - system_prompt = system_prompt_template.format( - current_utc_time=current_utc_time, - mcp_registry_url=args.mcp_registry_url, - auth_token='', # Not used for session cookie auth - user_pool_id=args.user_pool_id or '', - client_id=args.client_id or '', - region=args.region or 'us-east-1', - auth_method=auth_method, - session_cookie=session_cookie ) - else: - system_prompt = system_prompt_template.format( - current_utc_time=current_utc_time, - mcp_registry_url=args.mcp_registry_url, - auth_token=access_token, - user_pool_id=args.user_pool_id, - client_id=args.client_id, - region=args.region, - auth_method=auth_method, - session_cookie='' # Not used for M2M auth + logger.info("Connected to MCP server successfully with authentication, server_url: " + server_url) + + # Get available tools from MCP and display them + mcp_tools = await client.get_tools() + logger.info(f"Available MCP tools: {[tool.name for tool in mcp_tools]}") + + # Add the calculator and invoke_mcp_tool to the tools array + # The invoke_mcp_tool function already supports authentication parameters + all_tools = [calculator, invoke_mcp_tool] + mcp_tools + logger.info(f"All available tools: {[tool.name if hasattr(tool, 'name') else tool.__name__ for tool in all_tools]}") + + # Create the agent with the model and all tools + agent = create_react_agent( + model, + all_tools ) - - # Format the message with system message first - formatted_messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": args.message} - ] - - logger.info("\nInvoking agent...\n" + "-"*40) - - # Invoke the agent with the formatted messages - response = await agent.ainvoke({"messages": formatted_messages}) - - logger.info("\nResponse:" + "\n" + "-"*40) - #print(response) - print_agent_response(response) - - # Process and display the response - if response and "messages" in response and response["messages"]: - # Get the last message from the response - last_message = response["messages"][-1] - if isinstance(last_message, dict) and "content" in last_message: - # Display the content of the response - print(last_message["content"]) + # Load and format the system prompt with the current time and MCP registry URL + system_prompt_template = load_system_prompt() + + # Prepare authentication parameters for system prompt + if args.use_session_cookie: + system_prompt = system_prompt_template.format( + current_utc_time=current_utc_time, + mcp_registry_url=args.mcp_registry_url, + auth_token='', # Not used for session cookie auth + user_pool_id=args.user_pool_id or '', + client_id=args.client_id or '', + region=args.region or 'us-east-1', + auth_method=auth_method, + session_cookie=session_cookie + ) else: - print(str(last_message.content)) - else: - print("No valid response received") + system_prompt = system_prompt_template.format( + current_utc_time=current_utc_time, + mcp_registry_url=args.mcp_registry_url, + 
auth_token=access_token, + user_pool_id=args.user_pool_id, + client_id=args.client_id, + region=args.region, + auth_method=auth_method, + session_cookie='' # Not used for M2M auth + ) + + # Format the message with system message first + formatted_messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": args.message} + ] + + logger.info("\nInvoking agent...\n" + "-"*40) - finally: - # Restore original httpx behavior - httpx.AsyncClient.request = original_request - logger.info("Restored original httpx behavior") + # Invoke the agent with the formatted messages + response = await agent.ainvoke({"messages": formatted_messages}) + + logger.info("\nResponse:" + "\n" + "-"*40) + #print(response) + print_agent_response(response) + + # Process and display the response + if response and "messages" in response and response["messages"]: + # Get the last message from the response + last_message = response["messages"][-1] + + if isinstance(last_message, dict) and "content" in last_message: + # Display the content of the response + print(last_message["content"]) + else: + print(str(last_message.content)) + else: + print("No valid response received") except Exception as e: - # Restore original httpx behavior in case of error - try: - httpx.AsyncClient.request = original_request - except NameError: - pass # original_request might not be defined if error occurred before monkey patch print(f"Error: {str(e)}") import traceback print(traceback.format_exc()) diff --git a/agents/httpx_patch.py b/agents/httpx_patch.py new file mode 100644 index 0000000..78b6e4d --- /dev/null +++ b/agents/httpx_patch.py @@ -0,0 +1,112 @@ +""" +HTTPX Monkey Patch Context Manager + +This module provides a context manager for applying and restoring httpx monkey patches +to fix mount path issues in MCP SSE requests. +""" + +import httpx +import logging +from contextlib import asynccontextmanager +from typing import AsyncGenerator + +logger = logging.getLogger(__name__) + + +def normalize_sse_endpoint_url_for_request(url_str: str, original_sse_url: str) -> str: + """ + Normalize URLs in HTTP requests by preserving mount paths for non-mounted servers. + + This function only applies fixes when the request is for the same server as the original SSE URL. + It should NOT modify requests to different servers (like currenttime, fininfo, etc.) + + Example: + - Original SSE: http://localhost/mcpgw2/sse + - Request to same server: http://localhost/messages/?session_id=123 -> http://localhost/mcpgw2/messages/?session_id=123 + - Request to different server: http://localhost/currenttime/messages/?session_id=123 -> unchanged (already correct) + """ + if '/messages/' not in url_str: + return url_str + + # Parse the original SSE URL to extract the base path + from urllib.parse import urlparse + parsed_original = urlparse(original_sse_url) + parsed_current = urlparse(url_str) + + # Only apply fixes if this is the same host/port as the original SSE URL + if parsed_current.netloc != parsed_original.netloc: + return url_str + + original_path = parsed_original.path + + # Remove /sse from the original path to get the base mount path + if original_path.endswith('/sse'): + base_mount_path = original_path[:-4] # Remove '/sse' + else: + base_mount_path = original_path + + # Only apply the fix if: + # 1. There is a base mount path (non-empty) + # 2. The current path is exactly /messages/... (indicating it's missing the mount path) + # 3. 
The current path doesn't already contain a mount path + if (base_mount_path and + parsed_current.path.startswith('/messages/') and + not parsed_current.path.startswith(base_mount_path)): + + # The mount path is missing, we need to add it back + # Reconstruct the URL with the mount path + new_path = base_mount_path + parsed_current.path + fixed_url = f"{parsed_current.scheme}://{parsed_current.netloc}{new_path}" + if parsed_current.query: + fixed_url += f"?{parsed_current.query}" + if parsed_current.fragment: + fixed_url += f"#{parsed_current.fragment}" + + logger.debug(f"Fixed mount path in request URL: {url_str} -> {fixed_url}") + return fixed_url + + return url_str + + +@asynccontextmanager +async def httpx_mount_path_patch(server_url: str) -> AsyncGenerator[None, None]: + """ + Context manager that applies httpx monkey patch to fix mount path issues. + + This patches httpx.AsyncClient.request to normalize SSE endpoint URLs + for requests that are missing mount paths. + + Args: + server_url: The original SSE server URL to use for normalization + + Usage: + async with httpx_mount_path_patch(server_url): + # Your code that makes httpx requests + pass + """ + # Store the original request method + original_request = httpx.AsyncClient.request + + async def patched_request(self, method, url, **kwargs): + """Patched request method that fixes mount path issues""" + logger.debug(f"patched_request: {method} {url}") + + # Fix mount path issues in requests + if isinstance(url, str) and '/messages/' in url: + logger.debug(f"Normalizing SSE endpoint URL: {url} -> server_url: {server_url}") + url = normalize_sse_endpoint_url_for_request(url, server_url) + elif hasattr(url, '__str__') and '/messages/' in str(url): + logger.debug(f"Normalizing SSE endpoint URL (str): {url} -> server_url: {server_url}") + url = normalize_sse_endpoint_url_for_request(str(url), server_url) + + return await original_request(self, method, url, **kwargs) + + try: + # Apply the patch + httpx.AsyncClient.request = patched_request + logger.info("Applied httpx monkey patch to fix mount path issues") + yield + finally: + # Restore original behavior + httpx.AsyncClient.request = original_request + logger.info("Restored original httpx behavior") \ No newline at end of file diff --git a/build_and_run.sh b/build_and_run.sh index 97103a4..f2ff9e1 100755 --- a/build_and_run.sh +++ b/build_and_run.sh @@ -42,6 +42,39 @@ log "Stopping existing services (if any)..." docker-compose down --remove-orphans || log "No existing services to stop" log "Existing services stopped" +# Clean up FAISS index files to force registry to recreate them +log "Cleaning up FAISS index files..." +MCPGATEWAY_SERVERS_DIR="/opt/mcpgateway/servers" +FAISS_FILES=("service_index.faiss" "service_index_metadata.json") + +for file in "${FAISS_FILES[@]}"; do + file_path="$MCPGATEWAY_SERVERS_DIR/$file" + if [ -f "$file_path" ]; then + rm -f "$file_path" + log "Deleted $file_path" + else + log "$file not found (already clean)" + fi +done +log "FAISS index cleanup completed" + +# Copy JSON files from registry/servers to /opt/mcpgateway/servers +log "Copying JSON files from registry/servers to $MCPGATEWAY_SERVERS_DIR..." 
+if [ -d "registry/servers" ]; then + # Create the target directory if it doesn't exist + sudo mkdir -p "$MCPGATEWAY_SERVERS_DIR" + + # Copy all JSON files + if ls registry/servers/*.json 1> /dev/null 2>&1; then + sudo cp registry/servers/*.json "$MCPGATEWAY_SERVERS_DIR/" + log "JSON files copied successfully" + else + log "No JSON files found in registry/servers" + fi +else + log "WARNING: registry/servers directory not found" +fi + # Generate a random SECRET_KEY if not already in .env if ! grep -q "SECRET_KEY=" .env || grep -q "SECRET_KEY=$" .env || grep -q "SECRET_KEY=\"\"" .env; then log "Generating SECRET_KEY..." diff --git a/registry/servers/server_state.json b/registry/servers/server_state.json index f25495c..a261499 100644 --- a/registry/servers/server_state.json +++ b/registry/servers/server_state.json @@ -1,3 +1,6 @@ { - "/knowledge-not": false + "/currenttime": true, + "/fininfo": true, + "/mcpgw": true, + "/realserverfaketools": true } \ No newline at end of file diff --git a/servers/fininfo/pyproject.toml b/servers/fininfo/pyproject.toml index f840390..65b22b6 100644 --- a/servers/fininfo/pyproject.toml +++ b/servers/fininfo/pyproject.toml @@ -1,11 +1,12 @@ [project] -name = "current-time-mcp" +name = "fininfo-mcp-server" version = "0.1.0" -description = "MCP server to get current time from the timeapi.io API" +description = "MCP server to provide financial information using Polygon.io API" readme = "README.md" requires-python = ">=3.12,<3.13" dependencies = [ - "mcp>=1.9.3", + "fastmcp>=2.0.0", # Updated to FastMCP 2.0 "pydantic>=2.11.3", "requests>=2.32.3", + "python-dotenv>=1.0.0", # Added dotenv as it's used in server.py ] diff --git a/servers/fininfo/server.py b/servers/fininfo/server.py index 0d1c6c2..aeb261a 100644 --- a/servers/fininfo/server.py +++ b/servers/fininfo/server.py @@ -1,5 +1,5 @@ """ -This server provides stoack market data using the Polygon.io API. +This server provides stock market data using the Polygon.io API. """ import os @@ -7,8 +7,10 @@ import requests import argparse import logging +import asyncio from pydantic import BaseModel, Field -from mcp.server.fastmcp import FastMCP +from fastmcp import FastMCP, Context # Updated import for FastMCP 2.0 +from fastmcp.server.dependencies import get_http_request # New dependency function for HTTP access from typing import Dict, Any, Optional, ClassVar, Annotated from pydantic import validator from dotenv import load_dotenv @@ -66,12 +68,186 @@ def parse_arguments(): # Parse arguments at module level to make them available args = parse_arguments() -# Initialize FastMCP server using parsed arguments with mount path configured via settings -mcp = FastMCP("fininfo", port=args.port, host="0.0.0.0") -mcp.settings.mount_path = "/fininfo" +# Initialize FastMCP 2.0 server +mcp = FastMCP("fininfo") +# Note: FastMCP 2.0 handles host/port differently - set in run() method + @mcp.tool() -def get_stock_aggregates( +async def get_http_headers(ctx: Context = None) -> Dict[str, Any]: + """ + FastMCP 2.0 tool to access HTTP headers directly using the new dependency system. + This tool demonstrates how to get HTTP request information including auth headers. 
+ + Returns: + Dict[str, Any]: HTTP request information including headers + """ + if not ctx: + return {"error": "No context available"} + + result = { + "fastmcp_version": "2.0", + "tool_name": "get_http_headers", + "server": "fininfo", + "timestamp": str(asyncio.get_event_loop().time()) + } + + try: + # Use FastMCP 2.0's dependency function to get HTTP request + http_request = get_http_request() + + if http_request: + # Extract all headers + all_headers = dict(http_request.headers) + + # Separate auth-related headers for easy viewing + auth_headers = {} + other_headers = {} + + for key, value in all_headers.items(): + key_lower = key.lower() + if key_lower in ['authorization', 'x-user-pool-id', 'x-client-id', 'x-region', 'cookie', 'x-api-key', 'x-scopes', 'x-user', 'x-username', 'x-auth-method']: + if key_lower == 'authorization': + # Show type of auth but not full token + if value.startswith('Bearer '): + auth_headers[key] = f"Bearer (length: {len(value)})" + else: + auth_headers[key] = f" (length: {len(value)})" + elif key_lower == 'cookie': + # Show cookie names but hide values + cookies = [c.split('=')[0] for c in value.split(';')] + auth_headers[key] = f"Cookies: {', '.join(cookies)}" + else: + auth_headers[key] = value + else: + other_headers[key] = value + + result.update({ + "http_request_available": True, + "method": http_request.method, + "url": str(http_request.url), + "path": http_request.url.path, + "query_params": dict(http_request.query_params), + "client_info": { + "host": http_request.client.host if http_request.client else "Unknown", + "port": http_request.client.port if http_request.client else "Unknown" + }, + "auth_headers": auth_headers, + "other_headers": other_headers, + "total_headers_count": len(all_headers) + }) + + # Log the auth headers for server-side debugging + await ctx.info(f"🔐 HTTP Headers Debug - Auth Headers Found: {list(auth_headers.keys())}") + if auth_headers: + for key, value in auth_headers.items(): + await ctx.info(f" {key}: {value}") + else: + await ctx.info(" No auth-related headers found") + + else: + result.update({ + "http_request_available": False, + "error": "No HTTP request context available" + }) + await ctx.warning("No HTTP request context available - may be running in non-HTTP transport mode") + + except RuntimeError as e: + # This happens when not in HTTP context (e.g., stdio transport) + result.update({ + "http_request_available": False, + "error": f"Not in HTTP context: {str(e)}", + "transport_mode": "Likely STDIO or other non-HTTP transport" + }) + await ctx.info(f"Not in HTTP context - this is expected for STDIO transport: {e}") + + except Exception as e: + result.update({ + "http_request_available": False, + "error": f"Error accessing HTTP request: {str(e)}" + }) + await ctx.error(f"Error accessing HTTP request: {e}") + logger.error(f"Error in get_http_headers: {e}", exc_info=True) + + return result + + +async def print_all_http_headers(ctx: Context = None) -> str: + """ + Helper function to print out all HTTP request headers in a formatted string. + This function can be called internally by other tools to display HTTP headers. 
+
+    Args:
+        ctx: FastMCP Context object
+
+    Returns:
+        str: Formatted string containing all HTTP headers
+    """
+    if not ctx:
+        return "Error: No context available"
+
+    output = []
+    output.append("=== HTTP Request Headers ===")
+    output.append(f"Server: fininfo")
+    output.append(f"Timestamp: {asyncio.get_event_loop().time()}")
+    output.append("")
+
+    try:
+        # Use FastMCP 2.0's dependency function to get HTTP request
+        http_request = get_http_request()
+
+        if http_request:
+            # Extract all headers
+            all_headers = dict(http_request.headers)
+
+            output.append(f"Total Headers: {len(all_headers)}")
+            output.append(f"HTTP Method: {http_request.method}")
+            output.append(f"URL: {http_request.url}")
+            output.append(f"Path: {http_request.url.path}")
+            output.append("")
+            output.append("Headers:")
+            output.append("-" * 50)
+
+            # Sort headers for consistent output
+            for key in sorted(all_headers.keys()):
+                value = all_headers[key]
+                # Mask sensitive headers
+                if key.lower() in ['authorization', 'cookie']:
+                    if key.lower() == 'authorization':
+                        if value.startswith('Bearer '):
+                            masked_value = f"Bearer (length: {len(value)})"
+                        else:
+                            masked_value = f" (length: {len(value)})"
+                    else:  # cookie
+                        cookie_names = [c.split('=')[0] for c in value.split(';')]
+                        masked_value = f"Cookies: {', '.join(cookie_names)}"
+                    output.append(f"{key}: {masked_value}")
+                else:
+                    output.append(f"{key}: {value}")
+
+            # Log to context as well
+            await ctx.info(f"📋 Printed all HTTP headers - Total: {len(all_headers)}")
+
+        else:
+            output.append("No HTTP request context available")
+            output.append("This may occur when using STDIO transport")
+            await ctx.warning("No HTTP request context available")
+
+    except RuntimeError as e:
+        output.append(f"Not in HTTP context: {str(e)}")
+        output.append("This is expected for STDIO transport")
+        await ctx.info(f"Not in HTTP context - this is expected for STDIO transport: {e}")
+
+    except Exception as e:
+        output.append(f"Error accessing HTTP request: {str(e)}")
+        await ctx.error(f"Error accessing HTTP request: {e}")
+        logger.error(f"Error in print_all_http_headers: {e}", exc_info=True)
+
+    return "\n".join(output)
+
+
+@mcp.tool()
+async def get_stock_aggregates(
     stock_ticker: Annotated[str, Field(..., description="Case-sensitive ticker symbol (e.g., 'AAPL')")],
     multiplier: Annotated[int, Field(..., description="Size of the timespan multiplier")],
     timespan: Annotated[str, Field(..., description="Size of the time window")],
@@ -79,7 +255,8 @@
     to_date: Annotated[str, Field(..., description="End date in YYYY-MM-DD format or millisecond timestamp")],
     adjusted: Annotated[bool, Field(True, description="Whether results are adjusted for splits")] = True,
     sort: Annotated[Optional[str], Field(None, description="Sort results by timestamp ('asc' or 'desc')")] = None,
-    limit: Annotated[int, Field(5000, description="Maximum number of base aggregates (max 50000)")] = 5000
+    limit: Annotated[int, Field(5000, description="Maximum number of base aggregates (max 50000)")] = 5000,
+    ctx: Context = None
 ) -> Dict[str, Any]:
     """
     Retrieve stock aggregate data from Polygon.io API.
@@ -101,6 +278,17 @@ def get_stock_aggregates(
         ValueError: If input parameters are invalid
         requests.RequestException: If API call fails after retries
     """
+    # Log context information if available
+    if ctx:
+        await ctx.info(f"🔍 Getting stock aggregates for {stock_ticker} from {from_date} to {to_date}")
+
+        # Use the helper function to print HTTP headers for debugging
+        try:
+            headers_info = await print_all_http_headers(ctx)
+            await ctx.info(f"📋 HTTP Headers Debug:\n{headers_info}")
+        except Exception as e:
+            await ctx.warning(f"Could not print HTTP headers: {e}")
+
     # Validate timespan
     valid_timespans = ["minute", "hour", "day", "week", "month", "quarter", "year"]
     if timespan not in valid_timespans:
@@ -155,7 +343,7 @@
 
 
 @mcp.tool()
-def print_stock_data(
+async def print_stock_data(
     stock_ticker: Annotated[str, Field(..., description="Case-sensitive ticker symbol (e.g., 'AAPL')")],
     multiplier: Annotated[int, Field(..., description="Size of the timespan multiplier")],
     timespan: Annotated[str, Field(..., description="Size of the time window")],
@@ -163,7 +351,8 @@
     to_date: Annotated[str, Field(..., description="End date in YYYY-MM-DD format or millisecond timestamp")],
     adjusted: Annotated[bool, Field(True, description="Whether results are adjusted for splits")] = True,
     sort: Annotated[Optional[str], Field(None, description="Sort results by timestamp ('asc' or 'desc')")] = None,
-    limit: Annotated[int, Field(5000, description="Maximum number of base aggregates (max 50000)")] = 5000
+    limit: Annotated[int, Field(5000, description="Maximum number of base aggregates (max 50000)")] = 5000,
+    ctx: Context = None
 ) -> str:
     """
     Format all fields from the Polygon.io stock aggregate response as a string.
@@ -184,7 +373,7 @@
     # Initialize an empty string to collect all output
     output = []
 
-    response_data = get_stock_aggregates(
+    response_data = await get_stock_aggregates(
         stock_ticker=stock_ticker,
         multiplier=multiplier,
         timespan=timespan,
@@ -192,7 +381,8 @@
         to_date=to_date,
         adjusted=adjusted,
         sort=sort,
-        limit=limit
+        limit=limit,
+        ctx=ctx
     )
 
     if not response_data:
@@ -274,8 +464,9 @@ def get_config() -> str:
 
 def main():
     # Run the server with the specified transport from command line args
-    mcp.run(transport=args.transport)
-    logger.info(f"Server is running on port {args.port} with transport {args.transport}")
+    # FastMCP 2.0 handles port and host in the run method
+    logger.info(f"Starting fininfo server on port {args.port} with transport {args.transport}")
+    mcp.run(transport=args.transport, host="0.0.0.0", port=int(args.port), path="/sse")
 
 if __name__ == "__main__":
     main()
diff --git a/servers/mcpgw/pyproject.toml b/servers/mcpgw/pyproject.toml
index bef1dc9..dcc07cd 100644
--- a/servers/mcpgw/pyproject.toml
+++ b/servers/mcpgw/pyproject.toml
@@ -5,12 +5,12 @@ description = "MCP server to interact with the MCP Gateway Registry API" # Updat
 readme = "README.md"
 requires-python = ">=3.12,<3.13"
 dependencies = [
-    "fastmcp>=2.0.0", # Updated to FastMCP 2.0
+    "fastmcp>=2.0.0",  # Updated to FastMCP 2.0
     "pydantic>=2.11.3",
     "httpx>=0.27.0", # Added httpx
     "python-dotenv>=1.0.0", # Added dotenv as it's used in server.py
     "websockets>=15.0.1",
     "faiss-cpu>=1.7.4",
-    "sentence-transformers>=2.2.2", # For semantic search
-    "scikit-learn>=1.3.0" # For cosine similarity
+    "sentence-transformers>=2.2.2", # For semantic search
+    "scikit-learn>=1.3.0", # For cosine similarity
 ]
diff --git a/servers/mcpgw/server.py b/servers/mcpgw/server.py
index ff3a4d2..4fcb0ec 100644
--- a/servers/mcpgw/server.py
+++ b/servers/mcpgw/server.py
@@ -580,6 +580,7 @@ async def load_faiss_data_for_mcpgw():
     metadata_file_changed = False
     if FAISS_METADATA_PATH_MCPGW.exists():
         try:
+            logger.info(f"MCPGW: Checking FAISS metadata file {FAISS_METADATA_PATH_MCPGW} for changes...")
             current_metadata_mtime = await asyncio.to_thread(os.path.getmtime, FAISS_METADATA_PATH_MCPGW)
             if _faiss_metadata_mcpgw is None or _last_faiss_metadata_mtime is None or current_metadata_mtime > _last_faiss_metadata_mtime or index_file_changed:
                 logger.info(f"MCPGW: FAISS metadata file {FAISS_METADATA_PATH_MCPGW} has changed, not loaded, or index changed. Reloading...")
@@ -589,6 +590,7 @@ async def load_faiss_data_for_mcpgw():
                 _last_faiss_metadata_mtime = current_metadata_mtime
                 metadata_file_changed = True
                 logger.info(f"MCPGW: FAISS metadata loaded. Paths: {len(_faiss_metadata_mcpgw.get('metadata', {})) if _faiss_metadata_mcpgw else 'N/A'}")
+                logger.info(f"MCPGW: FAISS metadata _faiss_metadata_mcpgw {_faiss_metadata_mcpgw.get('metadata')}")
             else:
                 logger.debug("MCPGW: FAISS metadata file unchanged since last load.")
         except Exception as e:
@@ -895,6 +897,80 @@ async def get_http_headers(ctx: Context = None) -> Dict[str, Any]:
     return result
 
 
+async def print_all_http_headers(ctx: Context = None) -> str:
+    """
+    Helper function to print out all HTTP request headers in a formatted string.
+    This function can be called internally by other tools to display HTTP headers.
+
+    Args:
+        ctx: FastMCP Context object
+
+    Returns:
+        str: Formatted string containing all HTTP headers
+    """
+    if not ctx:
+        return "Error: No context available"
+
+    output = []
+    output.append("=== HTTP Request Headers ===")
+    output.append(f"Server: mcpgw")
+    output.append(f"Timestamp: {asyncio.get_event_loop().time()}")
+    output.append("")
+
+    try:
+        # Use FastMCP 2.0's dependency function to get HTTP request
+        http_request = get_http_request()
+
+        if http_request:
+            # Extract all headers
+            all_headers = dict(http_request.headers)
+
+            output.append(f"Total Headers: {len(all_headers)}")
+            output.append(f"HTTP Method: {http_request.method}")
+            output.append(f"URL: {http_request.url}")
+            output.append(f"Path: {http_request.url.path}")
+            output.append("")
+            output.append("Headers:")
+            output.append("-" * 50)
+
+            # Sort headers for consistent output
+            for key in sorted(all_headers.keys()):
+                value = all_headers[key]
+                # Mask sensitive headers
+                if key.lower() in ['authorization', 'cookie']:
+                    if key.lower() == 'authorization':
+                        if value.startswith('Bearer '):
+                            masked_value = f"Bearer (length: {len(value)})"
+                        else:
+                            masked_value = f" (length: {len(value)})"
+                    else:  # cookie
+                        cookie_names = [c.split('=')[0] for c in value.split(';')]
+                        masked_value = f"Cookies: {', '.join(cookie_names)}"
+                    output.append(f"{key}: {masked_value}")
+                else:
+                    output.append(f"{key}: {value}")
+
+            # Log to context as well
+            await ctx.info(f"📋 Printed all HTTP headers - Total: {len(all_headers)}")
+
+        else:
+            output.append("No HTTP request context available")
+            output.append("This may occur when using STDIO transport")
+            await ctx.warning("No HTTP request context available")
+
+    except RuntimeError as e:
+        output.append(f"Not in HTTP context: {str(e)}")
+        output.append("This is expected for STDIO transport")
+        await ctx.info(f"Not in HTTP context - this is expected for STDIO transport: {e}")
+
+    except Exception as e:
+        output.append(f"Error accessing HTTP request: {str(e)}")
+        await ctx.error(f"Error accessing HTTP request: {e}")
+        logger.error(f"Error in print_all_http_headers: {e}", exc_info=True)
+
+    return "\n".join(output)
+
+
 @mcp.tool()
 async def toggle_service(
     service_path: str = Field(..., description="The unique path identifier for the service (e.g., '/fininfo'). Must start with '/'."),
@@ -1178,21 +1254,24 @@ async def intelligent_tool_finder(
             if not service_path:
                 logger.warning(f"MCPGW: Could not find service_path for FAISS ID {faiss_id}. Skipping.")
                 continue
+            else:
+                logger.debug(f"MCPGW: Found service_path {service_path} for FAISS ID {faiss_id}")
 
             service_metadata = registry_faiss_metadata.get(service_path)
             if not service_metadata or "full_server_info" not in service_metadata:
                 logger.warning(f"MCPGW: Metadata or full_server_info not found for service path {service_path}. Skipping.")
                 continue
-
+            else:
+                logger.debug(f"MCPGW: Found metadata for service path {service_path}, service_metadata: {service_metadata}")
             full_server_info = service_metadata["full_server_info"]
 
             if not full_server_info.get("is_enabled", False):
                 logger.info(f"MCPGW: Service {service_path} is disabled. Skipping its tools.")
                 continue
-
+            logger.info(f"MCPGW: Processing service {service_path} with full_server_info: {full_server_info}")
             service_name = full_server_info.get("server_name", "Unknown Service")
             tool_list = full_server_info.get("tool_list", [])
-
+            logger.info(f"MCPGW: Found {len(tool_list)} tools for service {service_name} at path {service_path}")
             for tool_info in tool_list:
                 tool_name = tool_info.get("name", "Unknown Tool")
                 parsed_desc = tool_info.get("parsed_description", {})
@@ -1203,7 +1282,7 @@ async def intelligent_tool_finder(
                 # Check if user has access to this tool based on scopes
                 # Map service_path to server name for scope checking
                 server_name = service_path.lstrip('/') if service_path.startswith('/') else service_path
-
+                logger.info(f"MCPGW: Checking access for user to tool {server_name}.{tool_name} with scopes {user_scopes}")
                 if not check_tool_access(server_name, tool_name, user_scopes, scopes_config):
                     logger.debug(f"User does not have access to tool {server_name}.{tool_name}, skipping")
                     continue