diff --git a/docs/CN/source/tutorial/api_server_args_zh.rst b/docs/CN/source/tutorial/api_server_args_zh.rst index f40528bba..ec4d67c53 100755 --- a/docs/CN/source/tutorial/api_server_args_zh.rst +++ b/docs/CN/source/tutorial/api_server_args_zh.rst @@ -236,6 +236,10 @@ attention类型选择参数 多结果输出模式 +.. option:: --schedule_time_interval + + 调度时间间隔,默认为 ``0.03``,单位为秒 + 输出约束参数 ----------- diff --git a/docs/EN/source/tutorial/api_server_args_zh.rst b/docs/EN/source/tutorial/api_server_args_zh.rst index 629d34bf8..0769cef55 100755 --- a/docs/EN/source/tutorial/api_server_args_zh.rst +++ b/docs/EN/source/tutorial/api_server_args_zh.rst @@ -236,6 +236,10 @@ Scheduling Parameters Multi-result output mode +.. option:: --schedule_time_interval + + Schedule time interval, default is ``0.03``, unit is seconds + Output Constraint Parameters --------------------------- diff --git a/lightllm/server/api_cli.py b/lightllm/server/api_cli.py index 465f9cc92..dfbf0c84b 100644 --- a/lightllm/server/api_cli.py +++ b/lightllm/server/api_cli.py @@ -471,4 +471,10 @@ def make_argument_parser() -> argparse.ArgumentParser: default=None, help="""Path of the kv quant calibration config. 
It can be used for llama and qwen model.""", ) + parser.add_argument( + "--schedule_time_interval", + type=float, + default=0.03, + help="""The interval of the schedule time, default is 30ms.""", + ) return parser diff --git a/lightllm/server/router/manager.py b/lightllm/server/router/manager.py index 96ed65515..24b8a9ddb 100644 --- a/lightllm/server/router/manager.py +++ b/lightllm/server/router/manager.py @@ -41,6 +41,7 @@ def __init__(self, args: StartArgs, router_port, detokenization_port, metric_por self.nnodes = args.nnodes self.node_rank = args.node_rank self.dp_size = args.dp + self.schedule_time_interval = args.schedule_time_interval # 默认30ms 的调度周期 # 兼容多机纯tp的运行模式,这时候 1 // 2 == 0, 需要兼容 self.dp_size_in_node = max(1, args.dp // self.nnodes) self.is_multinode_tp = args.nnodes > 1 and args.dp == 1 @@ -195,6 +196,14 @@ async def wait_to_model_ready(self): return + def _get_schedule_time_interval(self): + if self.running_batch is None: + # 没有运行中的 batch 时,每 10ms 触发一次请求调度 + return 0.01 + + # dp 模式,为了更好的配平,需要更长的调度间隔,以便于能收到更多的请求 + return self.schedule_time_interval + async def loop_for_fwd( self, ): @@ -249,7 +258,7 @@ async def loop_for_fwd( logger.debug(f"dp_i {dp_i} frozen token num: {frozen_token_num} \n") logger.debug(f"dp_i {dp_i} estimated_peak_token_count: {estimated_peak_token_count} \n") - await asyncio.sleep(0.03) # 30ms + await asyncio.sleep(self._get_schedule_time_interval()) async def _step(self): """ diff --git a/lightllm/server/visualserver/manager.py b/lightllm/server/visualserver/manager.py index 7a4557c47..4a3dec826 100644 --- a/lightllm/server/visualserver/manager.py +++ b/lightllm/server/visualserver/manager.py @@ -30,7 +30,7 @@ def __init__( cache_port, visual_model_rpc_ports, ): - context = zmq.asyncio.Context(2) + context = zmq.Context(2) self.send_to_next_module = context.socket(zmq.PUSH) # router or audio server (if --enable_multimodal_audio) self.send_to_next_module.connect(f"{args.zmq_mode}127.0.0.1:{next_module_port}") @@ -150,12 +150,22 
@@ async def loop_for_fwd(self): images_need_infer = [] async def loop_for_netio_req(self): + if not hasattr(self, "visual_recv_max_count"): + self.visual_recv_max_count = 64 + while True: - recv_req: GroupReqIndexes = await self.recv_from_httpserver.recv_pyobj() - if isinstance(recv_req, GroupReqIndexes): - self.waiting_reqs.append(recv_req) - else: - assert False, f"Error Req Inf {recv_req}" + try: + for _ in range(self.visual_recv_max_count): + recv_req: GroupReqIndexes = self.recv_from_httpserver.recv_pyobj(zmq.NOBLOCK) + if isinstance(recv_req, GroupReqIndexes): + self.waiting_reqs.append(recv_req) + else: + assert False, f"Error Req Inf {recv_req}" + self.visual_recv_max_count = min(int(self.visual_recv_max_count * 1.3), 256) + except zmq.ZMQError: + # 当队列已经开始清空的时候,将一次接受数量下调 + self.visual_recv_max_count = 64 + await asyncio.sleep(0.01) def clean_up(self): for model_rpc in self.model_rpcs: