Skip to content

Commit b39fc5e

Browse files
author
wangzaijun
committed
fix process manager
1 parent 3bac7a2 commit b39fc5e

File tree

2 files changed

+42
-3
lines changed

2 files changed

+42
-3
lines changed

lightllm/server/router/model_infer/mode_backend/continues_batch/decode_node_impl/decode_kv_move_manager.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
logger = init_logger(__name__)
2525

26+
thread_local_data = threading.local()
27+
2628

2729
@dataclass
2830
class TransProcessObj:
@@ -68,8 +70,8 @@ def check_trans_process(self):
6870
def __del__(self):
6971
# 强制关闭连接和杀掉传输进程
7072
if self.process is not None:
71-
os.kill(self.process.pid, signal.SIGKILL)
7273
logger.warning(f"trans kv process {self.process.pid} is killed")
74+
os.kill(self.process.pid, signal.SIGKILL)
7375
pass
7476

7577

@@ -104,8 +106,24 @@ async def wait_all_future_finish(self, futures: List[AsyncResult]):
104106
await asyncio.gather(*[asyncio.to_thread(future.wait) for future in futures])
105107
return
106108

109+
def on_connect(self, conn):
110+
thread_local_data.prefill_node_id = None
111+
pass
112+
113+
def on_disconnect(self, conn):
114+
if thread_local_data.prefill_node_id is not None:
115+
self.node_id_to_trans_obj.pop(thread_local_data.prefill_node_id, None)
116+
logger.info(f"prefill node id {thread_local_data.prefill_node_id} disconnect")
117+
pass
118+
119+
def exposed_check_alive(self):
120+
# 用于 prefill node check 通信连接的状态。
121+
return
122+
107123
def exposed_build_trans_process(self, prefill_node_id, nccl_ip, nccl_port):
108124
prefill_node_id, nccl_ip, nccl_port = list(map(obtain, [prefill_node_id, nccl_ip, nccl_port]))
125+
thread_local_data.prefill_node_id = prefill_node_id
126+
109127
logger.info(f"build trans infos {prefill_node_id} {nccl_ip} {nccl_port}")
110128
if prefill_node_id in self.node_id_to_trans_obj:
111129
self.node_id_to_trans_obj.pop(prefill_node_id, None)
@@ -194,6 +212,8 @@ def handle_loop(self, task_queue: queue.Queue):
194212
asyncio.run(self.wait_all_future_finish(futures))
195213
logger.error(f"decode kv move task {task.to_decode_log_info()} has error, remove the trans_obj")
196214
self.node_id_to_trans_obj.pop(task.prefill_node_id, None)
215+
finally:
216+
# 去除引用否则进程无法自动退出
197217
trans_obj = None
198218
except BaseException as e:
199219
logger.exception(str(e))

lightllm/server/router/model_infer/mode_backend/continues_batch/prefill_node_impl/prefill_kv_move_manager.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import rpyc
33
import sys
44
import os
5+
import gc
56
import signal
67
import copy
78
import numpy as np
@@ -84,8 +85,8 @@ def check_trans_process(self):
8485
def __del__(self):
8586
# 强制关闭连接和杀掉传输进程
8687
if self.process is not None:
87-
os.kill(self.process.pid, signal.SIGKILL)
8888
logger.warning(f"prefill trans process {self.process.pid} is killed")
89+
os.kill(self.process.pid, signal.SIGKILL)
8990
pass
9091

9192

@@ -114,11 +115,28 @@ def get_next_device_index(self):
114115

115116
def get_trans_obj(self, task: KVMoveTask):
116117
if task.decode_node.node_id not in self.node_id_to_trans_obj:
118+
# 先遍历删除老的不能用的连接
119+
self.remove_dead_trans_obj()
117120
trans_obj = TransProcessObj()
118121
trans_obj.create(task.decode_node.node_id, task.decode_node.ip, task.decode_node.rpyc_port, self)
119122
self.node_id_to_trans_obj[task.decode_node.node_id] = trans_obj
120123
return self.node_id_to_trans_obj[task.decode_node.node_id]
121124

125+
def remove_dead_trans_obj(self):
126+
del_node_ids = []
127+
for node_id, t_obj in self.node_id_to_trans_obj.items():
128+
try:
129+
t_obj.rpyc_conn.root.check_alive()
130+
except BaseException as e:
131+
logger.error(f"check error {str(e)}")
132+
del_node_ids.append(node_id)
133+
for node_id in del_node_ids:
134+
self.node_id_to_trans_obj.pop(node_id)
135+
136+
if len(del_node_ids) != 0:
137+
gc.collect()
138+
return
139+
122140
def handle_loop(self):
123141
try:
124142
while True:
@@ -154,9 +172,10 @@ def handle_loop(self):
154172
logger.exception(str(e))
155173
logger.error(f"kv move task {move_task.to_prefill_log_info()} has error, remove the trans_obj")
156174
self.node_id_to_trans_obj.pop(move_task.decode_node.node_id, None)
157-
trans_obj = None
158175

159176
finally:
177+
# 去引用否则进程无法杀掉
178+
trans_obj = None
160179
# 解除对prefill token的占用状态。
161180
futures: List[AsyncResult] = []
162181
for infer_rpyc in self.infer_rpyc_objs:

0 commit comments

Comments
 (0)