
Commit b926572

Optimize process management, fix zombie processes, and speed up startup

1 parent 43846ca commit b926572

File tree

6 files changed, 83 insertions(+), 99 deletions(-)

gpt_server/model_backend/hf_backend.py

Lines changed: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ def __init__(self, tokenizer: PreTrainedTokenizer, model: torch.nn.Module) -> None:
         self.model.load_adapter(model_id=lora_path, adapter_name=lora_name)

     def shutdown(self):
-        pass
+        logger.info("hf backend exited")

     async def stream_chat(self, params: Dict[str, Any]):
         # params no longer needs to include a prompt

gpt_server/model_backend/lmdeploy_backend.py

Lines changed: 1 addition & 1 deletion

@@ -103,7 +103,7 @@ def __init__(self, model_path, tokenizer: PreTrainedTokenizer) -> None:
         self.async_engine.request_logger = CustomRequestLogger(max_log_len=None)

     def shutdown(self):
-        pass
+        logger.info("lmdeploy backend exited")

     async def stream_chat(self, params: Dict[str, Any]) -> AsyncGenerator:
         # params no longer needs to include a prompt

gpt_server/model_backend/sglang_backend.py

Lines changed: 1 addition & 0 deletions

@@ -62,6 +62,7 @@ def __init__(self, model_path, tokenizer: PreTrainedTokenizer) -> None:

     def shutdown(self):
         self.async_engine.shutdown()
+        logger.info("sglang backend exited")

     async def stream_chat(self, params: Dict[str, Any]) -> AsyncGenerator:
         # params no longer needs to include a prompt

gpt_server/model_backend/vllm_backend.py

Lines changed: 1 addition & 5 deletions

@@ -13,11 +13,6 @@
 )
 from gpt_server.settings import get_model_config

-# Workaround for a vllm bug where the ray cluster dies when TP > 1
-import ray
-
-ray.init(ignore_reinit_error=True, num_cpus=8)
-

 class VllmBackend(ModelBackend):
     def __init__(self, model_path, tokenizer: PreTrainedTokenizer) -> None:
@@ -63,6 +58,7 @@ def __init__(self, model_path, tokenizer: PreTrainedTokenizer) -> None:

     def shutdown(self):
         self.engine.shutdown()
+        logger.info("vllm backend exited")

     async def stream_chat(self, params: Dict[str, Any]) -> AsyncGenerator:
         # params no longer needs to include a prompt
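Note: deleting the module-level ray.init is where much of the startup win comes from, since ray no longer spins up a local cluster at import time whether or not tensor parallelism is used. If the TP > 1 workaround is ever needed again, a lazy variant along these lines avoids paying that cost on every import (a sketch only; _ensure_ray and the tensor_parallel_size parameter are illustrative, not part of this codebase):

def _ensure_ray(tensor_parallel_size: int) -> None:
    # Hypothetical helper: only start ray when tensor parallelism needs it.
    if tensor_parallel_size <= 1:
        return  # single-GPU path never touches ray
    import ray  # deferred import keeps module load fast

    if not ray.is_initialized():
        ray.init(ignore_reinit_error=True, num_cpus=8)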

gpt_server/serving/main.py

Lines changed: 8 additions & 0 deletions

@@ -1,3 +1,4 @@
+import time
 import yaml
 import os
 import sys
@@ -20,6 +21,7 @@
 os.environ["PYTHONPATH"] = original_pythonpath + ":" + root_dir
 sys.path.append(root_dir)
 os.environ["LOGDIR"] = os.path.join(root_dir, "logs")
+from gpt_server import utils
 from gpt_server.utils import (
     start_api_server,
     start_model_worker,
@@ -50,3 +52,9 @@ def main():

 if __name__ == "__main__":
     main()
+    # Keep the main thread idling; after SIGINT, control falls through to atexit naturally
+    try:
+        while not utils._SHOULD_EXIT:
+            time.sleep(0.5)
+    except KeyboardInterrupt:
+        pass
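Note: the shutdown path now has three cooperating pieces: the SIGINT handler in gpt_server.utils only flips _SHOULD_EXIT, the idle loop above then lets the main thread return, and the atexit hook reaps every registered child. A self-contained sketch of the same pattern, with illustrative names:

import atexit
import signal
import time

_should_exit = False


def _on_sigint(signum, frame):
    # Only set a flag here; real cleanup happens in the atexit hook,
    # which Python runs once the main thread returns normally.
    global _should_exit
    _should_exit = True


@atexit.register
def _cleanup():
    print("reaping child processes...")


signal.signal(signal.SIGINT, _on_sigint)

while not _should_exit:
    time.sleep(0.5)

Because the handler replaces Python's default SIGINT behavior, Ctrl-C normally raises no KeyboardInterrupt and the loop always exits through the flag; the except KeyboardInterrupt in main.py only covers a signal delivered before the handler is installed.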

gpt_server/utils.py

Lines changed: 71 additions & 92 deletions

@@ -3,20 +3,65 @@
 import os
 import sys
 import json
-from multiprocessing import Process
 import subprocess
 from loguru import logger
 import torch
 import psutil
 from rich import print
 import signal
 from pathlib import Path
+import atexit
+from typing import List, Dict

 ENV = os.environ
 logger.add("logs/gpt_server.log", rotation="100 MB", level="INFO")
 root_dir = Path(__file__).parent
 STATIC_DIR = root_dir / "static"

+# Global registry: {"name": <subprocess.Popen>}
+_REGISTRY: Dict[str, List[subprocess.Popen]] = {
+    "controller": [],
+    "openai": [],
+    "worker": [],
+}
+
+
+def _register(group: str, proc: subprocess.Popen):
+    _REGISTRY[group].append(proc)
+
+
+def _kill_tree(pid: int, timeout: int = 5):
+    """Send SIGTERM, then SIGKILL, to pid and all of its children"""
+    try:
+        parent = psutil.Process(pid)
+        children = parent.children(recursive=True)
+    except psutil.NoSuchProcess:
+        return
+    # Send SIGTERM first
+    for p in children + [parent]:
+        try:
+            p.terminate()
+        except psutil.NoSuchProcess:
+            pass
+    # Wait up to the timeout
+    gone, alive = psutil.wait_procs(children + [parent], timeout=timeout)
+    # Force-kill whatever is still alive
+    for p in alive:
+        try:
+            p.kill()
+        except psutil.NoSuchProcess:
+            pass
+
+
+@atexit.register
+def _graceful_shutdown():
+    """Guaranteed to run when the program exits"""
+    for group, procs in _REGISTRY.items():
+        for p in procs:
+            if p.poll() is None:  # still running
+                logger.info(f"[{group}] terminating process tree {p.pid}")
+                _kill_tree(p.pid)
+

 def clear_flashinfer_cache():
     os.system("flashinfer clear-cache")
@@ -57,68 +102,47 @@ def pre_processing():
     clear_flashinfer_cache()


-def kill_child_processes(parent_pid, including_parent=False):
-    "Kill child/zombie processes"
-    try:
-        parent = psutil.Process(parent_pid)
-        children = parent.children(recursive=True)
-        for child in children:
-            try:
-                print(f"Terminating child process {child.pid}...")
-                os.kill(child.pid, signal.SIGTERM)  # graceful termination
-                child.wait(5)  # wait up to 5 seconds for the child
-            except psutil.NoSuchProcess:
-                pass
-            except psutil.TimeoutExpired():
-                print(f"Terminating child process {child.pid} timed out! Force-killing...")
-                os.kill(child.pid, signal.SIGKILL)  # force kill
-        if including_parent:
-            print(f"Terminating parent process {parent_pid}...")
-            os.kill(parent_pid, signal.SIGTERM)
-    except psutil.NoSuchProcess:
-        print(f"Parent process {parent_pid} does not exist!")
-
-
-# Record the parent process PID
-parent_pid = os.getpid()
+_SHOULD_EXIT = False


 def signal_handler(signum, frame):
-    print("\nCtrl-C detected! Cleaning up...")
-    # kill_child_processes(parent_pid, including_parent=False)
-    stop_server()
-    exit(0)  # exit normally
+    global _SHOULD_EXIT
+    logger.info("Ctrl-C received, preparing a graceful exit…")
+    _SHOULD_EXIT = True


 signal.signal(signal.SIGINT, signal_handler)


-def run_cmd(cmd: str, *args, **kwargs):
-    logger.info(f"Running the following command:\n{cmd}\n")
-    # subprocess.run(cmd, shell=True)
-    process = subprocess.Popen(cmd, shell=True)
-    # wait for the command to finish
-    process.wait()
-    return process.pid
+def run_cmd(cmd: str, group: str = "worker") -> subprocess.Popen:
+    logger.info(f"Running the following command:\n{cmd}\n")
+    # Dropping shell=True would avoid an extra /bin/sh layer; this also works with shell=True
+    proc = subprocess.Popen(cmd, shell=True)
+    _register(group, proc)
+    # Do not wait(), or the main thread would block
+    return proc


 def start_controller(controller_host, controller_port, dispatch_method):
-    """Start the fastchat controller"""
-    cmd = f"python -m gpt_server.serving.controller_v2 --host {controller_host} --port {controller_port} --dispatch-method {dispatch_method} "
-    cmd += "> /dev/null 2>&1"  # fully silent (Linux/macOS)
-    controller_process = Process(target=run_cmd, args=(cmd,))
-    controller_process.start()
+    cmd = (
+        f"python -m gpt_server.serving.controller_v2 "
+        f"--host {controller_host} --port {controller_port} "
+        f"--dispatch-method {dispatch_method}"
+    )
+    cmd += " > /dev/null 2>&1"
+    run_cmd(cmd, group="controller")


 def start_openai_server(host, port, controller_address, api_keys=None):
-    """Start the openai api server"""
     os.environ["FASTCHAT_WORKER_API_EMBEDDING_BATCH_SIZE"] = "100000"
-
-    cmd = f"python -m gpt_server.serving.openai_api_server --host {host} --port {port} --controller-address {controller_address}"
+    cmd = (
+        f"python -m gpt_server.serving.openai_api_server "
+        f"--host {host} --port {port} "
+        f"--controller-address {controller_address}"
    )
     if api_keys:
         cmd += f" --api-keys {api_keys}"
-    openai_server_process = Process(target=run_cmd, args=(cmd,))
-    openai_server_process.start()
+    run_cmd(cmd, group="openai")


 def start_api_server(config: dict):
@@ -171,7 +195,6 @@ def get_model_types():


 def start_model_worker(config: dict):
-    process = []
     try:
         host = config["model_worker_args"]["host"]
         controller_address = config["model_worker_args"]["controller_address"]
@@ -318,13 +341,7 @@ def start_model_worker(config: dict):
             cmd += f" --vad_model '{punc_model}'"
         if hf_overrides:
             cmd += f" --hf_overrides '{json.dumps(hf_overrides)}'"
-        p = Process(target=run_cmd, args=(cmd,))
-        # p.start()
-        process.append(p)
-    for p in process:
-        p.start()
-    for p in process:
-        p.join()
+        proc = run_cmd(cmd, group="worker")


 def start_server(
@@ -355,30 +372,6 @@ def start_server(
     start_openai_server(host, port, controller_address, api_keys)


-def stop_controller():
-    cmd = "ps -ef | grep fastchat.serve | awk '{print $2}' |xargs -I{} kill -9 {}"
-    run_cmd(cmd=cmd)
-
-
-def stop_openai_server():
-    cmd = "ps -ef | grep gpt_server |grep serving | awk '{print $2}' |xargs -I{} kill -9 {}"
-    run_cmd(cmd=cmd)
-
-
-def stop_all_model_worker():
-    cmd = "ps -ef | grep gpt_server |grep model_worker | awk '{print $2}' |xargs -I{} kill -9 {}"
-    run_cmd(cmd=cmd)
-
-
-def stop_server():
-    """Stop the services"""
-    stop_all_model_worker()
-    stop_controller()
-    stop_openai_server()
-
-    logger.info("Services stopped successfully!")
-
-
 def delete_log():
     logs_path = os.environ.get("LOGDIR")
     logger.debug(f"logs_path: {logs_path}")
@@ -423,20 +416,6 @@ def get_physical_ip():
     except Exception as e:
         local_ip = ENV.get("local_ip", "127.0.0.1")

-model_type_mapping = {
-    "yi": "yi",
-    "qwen": "qwen",
-    "glm4": "chatglm",
-    "chatglm3": "chatglm",
-    "internvl2-internlm2": "internvl2",
-    "internlm2": "internlm",
-    "internlm": "internlm",
-    "baichuan2": "baichuan",
-    "llama3": "llama",
-    "mistral": "mistral",
-    "deepseek": "deepseek",
-}
-

 if __name__ == "__main__":
     # /home/dev/model/KirillR/QwQ-32B-Preview-AWQ
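Note: the zombie fix is the combination of two changes above. run_cmd now keeps every Popen handle in _REGISTRY instead of wrapping it in a multiprocessing.Process that wait()s (which is also why startup no longer blocks on p.join() for each worker), and _kill_tree terminates the whole tree before force-killing stragglers. A minimal demonstration of the same psutil pattern, where the sleep command is just a stand-in for a worker process:

import subprocess

import psutil

# A child that would linger as a zombie if never waited on.
proc = subprocess.Popen(["sleep", "60"])

parent = psutil.Process(proc.pid)
targets = parent.children(recursive=True) + [parent]

# Polite SIGTERM first, children before the parent.
for p in targets:
    try:
        p.terminate()
    except psutil.NoSuchProcess:
        pass

# wait_procs both waits and reaps: direct children get waitpid()'d,
# so no defunct entry is left in the process table.
gone, alive = psutil.wait_procs(targets, timeout=5)

# SIGKILL anything that ignored SIGTERM.
for p in alive:
    try:
        p.kill()
    except psutil.NoSuchProcess:
        pass

One caveat: atexit hooks run on any normal interpreter exit, including the SIGINT flag path above, but not if the launcher itself is killed with SIGKILL.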
