-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathslurm_mcp_server.py
More file actions
129 lines (104 loc) · 3.3 KB
/
slurm_mcp_server.py
File metadata and controls
129 lines (104 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# file: slurm_mcp_server.py
# description: SLURM 任务管理 MCP 服务器端代码
# author: Chengze.Xiao
# email: chengzehsiao@outlook.com
# date: 2025-06-23
#
from mcp.server import FastMCP
app = FastMCP('slurm-mcp-server')
@app.tool()
async def get_slurm_jobs() -> str:
"""
获取 SLURM 作业列表。
Returns:
SLURM 作业列表的字符串表示。
"""
import subprocess
result = subprocess.run(['squeue', '-h'], capture_output=True, text=True, check=True)
return result.stdout
@app.tool()
async def query_slurm_job(job_id: str) -> str:
"""
查询特定 SLURM 任务的执行状态。
Args:
job_id: 要查询的作业 ID。
Returns:
指定作业的详细状态信息。
"""
import subprocess
result = subprocess.run(['squeue', '-j', job_id], capture_output=True, text=True, check=True)
return result.stdout
@app.tool()
async def submit_slurm_job(script_content: str, job_name: str = "mcp_job") -> str:
"""
提交新的 SLURM 任务。
Args:
script_content: 任务脚本的内容。
job_name: 任务的名称,默认为 "mcp_job"。
Returns:
提交任务的结果,包含作业 ID 或错误信息。
"""
import subprocess
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(script_content)
script_path = f.name
result = subprocess.run(['sbatch', '--job-name', job_name, script_path], capture_output=True, text=True, check=True)
return result.stdout
@app.tool()
async def cancel_slurm_job(job_id: str) -> str:
"""
取消指定的 SLURM 任务。
Args:
job_id: 要取消的作业 ID。
Returns:
取消操作的结果。
"""
import subprocess
result = subprocess.run(['scancel', job_id], capture_output=True, text=True, check=True)
return result.stdout
@app.tool()
async def get_slurm_job_history(user: str = None, status: str = None) -> str:
"""
查询 SLURM 历史任务。
Args:
user: 可选,按用户过滤历史任务。
status: 可选,按任务状态(如 COMPLETED, FAILED)过滤历史任务。
Returns:
符合条件的 SLURM 历史任务列表。
"""
import subprocess
cmd = ['sacct', '-P', '-o', 'JobID,JobName,User,State,Elapsed']
if user: cmd.extend(['--user', user])
if status: cmd.extend(['--state', status])
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return result.stdout
@app.tool()
async def get_slurm_nodes_info() -> str:
"""
获取 SLURM 集群中节点的信息。
Returns:
SLURM 节点信息的字符串表示。
"""
import subprocess
result = subprocess.run(['sinfo', '-Nl'], capture_output=True, text=True, check=True)
return result.stdout
@app.tool()
async def get_slurm_partitions_info() -> str:
"""
获取 SLURM 集群中分区的信息。
Returns:
SLURM 分区信息的字符串表示。
"""
import subprocess
result = subprocess.run(['sinfo'], capture_output=True, text=True, check=True)
return result.stdout
def main():
app.settings.host = "0.0.0.0"
app.settings.port = 8000
app.run(transport="sse")
if __name__ == "__main__":
main()