|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""RPC method validation test |
| 3 | +
|
| 4 | +Copyright 2025 |
| 5 | +SPDX-License-Identifier: Apache-2.0 |
| 6 | +Authors: Mihai Criveti |
| 7 | +
|
| 8 | +This script tests if alicious method names reach the tool lookup logic |
| 9 | +instead of being rejected at the validation layer. |
| 10 | +
|
| 11 | +Usage: |
| 12 | + python test_rpc_vulnerability_demo.py |
| 13 | +""" |
| 14 | + |
| 15 | +# Standard |
| 16 | +import json |
| 17 | +import os |
| 18 | +import sys |
| 19 | +import pytest |
| 20 | + |
| 21 | +try: |
| 22 | + # Third-Party |
| 23 | + import requests |
| 24 | +except ImportError: |
| 25 | + print("Please install requests: pip install requests") |
| 26 | + sys.exit(1) |
| 27 | + |
| 28 | +@pytest.mark.skip(reason="Disabled temporarily as this requires a live MCP Gateway instance") |
| 29 | +def test_rpc_vulnerability(): |
| 30 | + """Test the RPC endpoint with malicious method names.""" |
| 31 | + |
| 32 | + # Configuration |
| 33 | + base_url = os.getenv("MCPGATEWAY_URL", "http://localhost:4444") |
| 34 | + bearer_token = os.getenv("MCPGATEWAY_BEARER_TOKEN") |
| 35 | + |
| 36 | + if not bearer_token: |
| 37 | + print("Please set MCPGATEWAY_BEARER_TOKEN environment variable") |
| 38 | + print("You can generate one with:") |
| 39 | + print(" export MCPGATEWAY_BEARER_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key)") |
| 40 | + sys.exit(1) |
| 41 | + |
| 42 | + headers = {"Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json"} |
| 43 | + |
| 44 | + print("=" * 80) |
| 45 | + print("RPC METHOD VALIDATION VULNERABILITY TEST") |
| 46 | + print("=" * 80) |
| 47 | + print(f"Testing against: {base_url}/rpc") |
| 48 | + print() |
| 49 | + |
| 50 | + # Test cases |
| 51 | + test_cases = [ |
| 52 | + {"name": "XSS in method name", "payload": {"jsonrpc": "2.0", "method": "<script>alert(1)</script>", "id": 1}}, |
| 53 | + {"name": "SQL injection in method name", "payload": {"jsonrpc": "2.0", "method": "'; DROP TABLE users; --", "id": 2}}, |
| 54 | + {"name": "Command injection in method name", "payload": {"jsonrpc": "2.0", "method": "; cat /etc/passwd", "id": 3}}, |
| 55 | + {"name": "Path traversal in method name", "payload": {"jsonrpc": "2.0", "method": "../../../etc/passwd", "id": 4}}, |
| 56 | + {"name": "Valid method name (control)", "payload": {"jsonrpc": "2.0", "method": "tools_list", "id": 5}}, |
| 57 | + ] |
| 58 | + |
| 59 | + for test in test_cases: |
| 60 | + print(f"\nTest: {test['name']}") |
| 61 | + print(f"Method: {test['payload']['method']}") |
| 62 | + print("-" * 40) |
| 63 | + |
| 64 | + try: |
| 65 | + response = requests.post(f"{base_url}/rpc", json=test["payload"], headers=headers, timeout=5) |
| 66 | + |
| 67 | + print(f"Status Code: {response.status_code}") |
| 68 | + |
| 69 | + # Pretty print the response |
| 70 | + try: |
| 71 | + response_data = response.json() |
| 72 | + print(f"Response: {json.dumps(response_data, indent=2)}") |
| 73 | + |
| 74 | + # Check for the vulnerability signature |
| 75 | + if response.status_code == 200 and "error" in response_data: |
| 76 | + error_data = response_data["error"].get("data", "") |
| 77 | + error_message = response_data["error"].get("message", "") |
| 78 | + |
| 79 | + # The vulnerability: malicious input appears in "Tool not found" error |
| 80 | + if "Tool not found:" in str(error_data) and test["payload"]["method"] in str(error_data): |
| 81 | + print("\n❌ VULNERABILITY DETECTED!") |
| 82 | + print(" The malicious method name reached the tool lookup logic.") |
| 83 | + print(" This indicates validation is happening AFTER processing.") |
| 84 | + elif test["payload"]["method"] in str(error_data) or test["payload"]["method"] in error_message: |
| 85 | + print("\n❌ SECURITY ISSUE: User input reflected in error message!") |
| 86 | + else: |
| 87 | + print("\n✅ Method appears to be properly rejected") |
| 88 | + |
| 89 | + elif response.status_code in [400, 422]: |
| 90 | + print("\n✅ Method rejected at validation layer (good!)") |
| 91 | + elif response.status_code == 200 and "result" in response_data: |
| 92 | + if test["name"] == "Valid method name (control)": |
| 93 | + print("\n✅ Valid method processed successfully") |
| 94 | + else: |
| 95 | + print("\n❌ CRITICAL: Malicious method was executed!") |
| 96 | + |
| 97 | + except ValueError: |
| 98 | + print(f"Raw Response: {response.text[:200]}...") |
| 99 | + |
| 100 | + except requests.exceptions.RequestException as e: |
| 101 | + print(f"Request failed: {e}") |
| 102 | + |
| 103 | + print("\n" + "=" * 80) |
| 104 | + print("SUMMARY") |
| 105 | + print("=" * 80) |
| 106 | + print("\nVulnerability Indicators:") |
| 107 | + print("- Error message contains 'Tool not found: <malicious-input>'") |
| 108 | + print("- HTTP 200 status with error instead of 422/400") |
| 109 | + print("- User input reflected in error messages") |
| 110 | + print("\nExpected Secure Behavior:") |
| 111 | + print("- HTTP 422 or 400 for invalid method formats") |
| 112 | + print("- Generic error message without user input") |
| 113 | + print("- Validation before any processing/lookup") |
| 114 | + print("=" * 80) |
| 115 | + |
| 116 | + |
| 117 | +if __name__ == "__main__": |
| 118 | + test_rpc_vulnerability() |
0 commit comments