Skip to content

Commit 0a9e226

Browse files
committed
fix(gpu): 新增用户自定义GPU优先级排序功能,支持通过user_cmp参数传入自定义比较函数;更新README.md以反映新特性和示例;修复部分代码格式和注释。
1 parent d45f9b4 commit 0a9e226

File tree

7 files changed

+129
-59
lines changed

7 files changed

+129
-59
lines changed

flowline/api/interface.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ def do_task(self, arg):
129129
print(f"{v['task_id']:<8} {v['name']:<12} {v['run_num']:<8} {v['dict']:<20}")
130130
print("-" * 100)
131131

132-
def run_cli(func, task_dir=None):
132+
def run_cli(func, task_dir=None, user_cmp=None):
133133
"""run the command line interface"""
134-
program = ProgramManager(func, task_dir)
134+
program = ProgramManager(func, task_dir, user_cmp)
135135
cli = CommandLineInterface(program)
136136
try:
137137
cli.cmdloop()

flowline/core/gpu.py

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pynvml
22
import time
33
import threading
4+
from functools import cmp_to_key
45

56
from flowline.config import config
67
from flowline.utils import Log
@@ -39,7 +40,7 @@ def __init__(self, gpu_id, on_flash=None):
3940
self.gpu_id = gpu_id
4041
self.info_history = []
4142
self.info_history_length = 10
42-
self.info = []
43+
self.info = None
4344
self.user_process_num = 0
4445
self.on_flash = on_flash
4546
self.monitor_interval = 5
@@ -101,9 +102,10 @@ def get_gpu_count():
101102
return gpu_count
102103

103104
class GPU_Manager:
104-
def __init__(self, use_gpu_id: list, on_flash=None):
105+
def __init__(self, use_gpu_id: list, on_flash=None, user_cmp=None):
105106
self._lock = threading.Lock()
106107
self.all_gpu = [GPU(i, on_flash) for i in range(get_gpu_count())]
108+
self.user_cmp = user_cmp
107109
self.usable_mark = [False] * len(self.all_gpu)
108110
for gpu_id in use_gpu_id:
109111
self.usable_mark[gpu_id] = True
@@ -127,25 +129,32 @@ def update_user_process_num(self, gpu_id, pid, status):
127129
def flash_all_gpu(self):
128130
for gpu in self.all_gpu:
129131
gpu.flash()
130-
132+
131133
@synchronized
132134
def choose_gpu(self):
135+
def gpu_priority_cmp(info1: GPU_info, info2: GPU_info):
136+
if info1.free_memory > info2.free_memory:
137+
return -1
138+
elif info1.free_memory < info2.free_memory:
139+
return 1
140+
else:
141+
return 0
142+
133143
self.flash_all_gpu()
134-
choose_gpu = None
135-
for gpu in self.all_gpu:
136-
if self.usable_mark[gpu.gpu_id]:
137-
info = gpu.info
138-
if info.free_memory > self.min_process_memory:
139-
if choose_gpu is None:
140-
choose_gpu = gpu
141-
continue
142-
elif info.utilization < choose_gpu.info.utilization:
143-
choose_gpu = gpu
144-
elif info.utilization == gpu.info.utilization:
145-
if info.free_memory > choose_gpu.info.free_memory:
146-
choose_gpu = gpu
147-
logger.info(f"GPU_Manager choose_gpu: {choose_gpu}")
148-
return choose_gpu.gpu_id if choose_gpu is not None else None
144+
if all(not mark for mark in self.usable_mark) or len(self.all_gpu) == 0:
145+
return None, []
146+
usable_gpus = [
147+
(gpu.gpu_id, gpu.info)
148+
for gpu, mark in zip(self.all_gpu, self.usable_mark)
149+
if mark
150+
]
151+
cmp_func = gpu_priority_cmp if self.user_cmp is None else self.user_cmp
152+
usable_gpus.sort(key=cmp_to_key(lambda a, b: cmp_func(a[1], b[1])))
153+
sorted_gpu_ids = [gpu_id for gpu_id, _ in usable_gpus]
154+
logger.info(f"GPU_Manager choose_gpu: {sorted_gpu_ids}")
155+
chosen_gpu_id = usable_gpus[0][0] if usable_gpus[0][1].free_memory > self.min_process_memory else None
156+
return chosen_gpu_id, sorted_gpu_ids
157+
149158

150159
@synchronized
151160
def switch_gpu(self, gpu_id):

flowline/core/program.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
logger = Log(__name__)
1515

