@@ -114,7 +114,7 @@ def recv_task_loop(self):
114114
115115 # 初次校验 time out
116116 if trans_task .time_out ():
117- trans_task .error_info = "time out"
117+ trans_task .error_info = "time out in recv_task_loop "
118118 self .failed_queue .put (trans_task )
119119 else :
120120 self .local_copy_kv_queue .put (trans_task )
@@ -177,33 +177,38 @@ def update_task_status_loop(
177177 except :
178178 notify_obj = None
179179
180- if isinstance (notify_obj , NIXLChunckedTransTaskRet ):
181- key = notify_obj .get_key ()
182- with self .waiting_dict_lock :
183- trans_task = self .waiting_dict .pop (key , None )
184-
185- if trans_task is not None :
186- trans_task .error_info = notify_obj .error_info
187- if trans_task .error_info is not None :
188- self .failed_queue .put (trans_task )
189- else :
190- self .success_queue .put (trans_task )
180+ if isinstance (notify_obj , NIXLChunckedTransTaskRet ):
181+ key = notify_obj .get_key ()
182+ with self .waiting_dict_lock :
183+ trans_task = self .waiting_dict .pop (key , None )
184+
185+ if trans_task is not None :
186+ trans_task .error_info = notify_obj .error_info
187+ if trans_task .error_info is not None :
188+ self .failed_queue .put (trans_task )
189+ else :
190+ self .success_queue .put (trans_task )
191191
192192 # check time_out update
193+ self ._check_tasks_time_out ()
194+
195+ def _check_tasks_time_out (self ):
196+ with self .waiting_dict_lock :
197+ keys = list (self .waiting_dict .keys ())
198+
199+ for key in keys :
193200 with self .waiting_dict_lock :
194- iter_keys = list ( self .waiting_dict .keys () )
201+ trans_task = self .waiting_dict .pop ( key , None )
195202
196- for key in iter_keys :
203+ if trans_task is not None and trans_task .time_out ():
204+ trans_task .error_info = "time out in update_task_status_loop"
205+ self .failed_queue .put (trans_task )
206+ continue
207+
208+ if trans_task is not None :
197209 with self .waiting_dict_lock :
198- trans_task = self .waiting_dict .pop (key , None )
199-
200- if trans_task is not None :
201- if trans_task .time_out ():
202- trans_task .error_info = "xfer time out"
203- self .failed_queue .put (trans_task )
204- else :
205- with self .waiting_dict_lock :
206- self .waiting_dict [trans_task .get_key ()] = trans_task
210+ self .waiting_dict [trans_task .get_key ()] = trans_task
211+ return
207212
208213
209214 @log_exception
0 commit comments