forked from hexaeder/MCPRepl.jl
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmcp-julia-adapter
More file actions
executable file
·100 lines (84 loc) · 3.28 KB
/
mcp-julia-adapter
File metadata and controls
executable file
·100 lines (84 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python3
"""
MCP Julia REPL Adapter
This adapter bridges stdin/stdout MCP clients to the HTTP-based Julia MCP server.
It reads JSON-RPC messages from stdin and forwards them to http://localhost:3000,
then returns responses to stdout.
Usage:
./mcp-julia-adapter
The Julia MCP server must be running first:
julia --project -e "using MCPRepl; MCPRepl.start!()"
"""
import json
import os
import sys
import urllib.request
import urllib.error
import time
def main():
"""Main adapter loop - reads stdin, forwards to HTTP server, writes to stdout"""
for line in sys.stdin:
line = line.strip()
if not line:
continue
port = sys.argv[1] if len(sys.argv) > 1 else None
if not port:
port = os.getenv("JULIA_MCP_PORT", "3000")
try:
# Parse JSON-RPC request from stdin
request = json.loads(line)
# Forward to HTTP server with timeout
data = json.dumps(request).encode('utf-8')
req = urllib.request.Request(f'http://localhost:{port}',
data=data,
headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req, timeout=600) as response:
result = json.loads(response.read().decode('utf-8'))
print(json.dumps(result), flush=True)
except json.JSONDecodeError as e:
# Invalid JSON from stdin
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": f"Parse error: {str(e)}"
}
}
print(json.dumps(error_response), flush=True)
except urllib.error.URLError as e:
# Connection failed to HTTP server - try to recover
request_id = request.get("id") if 'request' in locals() else None
# For connection errors, try a brief retry
if "Connection refused" in str(e):
try:
time.sleep(0.1) # Brief delay
with urllib.request.urlopen(req, timeout=600) as response:
result = json.loads(response.read().decode('utf-8'))
print(json.dumps(result), flush=True)
continue
except:
pass # Fall through to error response
error_response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"Connection failed: {str(e)}. Is the Julia MCP server running?"
}
}
print(json.dumps(error_response), flush=True)
except Exception as e:
# Other errors - ensure proper ID handling
request_id = request.get("id") if 'request' in locals() else None
error_response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
print(json.dumps(error_response), flush=True)
if __name__ == "__main__":
main()