Skip to content

Commit 64d34ea

Browse files
author
wangzaijun
committed
fix
1 parent 8046e2a commit 64d34ea

File tree

3 files changed

+34
-33
lines changed

3 files changed

+34
-33
lines changed

lightllm/server/router/model_infer/mode_backend/pd_nixl/decode_node_impl/decode_impl.py

Lines changed: 24 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -90,38 +90,37 @@ def _filter_not_ready_reqs(self, req_ids: List[int]) -> List[InferReq]:
9090
ans_list : List[InferReq] = []
9191
for request_id in req_ids:
9292
req_obj: InferReq = g_infer_context.requests_mapping[request_id]
93-
if req_obj.infer_aborted:
94-
if req_obj.nixl_pd_task_num == (req_obj.nixl_pd_task_failed_num + req_obj.nixl_pd_task_sunccess_num):
95-
ans_list.append(req_obj)
93+
if req_obj.nixl_pd_task_num != (req_obj.nixl_pd_task_failed_num + req_obj.nixl_pd_task_sunccess_num):
9694
continue
97-
98-
if req_obj.nixl_pd_task_num == (req_obj.nixl_pd_task_failed_num + req_obj.nixl_pd_task_sunccess_num):
99-
if req_obj.nixl_pd_task_failed_num > 0:
100-
if not req_obj.finish_status.is_finished():
101-
# 强制停止
102-
req_obj.cur_output_len += 1
103-
req_obj.set_next_gen_token_id(0, 0.0, 1)
104-
req_obj.finish_status.set_status(FinishStatus.FINISHED_STOP)
105-
106-
if self.is_master_in_dp:
107-
req_obj.shm_req.shm_cur_output_len = req_obj.cur_output_len
108-
req_obj.shm_req.finish_token_index = req_obj.get_cur_total_len() - 1
109-
req_obj.shm_req.finish_status.set_status(FinishStatus.FINISHED_STOP)
110-
req_obj.shm_req.candetoken_out_len = req_obj.cur_output_len
111-
112-
logger.error(f"req_id: {req_obj.req_id} forced to finished, it exits kv transfer error")
113-
114-
# 提前释放有问题的 mem_index
115-
old_prefix_len = 0 if req_obj.shared_kv_node is None else req_obj.shared_kv_node.node_prefix_total_len
116-
error_mem_len = req_obj.cur_kv_len - old_prefix_len
95+
96+
if req_obj.nixl_pd_task_failed_num > 0:
97+
# 强制停止
98+
if not req_obj.finish_status.is_finished():
99+
req_obj.cur_output_len += 1
100+
req_obj.set_next_gen_token_id(0, 0.0, 1)
101+
req_obj.finish_status.set_status(FinishStatus.FINISHED_STOP)
102+
103+
if self.is_master_in_dp:
104+
req_obj.shm_req.shm_cur_output_len = req_obj.cur_output_len
105+
req_obj.shm_req.finish_token_index = req_obj.get_cur_total_len() - 1
106+
req_obj.shm_req.finish_status.set_status(FinishStatus.FINISHED_STOP)
107+
req_obj.shm_req.candetoken_out_len = req_obj.cur_output_len
108+
109+
logger.error(f"req_id: {req_obj.req_id} forced to finished, it exits kv transfer error")
110+
111+
# 提前释放有问题的 mem_index
112+
old_prefix_len = 0 if req_obj.shared_kv_node is None else req_obj.shared_kv_node.node_prefix_total_len
113+
error_mem_len = req_obj.cur_kv_len - old_prefix_len
114+
if error_mem_len > 0:
117115
req_obj.cur_kv_len -= error_mem_len
118116

119117
mem_indexes = self.model.req_manager.req_to_token_indexs[req_obj.req_idx, req_obj.cur_kv_len:(req_obj.cur_kv_len + error_mem_len)].detach().cpu()
120118
self.model.mem_manager.free(mem_indexes)
121119
if self.is_master_in_dp:
122120
req_obj.shm_req.shm_cur_kv_len = req_obj.cur_kv_len
123-
else:
124-
ans_list.append(req_obj)
121+
122+
ans_list.append(req_obj)
123+
125124
g_infer_state_lock.release()
126125
return ans_list
127126

lightllm/server/router/model_infer/mode_backend/pd_nixl/decode_node_impl/decode_trans_process.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -152,6 +152,7 @@ def accept_peer_task_loop(
152152
# notify update
153153
notifies_dict = self.transporter.get_new_notifs()
154154
if not notifies_dict:
155+
self._check_tasks_time_out()
155156
time.sleep(0.005)
156157
continue
157158

@@ -181,7 +182,7 @@ def accept_peer_task_loop(
181182
local_trans_task.prefill_page_reg_desc = remote_trans_task.prefill_page_reg_desc
182183

183184
self.read_peer_kv_queue.put(local_trans_task)
184-
185+
185186
self._check_tasks_time_out()
186187

187188

lightllm/server/router/model_infer/mode_backend/pd_nixl/prefill_node_impl/prefill_impl.py

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -42,18 +42,19 @@ def _filter_not_ready_reqs(self, req_ids: List[int]) -> List[InferReq]:
4242
ans_list : List[InferReq] = []
4343
for request_id in req_ids:
4444
req_obj: InferReq = g_infer_context.requests_mapping[request_id]
45-
if req_obj.infer_aborted:
46-
if req_obj.nixl_pd_task_num == (req_obj.nixl_pd_task_failed_num + req_obj.nixl_pd_task_sunccess_num):
47-
ans_list.append(req_obj)
48-
continue
49-
5045
prefill_finished = req_obj.shm_req.input_len <= req_obj.cur_kv_len
5146
if prefill_finished:
52-
# 所有传输任务都已经完成
47+
# 等待所有传输任务都已经完成
5348
if req_obj.nixl_pd_task_num == (req_obj.nixl_pd_task_failed_num + req_obj.nixl_pd_task_sunccess_num):
5449
ans_list.append(req_obj)
5550
else:
56-
ans_list.append(req_obj)
51+
if req_obj.infer_aborted:
52+
if req_obj.nixl_pd_task_num == (req_obj.nixl_pd_task_failed_num + req_obj.nixl_pd_task_sunccess_num):
53+
ans_list.append(req_obj)
54+
else:
55+
continue
56+
else:
57+
ans_list.append(req_obj)
5758
return ans_list
5859

5960
def _prefill_chuncked_handle_func(self, req_obj: InferReq, next_token_id: int, next_token_prob: float, output_len: int):

0 commit comments

Comments
 (0)