1616
class ProgramManager:
17-
def __init__(self, user_func, task_dir):
17+
def __init__(self, user_func, task_dir, user_cmp=None):
1818
self._lock = threading.Lock()
19-
self.gpu_manager = GPU_Manager([0], self.on_gpu_flash)
19+
self.gpu_manager = GPU_Manager([0], self.on_gpu_flash, user_cmp)
2020
self.process_manager = ProcessManager(self.on_process_changed)
2121
self.task_manager = TaskManager(task_dir)
2222

@@ -59,15 +59,15 @@ def new_process(self):
5959
if not self.process_manager.have_space():
6060
logger.info(f"over max processes")
6161
return
62-
gpu_id = self.gpu_manager.choose_gpu()
62+
gpu_id, sorted_gpu_ids = self.gpu_manager.choose_gpu()
6363
if gpu_id is None:
6464
logger.info(f"no available GPU")
6565
return
6666
task_id, dict = self.task_manager.get_next_task()
6767
if task_id is None:
6868
logger.info("no task to handle")
6969
return
70-
cmd = self.func(dict, gpu_id)
70+
cmd = self.func(dict, gpu_id, sorted_gpu_ids)
7171
process = self.process_manager.add_process(cmd, task_id, gpu_id)
7272
if process is None:
7373
self.task_manager.put_task_ids(task_id)

flowline/core/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def format_tidy_df(self):
6565
self.df['name'] = ['Task:' + str(i) for i in self.df.index]
6666
if 'cmd' not in self.df.columns:
6767
self.df['cmd'] = 'No command'
68-
68+
6969
def synchronized(func):
7070
def wrapper(self, *args, **kwargs):
7171
with self._lock:

main_cli.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,20 @@
22

33
from flowline import run_cli
44

5+
def func(dict, gpu_id, sorted_gpu_ids):
6+
print(sorted_gpu_ids)
7+
return "CUDA_VISIBLE_DEVICES="+str(gpu_id)+" python -u test/test.py "+ " ".join([f"--{k} {v}" for k, v in dict.items()])
8+
9+
def cmp(info1, info2):
10+
if info1.free_memory > info2.free_memory:
11+
return -1
12+
elif info1.free_memory < info2.free_memory:
13+
return 1
14+
else:
15+
return 0
16+
517
if __name__ == "__main__":
6-
def func(dict, gpu_id):
7-
return "CUDA_VISIBLE_DEVICES="+str(gpu_id)+" python -u test/test.py "+ " ".join([f"--{k} {v}" for k, v in dict.items()])
8-
9-
run_cli(func, "test/todo.csv")
18+
# run_cli(func, "test/todo.csv")
19+
run_cli(func, "test/todo.csv", user_cmp=cmp) # user_cmp可选
1020

11-
# pkill -9 python
21+
# 如果出现异常进程,可以使用 pkill -9 python 杀死所有进程

readme.md

Lines changed: 70 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,23 @@ FlowLine 是一个用于 **GPU资源管理** 和 **并发指令流调度** 的
1414
* 🧩 **系统概要设计**:详见 [概要设计](./docs/design.md)
1515
* 🏗️ **系统架构详解**:详见 [架构说明](./docs/arch.md)
1616

17-
该系统的设计初衷是为了替代手动监控 GPU 状态并依次执行命令的低效方式。在传统流程中,用户需要持续关注 GPU 的剩余显存和使用情况,以便手动启动 Python 脚本或终止进程,这在多任务实验场景中尤为繁琐。本项目通过自动化机制解决了这些问题,提升了实验效率与资源利用率。
17+
该系统的设计初衷是为了替代手动监控 GPU 状态并依次执行命令的低效方式。在传统流程中,用户需要持续关注 GPU 的显存占用和利用率,还可能cuda out of memory,以便手动启动 Python 脚本或终止进程,这在多任务实验场景中尤为繁琐。本项目通过自动化机制解决了这些问题,提升了实验效率与资源利用率。
1818

19-
> ~你也不想因为 CUDA Out of Memory 重新手动改sh吧。~
19+
本系统的设计初衷在于替代传统的手动 GPU 监控与命令执行流程,从而提升实验效率。在传统方式下,用户需持续关注 GPU 的显存占用与利用率,并可能因 CUDA 内存不足(CUDA Out of Memory)而中断任务,需要手动启动或终止 Python 脚本。这种操作在多任务实验场景中尤为繁琐。本项目通过自动化管理机制,提升了实验效率与资源利用率。
2020

2121
## 核心特性
2222

