@@ -108,7 +108,7 @@ def __init__(self, args: StartArgs, router_port, detokenization_port, metric_por
         g_router_lock.obj = self.router_lock

         # Thread pool state used to overlap (fold) scheduling with inference
-        self.schedule_new_batch: Batch = None
+        self.schedule_new_batch: Batch = None
         self.schedule_lock = asyncio.Lock()
         self.schedule_sem = asyncio.Semaphore(1)
         return
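The new `schedule_new_batch` slot, together with `schedule_lock` and `schedule_sem`, lets the router keep scheduling requests while inference for the previously scheduled work is still in flight. Below is a minimal sketch of that hand-off pattern; the class, method names, and the sleep standing in for a forward pass are illustrative assumptions, not the router's actual code:

```python
import asyncio


class OverlapSketch:
    """Hypothetical stand-in showing the scheduled-but-not-yet-run hand-off pattern."""

    def __init__(self):
        self.pending = []                  # scheduled but not yet run (role of schedule_new_batch)
        self.lock = asyncio.Lock()         # guards the pending slot (role of schedule_lock)
        self.sem = asyncio.Semaphore(1)    # at most one schedule pass at a time (role of schedule_sem)

    async def schedule_once(self, new_reqs):
        # Scheduling side: fold newly admitted requests into the pending slot.
        async with self.sem:
            async with self.lock:
                self.pending.extend(new_reqs)

    async def run_inference(self):
        # Inference side: atomically take over whatever has been scheduled so far.
        async with self.lock:
            batch, self.pending = self.pending, []
        if batch:
            await asyncio.sleep(0.01)      # placeholder for the real forward pass
        return batch


async def main():
    s = OverlapSketch()
    # The two sides can run concurrently; the lock only protects the brief hand-off.
    _, first = await asyncio.gather(s.schedule_once(["req-1", "req-2"]), s.run_inference())
    print(first or await s.run_inference())   # prints ['req-1', 'req-2'] from whichever pass got them


asyncio.run(main())
```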
@@ -294,9 +294,11 @@ def generate_new_batch(self):
         limit_router_queue_length_tensor = torch.tensor(limit_router_queue_length, dtype=torch.int32, device="cpu")
         dist.all_reduce(limit_router_queue_length_tensor, op=dist.ReduceOp.MIN, group=self.mulitnode_group)
         limit_router_queue_length = limit_router_queue_length_tensor.item()
-
+
         # When scheduling, account for the currently running batch and for requests
         # that have been scheduled but not yet run through inference.
-        new_batch = self.req_queue.generate_new_batch(Batch.merge(self.running_batch, self.schedule_new_batch), limit_router_queue_length)
+        new_batch = self.req_queue.generate_new_batch(
+            Batch.merge(self.running_batch, self.schedule_new_batch), limit_router_queue_length
+        )
         self.schedule_new_batch = Batch.merge(self.schedule_new_batch, new_batch)
         return

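In `generate_new_batch`, the MIN all-reduce makes every node in `self.mulitnode_group` schedule against the tightest queue limit in the cluster, and merging `running_batch` with `schedule_new_batch` keeps already-claimed requests out of the new budget. A standalone sketch of just the MIN-agreement step; the single-process `gloo` group and the helper name are assumptions for illustration, not the router's code:

```python
import torch
import torch.distributed as dist


def agree_on_min_capacity(local_capacity: int) -> int:
    """Every rank contributes its local queue headroom; all ranks receive the minimum."""
    t = torch.tensor(local_capacity, dtype=torch.int32, device="cpu")
    dist.all_reduce(t, op=dist.ReduceOp.MIN)   # in-place: t now holds the global minimum
    return int(t.item())


if __name__ == "__main__":
    # Single-process stand-in for the multi-node group (gloo backend, world size 1).
    dist.init_process_group(backend="gloo", init_method="tcp://127.0.0.1:29500", rank=0, world_size=1)
    print(agree_on_min_capacity(7))            # with one rank, the minimum is the local value
    dist.destroy_process_group()
```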
@@ -406,7 +408,6 @@ async def loop_for_netio_req(self):
         pass

         # Schedule a new batch
-
         self.generate_new_batch()
         await asyncio.sleep(0.005)

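With the extra blank line dropped, the net behavior of this hunk is unchanged: `generate_new_batch()` runs on every pass of the network-IO loop and then yields for about 5 ms. A tiny sketch of that polling cadence; the loop body and `schedule_fn` are placeholders, not the router's real logic:

```python
import asyncio


async def netio_loop(schedule_fn, iterations=3):
    # Stand-in for the tail of loop_for_netio_req: after handling incoming
    # requests (omitted), attempt to schedule on every pass, then yield ~5 ms.
    for _ in range(iterations):
        schedule_fn()                  # plays the role of self.generate_new_batch()
        await asyncio.sleep(0.005)     # same cadence as in the diff


asyncio.run(netio_loop(lambda: print("schedule pass")))
```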
@@ -451,4 +452,3 @@ def handle_exception(loop, context):
     loop.create_task(router.loop_for_fwd())
     loop.run_until_complete(router.loop_for_netio_req())
     return
-