Skip to content

Commit 8046e2a

Browse files
author
wangzaijun
committed
fix
1 parent b624959 commit 8046e2a

File tree

1 file changed

+34
-9
lines changed

1 file changed

+34
-9
lines changed

lightllm/server/router/model_infer/mode_backend/pd_nixl/decode_node_impl/decode_trans_process.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,18 @@ def __init__(
111111
def recv_task_loop(self):
112112
while True:
113113
trans_task_group: NIXLChunckedTransTaskGroup = self.task_in_queue.get()
114+
115+
with self.waiting_dict_lock:
116+
for task in trans_task_group.task_list:
117+
if task.transfer_kv_num() != 0:
118+
self.waiting_dict[task.get_key()] = task
119+
else:
120+
task.start_trans_time = time.time()
121+
self.success_queue.put((None, task))
122+
123+
# up status
114124
task = trans_task_group.task_list[0]
125+
115126
decode_node_info = NIXLDecodeNodeInfo(
116127
decode_node_id=self.args.pd_node_id,
117128
pd_master_node_id=task.pd_master_node_id,
@@ -131,14 +142,6 @@ def recv_task_loop(self):
131142

132143
self.up_status_in_queue.put(up_status)
133144

134-
with self.waiting_dict_lock:
135-
for task in trans_task_group.task_list:
136-
if task.transfer_kv_num() != 0:
137-
self.waiting_dict[task.get_key()] = task
138-
else:
139-
task.start_trans_time = time.time()
140-
self.success_queue.put((None, task))
141-
142145
@log_exception
143146
def accept_peer_task_loop(
144147
self,
@@ -178,6 +181,28 @@ def accept_peer_task_loop(
178181
local_trans_task.prefill_page_reg_desc = remote_trans_task.prefill_page_reg_desc
179182

180183
self.read_peer_kv_queue.put(local_trans_task)
184+
185+
self._check_tasks_time_out()
186+
187+
188+
def _check_tasks_time_out(self):
189+
# check time_out update
190+
with self.waiting_dict_lock:
191+
keys = list(self.waiting_dict.keys())
192+
193+
for key in keys:
194+
with self.waiting_dict_lock:
195+
trans_task = self.waiting_dict.pop(key, None)
196+
197+
if trans_task is not None and trans_task.time_out():
198+
trans_task.error_info = "time out in accept_peer_task_loop"
199+
self.failed_queue.put(trans_task)
200+
continue
201+
202+
if trans_task is not None:
203+
with self.waiting_dict_lock:
204+
self.waiting_dict[trans_task.get_key()] = trans_task
205+
return
181206

182207

183208
@log_exception
@@ -190,7 +215,7 @@ def read_peer_kv_loop(self):
190215
local_trans_task.nixl_dst_page_index = page_index
191216

192217
if local_trans_task.time_out():
193-
local_trans_task.error_info = "time out"
218+
local_trans_task.error_info = "time out in read_peer_kv_loop"
194219
self.failed_queue.put(local_trans_task)
195220
continue
196221

0 commit comments

Comments
 (0)