23-
* 实时 GPU 状态监控:自动检测可用 GPU 数量、显存占用、进程信息等、并选择最恰当的GPU;
23+
* 实时 GPU 状态监控:自动检测可用 GPU 数量、显存占用、进程信息等、并根据自定义优先函数排序;
24+
* **报错自动处理**:错误中断后自动重新入队而解决 CUDA Out of Memory 等非程序BUG;
2425
* 命令调度与资源控制:支持配置每条命令所需 GPU 数量、显存下限、最大并行数等条件;
2526
* 动态调控机制:可手动终止或重启进程,实现任务队列灵活管理;
2627
* 多任务并发执行:支持任务优先级队列、失败重试等策略,适用于批量实验运行;
2728
* 双重交互入口:命令行接口适合在 Linux 服务器上进行脚本化控制与批量部署,Web 界面适合进行任务的可视化查看、状态监控与实时干预。
2829

30+
## Updates
31+
32+
* 2025.09.09: 新增用户自定义 GPU 优先级排序功能,通过 `user_cmp` 参数传入自定义排序函数,详见 [main_cli.py](./main_cli.py) 示例。
33+
2934
## 🚀 快速使用指南
3035

3136
### 🖥️ 使用命令行接口(CLI 模式)
@@ -34,13 +39,13 @@ FlowLine 是一个用于 **GPU资源管理** 和 **并发指令流调度** 的
3439

3540
你可以将 `flowline` 文件夹拷贝到项目的根目录下直接引用,也可以通过以下方式将其安装到你的 Python 环境中:
3641

37-
- 从pip安装
42+
* 从pip安装
3843

3944
```bash
4045
pip install fline
4146
```
4247

43-
- 或者从源代码安装
48+
* 或者从源代码安装
4449

4550
```bash
4651
pip install -e <flowline库路径>
@@ -76,31 +81,61 @@ pip install -e <flowline库路径>
7681

7782
</details>
7883

79-
#### 3. 定义任务构造函数 `func(dict, gpu_id)`
84+
#### 3. 定义任务构造函数 和 GPU优先级比较函数(可选)
8085

8186
你需要自定义一个函数,用于根据 Excel 中的每一行任务参数 `dict` 以及分配的 GPU 编号 `gpu_id` 构造出最终的命令行字符串。
8287

8388
<details>
8489
<summary>示例和说明</summary>
8590

86-
例如
91+
详见 [main_cli.py](./main_cli.py) 示例。主要部分如下
8792

8893
```python
89-
from flowline import run_cli
94+
def func(dict, gpu_id, sorted_gpu_ids):
95+
print(sorted_gpu_ids)
96+
return "CUDA_VISIBLE_DEVICES="+str(gpu_id)+" python -u test/test.py "+ " ".join([f"--{k} {v}" for k, v in dict.items()])
97+
98+
def cmp(info1, info2):
99+
if info1.free_memory > info2.free_memory:
100+
return -1
101+
elif info1.free_memory < info2.free_memory:
102+
return 1
103+
else:
104+
return 0
90105

91106
if __name__ == "__main__":
92-
def func(param_dict, gpu_id):
93-
cmd = "CUDA_VISIBLE_DEVICES=" + str(gpu_id) + " python -u test/test.py "
94-
args = " ".join([f"--{k} {v}" for k, v in param_dict.items()])
95-
return cmd + args
96-
97-
run_cli(func, "test/todo.xlsx")
107+
# run_cli(func, "test/todo.csv")
108+
run_cli(func, "test/todo.csv", user_cmp=cmp) # user_cmp可选
98109
```
99110

100-
* `param_dict` 是由 Excel 中当前任务行构造出的字典,键为列名,值为单元格内容;
101-
* `gpu_id` 是系统动态分配的 GPU 编号,保证任务不冲突;
102-
* 拼接后的命令字符串将作为子进程执行,等效于直接在命令行中执行该命令;
103-
* 你可以根据实际情况替换为 shell 脚本、conda 环境、主命令变体等。
111+
* `dict` 是由 Excel 中当前任务行构造出的字典,键为列名,值为单元格内容;
112+
* `gpu_id` 是系统动态分配的 GPU 编号,保证任务不冲突,其此刻一定满足显存空间最小限制;
113+
* `sorted_gpu_ids`(可选)是经过优先级排序后的可用 GP U序列,为可能的多 GPU 任务适配;
114+
* 拼接后的命令字符串将作为子进程执行,等效于直接在命令行中执行该命令。你可以根据实际情况替换为 shell 脚本、conda 环境、主命令变体等。
115+
116+
<details>
117+
<summary>user_cmp 可用参数表</summary>
118+
119+
info1、info2其实是 [gpu.py](.flowline/core/gpu.py) 的 GPU_info 对象的实例,其可用作比较函数的参数有:
120+
121+
可以用一个 Markdown 表格清晰地表示 `GPU_info` 类中每个参数的含义:
122+
123+
| 参数名 | 类型 | 含义说明 |
124+
| ------------------ | --------- | ------------------------------------------------------------- |
125+
| `free_memory` | int/float | GPU 当前可用显存大小(通常单位为 MB 或 GB) |
126+
| `total_memory` | int/float | GPU 总显存容量 |
127+
| `utilization` | int/float | GPU 当前使用率(百分比,0\~100) |
128+
| `user_process_num` | int | 当前用户进程数量(默认初始化为 0,可统计特定用户的 GPU 进程) |
129+
| `all_process_num` | int | GPU 上运行的所有进程数量 |
130+
| `time` | float | 记录信息更新时间的时间戳(UNIX 时间戳格式) |
131+
| `name` | str | GPU 名称,如 "NVIDIA GeForce RTX 3090" |
132+
| `temperature` | int/float | GPU 当前温度(单位通常为摄氏度 °C) |
133+
| `power` | float | GPU 当前功耗(单位通常为瓦特 W) |
134+
| `max_power` | float | GPU 最大设计功耗(单位瓦特 W) |
135+
136+
如果你需要,我可以帮你画一个**可视化示意图**,把 GPU 状态参数和它们的含义一目了然地展示出来,这在文档或汇报中很直观。你希望我画吗?
137+
138+
</details>
104139

105140
<details>
106141
<summary>关于输出和python -u</summary>
@@ -124,7 +159,13 @@ log/
124159
</details>
125160
</details>
126161

127-
#### 4. 输入`run`开始运行任务流
162+
#### 4. 运行程序后输入`run`开始运行任务流
163+
164+
运行 `main_cli.py` 启动程序:
165+
166+
```bash
167+
python main_cli.py
168+
```
128169

129170
<details>
130171
<summary>FlowLine CLI 命令参考表</summary>
@@ -143,7 +184,6 @@ log/
143184
| `exit` || 退出程序(等效`Ctrl+D`|
144185
| `help``?` || 显示帮助信息 |
145186

146-
147187
<details>
148188
<summary>命令使用示例</summary>
149189

@@ -175,6 +215,7 @@ log/
175215
# 退出程序
176216
> exit
177217
```
218+
178219
</details>
179220
</details>
180221

