-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathserver.py
More file actions
117 lines (95 loc) · 3.44 KB
/
server.py
File metadata and controls
117 lines (95 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
import sys
import os
import re
import time
from typing import Dict, Any
import requests
from fastmcp import FastMCP
from dotenv import load_dotenv
from common_tools import *
load_dotenv()
def load_server_config(config_file: str = "mcp_settings.json") -> Dict[str, Any]:
"""load server config from config file"""
try:
with open(config_file, 'r') as f:
config = json.load(f)
return config.get('mcpServers').get('joern').get('config')
except FileNotFoundError:
print(f"config file {config_file} not exist")
return {}
except json.JSONDecodeError:
print(f"config file {config_file} format error")
return {}
joern_config = load_server_config()
# server_endpoint = f'{os.getenv("HOST")}:{os.getenv("PORT")}'
server_endpoint = f'{joern_config.get('host')}:{joern_config.get("port")}'
log_level = joern_config.get('log_level', 'ERROR')
joern_mcp = FastMCP("joern-mcp", log_level=log_level)
# joern_mcp._tool_manager.
print(server_endpoint)
basic_auth = (os.getenv("JOERN_AUTH_USERNAME"), os.getenv("JOERN_AUTH_PASSWORD"))
timeout = int(joern_config.get('timeout', '300'))
def joern_remote(query):
"""
Execute remote query and return results
Parameters:
query -- The query string to execute
Returns:
Returns the server response stdout content on success
Returns None on failure, error message will be output to stderr
"""
data = {"query": query}
headers = {'Content-Type': 'application/json'}
try:
response = requests.post(
f'http://{server_endpoint}/query-sync',
data=json.dumps(data),
headers=headers,
auth=basic_auth,
timeout=timeout
)
response.raise_for_status()
result = response.json()
return remove_ansi_escape_sequences(result.get('stdout', ''))
except requests.exceptions.RequestException as e:
sys.stderr.write(f"Request Error: {str(e)}\n")
except json.JSONDecodeError:
sys.stderr.write("Error: Invalid JSON response\n")
return None
@joern_mcp.tool()
def get_help():
"""Get help information from joern server"""
response = joern_remote('help')
if response:
return response
else:
return 'Query Failed'
@joern_mcp.tool()
def check_connection() -> str:
"""Check if the Joern MCP plugin is running"""
try:
metadata = extract_value(joern_remote("version"))
if not metadata:
return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running."
return f"Successfully connected to Joern MCP, joern server version is {metadata}"
except Exception as e:
return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running."
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
GENERATED_PY = os.path.join(SCRIPT_DIR, "server_tools.py")
def generate():
"""Generate and execute additional server tools from server_tools.py file.
This function reads the content of server_tools.py and executes it to add
more functionality to the server.
"""
with open(GENERATED_PY, "r") as f:
code = f.read()
exec(compile(code, GENERATED_PY, "exec"))
generate()
def main():
"""Start the MCP server using stdio transport.
This is the main entry point for running the Joern MCP server.
"""
joern_mcp.run(transport="stdio")
if __name__ == "__main__":
main()