@@ -110,7 +110,6 @@ def __init__(self, args: StartArgs, router_port, detokenization_port, metric_por
         # Thread pool used to overlap (fold) scheduling with inference
         self.overlap_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
         self.schedule_task = None
-        self.overlap_event = threading.Event()
         return

     async def wait_to_model_ready(self):
@@ -288,8 +287,12 @@ async def loop_for_fwd(

     async def get_schedule_result(self, running_batch: Batch):
         if self.schedule_task is None:
+            _start_time = time.time()

             def get_new_batch():
+                if time.time() - _start_time < 0.001:
+                    time.sleep(0.003)
+
                 limit_router_queue_length = None
                 if self.is_multinode_tp:
                     # Use all_reduce to get the minimum value
@@ -300,9 +303,6 @@ def get_new_batch():
                     dist.all_reduce(limit_router_queue_length_tensor, op=dist.ReduceOp.MIN, group=self.mulitnode_group)
                     limit_router_queue_length = limit_router_queue_length_tensor.item()

-                self.overlap_event.wait(timeout=0.020)
-                self.overlap_event.clear()
-                time.sleep(0.003)
                 new_batch = self.req_queue.generate_new_batch(running_batch, limit_router_queue_length)
                 return new_batch

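The two hunks above replace the removed overlap_event handshake with purely time-based pacing: the schedule step now backs off for 3 ms whenever it starts less than 1 ms after being scheduled, so the model forward that was just launched gets dispatched before scheduling work takes the CPU. Below is a minimal, self-contained sketch of this overlap pattern for illustration only; names such as fake_forward and fake_generate_new_batch are placeholders, not LightLLM APIs.

# Minimal sketch (not the LightLLM code) of the overlap-with-pacing idea:
# the schedule step runs on a one-worker thread pool so it can overlap with
# the model forward, and it backs off briefly when it starts "too early".
import asyncio
import concurrent.futures
import time

overlap_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def fake_forward():
    # stands in for model_rpc_client.prefill() / decode()
    await asyncio.sleep(0.01)

def fake_generate_new_batch(start_time):
    # stands in for req_queue.generate_new_batch(...)
    if time.time() - start_time < 0.001:
        # scheduled almost immediately: sleep so the forward call
        # has been issued before scheduling work takes the CPU
        time.sleep(0.003)
    return "new_batch"

async def step():
    start = time.time()
    loop = asyncio.get_running_loop()
    schedule_task = loop.run_in_executor(overlap_pool, fake_generate_new_batch, start)
    await fake_forward()  # inference and scheduling proceed concurrently here
    new_batch = await schedule_task
    print("scheduled:", new_batch)

asyncio.run(step())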
@@ -320,7 +320,7 @@ async def _step(self):
         # Remove all reqs that have already finished
         # When there is currently no running request
         if self.running_batch is None:
-            new_batch = await self.get_schedule_result(self.running_batch)
+            new_batch: Batch = await self.get_schedule_result(self.running_batch)
             if new_batch is not None:
                 self.metric_client.histogram_observe("lightllm_batch_next_size", len(new_batch.reqs))
                 for req in new_batch.reqs:
@@ -383,7 +383,6 @@ async def _prefill_batch(self, batch: Batch):
         start_time = time.time()
         self.metric_client.counter_inc("lightllm_batch_inference_count", "prefill")
         reqs = [r.to_router_rpc_obj() for r in batch.reqs]
-        self.overlap_event.set()
         await self.model_rpc_client.prefill(reqs)
         batch.filter_out_finished_req(self.shm_req_manager)
         self._send_detokenization_pack()
@@ -397,7 +396,6 @@ async def _prefill_batch(self, batch: Batch):
     async def _decode_batch(self, batch: Batch):
         start_time = time.time()
         self.metric_client.counter_inc("lightllm_batch_inference_count", "decode")
-        self.overlap_event.set()
         await self.model_rpc_client.decode()
         # When self.is_multinode_and_multidp is True, the batch object passed in may be None.
         if batch is not None:
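For contrast, here is a rough sketch of the Event-based handshake this commit removes: _prefill_batch and _decode_batch signalled the scheduler right before each forward pass, and the scheduler waited on that signal (up to 20 ms) plus a fixed 3 ms sleep before generating the next batch. This is a simplified reconstruction from the deleted lines, not the original module.

# Simplified reconstruction of the removed threading.Event handshake.
import threading
import time

overlap_event = threading.Event()

def scheduler_side():
    # previously at the top of get_new_batch()
    overlap_event.wait(timeout=0.020)  # wait for a batch runner to signal
    overlap_event.clear()
    time.sleep(0.003)
    print("req_queue.generate_new_batch(...) would run here")

def batch_runner_side():
    # previously called just before model_rpc_client.prefill() / decode()
    overlap_event.set()

t = threading.Thread(target=scheduler_side)
t.start()
batch_runner_side()
t.join()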