@@ -203,7 +244,6 @@ python -m http.server 8000
203244

204245
这将在 [http://localhost:8000](http://localhost:8000/) 启动前端服务,前端可通过 RESTful API 访问后端任务状态与控制接口。
205246

206-
207247
<div align=center>
208248
<img src="./docs/fig/gpu.png" alt="Image 1" height="200px" />
209249
<img src="./docs/fig/task.png" alt="Image 1" height="200px" />
@@ -217,26 +257,27 @@ python -m http.server 8000
217257

218258
### 📌 使用前须知
219259

220-
- 本项目提供的工具**不会以暴力方式强制杀掉他人任务**,也**不会绕过权限限制或系统调度机制**
221-
- 本脚本默认**只在用户拥有访问权限的设备上运行**,请确保遵守所在实验室或计算平台的使用规则。
222-
- **请勿用于占用公共资源或干扰他人科研工作**,违者后果自负。
260+
* 本项目提供的工具**不会以暴力方式强制杀掉他人任务**,也**不会绕过权限限制或系统调度机制**
261+
* 本脚本默认**只在用户拥有访问权限的设备上运行**,请确保遵守所在实验室或计算平台的使用规则。
262+
* **请勿用于占用公共资源或干扰他人科研工作**,违者后果自负。
223263

224264
### 🚨 风险声明
225265

226266
使用本脚本可能带来的风险包括但不限于:
227267

228-
- 与他人并发调度产生冲突,影响公平使用;
229-
- 若滥用,可能违反实验室/平台管理规定;
268+
* 与他人并发调度产生冲突,影响公平使用;
269+
* 若滥用,可能违反实验室/平台管理规定;
230270

231271
开发者对因使用本脚本而导致的**资源冲突、账号受限、数据丢失或任何直接间接损失**概不负责。
232272

233273
## 💐 贡献
234274

235275
欢迎大家为本模板贡献代码、修正bug或完善文档!
236-
- 如有建议或问题,请提交Issue。
237-
- 欢迎提交Pull Request。
238276

239-
> [!TIP]
277+
* 如有建议或问题,请提交Issue。
278+
* 欢迎提交Pull Request。
279+
280+
> [!TIP]
240281
> 若对您有帮助,请给这个项目点上 **Star**!
241282
242283
**感谢所有贡献者!**

test/test2.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import time
2+
import sys
3+
4+
if __name__ == "__main__":
5+
# sys.argv 会包含运行脚本时输入的所有参数(包括脚本名)
6+
all_args = " ".join(sys.argv)
7+
8+
while True:
9+
print(time.time(), all_args)
10+
time.sleep(1)

0 commit comments

Comments
 (0)