4 changes: 4 additions & 0 deletions docs/CN/source/tutorial/api_server_args_zh.rst
@@ -236,6 +236,10 @@ attention类型选择参数

Multi-result output mode

.. option:: --schedule_time_interval

Scheduling time interval in seconds, default is ``0.03``


输出约束参数
-----------
4 changes: 4 additions & 0 deletions docs/EN/source/tutorial/api_server_args_zh.rst
@@ -236,6 +236,10 @@ Scheduling Parameters

Multi-result output mode

.. option:: --schedule_time_interval

Scheduling time interval in seconds, default is ``0.03``

Output Constraint Parameters
---------------------------

6 changes: 6 additions & 0 deletions lightllm/server/api_cli.py
@@ -471,4 +471,10 @@ def make_argument_parser() -> argparse.ArgumentParser:
default=None,
help="""Path of the kv quant calibration config. It can be used for llama and qwen model.""",
)
parser.add_argument(
"--schedule_time_interval",
type=float,
default=0.03,
help="""The interval of the schedule time, default is 30ms.""",
)
return parser
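
For illustration, a minimal self-contained sketch of how the new flag behaves on its own. make_demo_parser below is a hypothetical stand-in for the project's make_argument_parser, not part of lightllm:

import argparse

def make_demo_parser() -> argparse.ArgumentParser:
    # mirrors only the new option added in this PR
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--schedule_time_interval",
        type=float,
        default=0.03,
        help="Scheduling time interval in seconds, default is 0.03 (30ms).",
    )
    return parser

if __name__ == "__main__":
    # e.g. request a 50ms scheduling period instead of the 30ms default
    args = make_demo_parser().parse_args(["--schedule_time_interval", "0.05"])
    print(args.schedule_time_interval)  # -> 0.05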
11 changes: 10 additions & 1 deletion lightllm/server/router/manager.py
@@ -41,6 +41,7 @@ def __init__(self, args: StartArgs, router_port, detokenization_port, metric_por
self.nnodes = args.nnodes
self.node_rank = args.node_rank
self.dp_size = args.dp
self.schedule_time_interval = args.schedule_time_interval  # default scheduling period of 30ms
# compatible with the multi-node pure tp running mode, where 1 // 2 == 0 needs to be handled
self.dp_size_in_node = max(1, args.dp // self.nnodes)
self.is_multinode_tp = args.nnodes > 1 and args.dp == 1
@@ -195,6 +196,14 @@ async def wait_to_model_ready(self):

return

def _get_schedule_time_interval(self):
if self.running_batch is None:
# when there is no running batch, trigger request scheduling every 10ms
return 0.01

# in dp mode, a longer scheduling interval lets more requests arrive, which helps balance the load
return self.schedule_time_interval

async def loop_for_fwd(
self,
):
@@ -249,7 +258,7 @@ async def loop_for_fwd(self):
logger.debug(f"dp_i {dp_i} frozen token num: {frozen_token_num} \n")
logger.debug(f"dp_i {dp_i} estimated_peak_token_count: {estimated_peak_token_count} \n")

await asyncio.sleep(0.03) # 30ms
await asyncio.sleep(self._get_schedule_time_interval())

async def _step(self):
"""
22 changes: 16 additions & 6 deletions lightllm/server/visualserver/manager.py
@@ -30,7 +30,7 @@ def __init__(
cache_port,
visual_model_rpc_ports,
):
context = zmq.asyncio.Context(2)
context = zmq.Context(2)
Contributor review comment (critical):

The ZMQ context has been changed from zmq.asyncio.Context to zmq.Context. This is a critical issue because the sockets created from this context are used in an async environment.

  • self.recv_from_httpserver is used with await self.recv_from_httpserver.recv_pyobj(). A socket from a synchronous context is not awaitable and will block the event loop or raise an error.
  • self.send_to_next_module.send_pyobj() is called from an async function. Using a blocking socket here will block the entire event loop, severely impacting performance and responsiveness.
    Please revert this change to use the async context.
Suggested change:
context = zmq.asyncio.Context(2)
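
To illustrate the reviewer's point, a minimal standalone sketch (hypothetical port and socket names, not the project's code) of how an async context keeps recv_pyobj awaitable:

import zmq
import zmq.asyncio

async def pull_loop(port: int):
    # sockets created from zmq.asyncio.Context return awaitables,
    # so recv_pyobj() can be awaited without blocking the event loop
    ctx = zmq.asyncio.Context(2)
    sock = ctx.socket(zmq.PULL)
    sock.bind(f"tcp://127.0.0.1:{port}")
    while True:
        obj = await sock.recv_pyobj()
        print("received:", obj)

# With a plain zmq.Context, a bare recv_pyobj() would block the whole asyncio
# loop; the recv_pyobj(zmq.NOBLOCK) + except zmq.ZMQError pattern used later
# in this diff is the usual workaround when the socket is synchronous.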

self.send_to_next_module = context.socket(zmq.PUSH) # router or audio server (if --enable_multimodal_audio)
self.send_to_next_module.connect(f"{args.zmq_mode}127.0.0.1:{next_module_port}")

@@ -150,12 +150,22 @@ async def loop_for_fwd(self):
images_need_infer = []

async def loop_for_netio_req(self):
if not hasattr(self, "visual_recv_max_count"):
self.visual_recv_max_count = 64
Contributor review comment on lines +153 to +154 (medium):


It is better practice to initialize instance attributes like visual_recv_max_count within the __init__ method. This makes the code more organized and easier to understand, as all initial state is defined in one place. Lazily initializing it here can make it harder to track the state of the object.

Consider moving self.visual_recv_max_count = 64 to the __init__ method.
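
A brief sketch of that suggestion (class name and attributes abbreviated, illustrative only):

class VisualManager:  # hypothetical stand-in for the real visual server manager
    def __init__(self):
        self.waiting_reqs = []
        # define the initial per-iteration receive budget up front,
        # instead of lazily inside loop_for_netio_req
        self.visual_recv_max_count = 64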


while True:
recv_req: GroupReqIndexes = await self.recv_from_httpserver.recv_pyobj()
if isinstance(recv_req, GroupReqIndexes):
self.waiting_reqs.append(recv_req)
else:
assert False, f"Error Req Inf {recv_req}"
try:
for _ in range(self.visual_recv_max_count):
recv_req: GroupReqIndexes = self.recv_from_httpserver.recv_pyobj(zmq.NOBLOCK)
if isinstance(recv_req, GroupReqIndexes):
self.waiting_reqs.append(recv_req)
else:
assert False, f"Error Req Inf {recv_req}"
self.visual_recv_max_count = min(int(self.visual_recv_max_count * 1.3), 256)  # keep the count an int so range() accepts it
except zmq.ZMQError:
# once the queue starts to drain, scale the per-iteration receive count back down
self.visual_recv_max_count = 64
await asyncio.sleep(0.01)

def clean_up(self):
for model_rpc in self.model_rpcs:
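
The receive loop added above follows a simple multiplicative-increase pattern. A self-contained sketch under assumed names (drain_once is not a function in the project):

import zmq

def drain_once(sock: zmq.Socket, waiting: list, max_count: int) -> int:
    # receive up to max_count objects without blocking and adapt the budget
    try:
        for _ in range(max_count):
            waiting.append(sock.recv_pyobj(zmq.NOBLOCK))
        # the socket still had messages after max_count receives: raise the budget
        return min(int(max_count * 1.3), 256)
    except zmq.ZMQError:
        # the socket reported empty: fall back to the base budget
        return 64

Growing the budget only while messages keep arriving lets a burst of multimodal requests be absorbed within a few iterations, while the 10ms sleep taken when the queue is empty keeps the idle loop cheap.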