
Commit d14fcea

fix
1 parent 068663a commit d14fcea

File tree

7 files changed: +30 −18 lines changed

lightllm/server/router/manager.py

Lines changed: 9 additions & 9 deletions
@@ -200,7 +200,7 @@ async def wait_to_model_ready(self):

         return

-    async def add_req(self, group_req_indexes: GroupReqIndexes):
+    def add_req(self, group_req_indexes: GroupReqIndexes):
         req_group = []
         for req_index in group_req_indexes.shm_req_indexes:
             req = self.shm_req_manager.get_req_obj_by_index(req_index)
@@ -211,6 +211,7 @@ async def add_req(self, group_req_indexes: GroupReqIndexes):
             logger.info(f"router recive req id {req.request_id} cost time {time.time() - req.start_time} s")
         self.req_queue.extend(req_group)
         self.send_to_detokenization.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL)
+
         return

     async def loop_for_fwd(
@@ -262,18 +263,18 @@ async def get_schedule_result(self, running_batch: Batch):
         if self.schedule_task is None:

             def get_new_batch():
-                current_waiting_num = None
+                limit_router_queue_length = None
                 if self.nnodes > 1 and self.args.dp == 1:
                     # use all_reduce to get the minimum value across nodes
-                    current_waiting_num = len(self.req_queue.waiting_req_list)
-                    current_waiting_num_tensor = torch.tensor(current_waiting_num, dtype=torch.int32, device="cpu")
-                    dist.all_reduce(current_waiting_num_tensor, op=dist.ReduceOp.MIN, group=self.mulitnode_group)
-                    current_waiting_num = current_waiting_num_tensor.item()
+                    limit_router_queue_length = len(self.req_queue.waiting_req_list)
+                    limit_router_queue_length_tensor = torch.tensor(limit_router_queue_length, dtype=torch.int32, device="cpu")
+                    dist.all_reduce(limit_router_queue_length_tensor, op=dist.ReduceOp.MIN, group=self.mulitnode_group)
+                    limit_router_queue_length = limit_router_queue_length_tensor.item()

                 self.overlap_event.wait(timeout=0.020)
                 self.overlap_event.clear()
                 time.sleep(0.003)
-                new_batch = self.req_queue.generate_new_batch(running_batch, current_waiting_num)
+                new_batch = self.req_queue.generate_new_batch(running_batch, limit_router_queue_length)
                 return new_batch

             self.schedule_task = asyncio.get_running_loop().run_in_executor(self.overlap_thread_pool, get_new_batch)
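
The hunk above makes multinode scheduling consistent: with nnodes > 1 and dp == 1, each node caps its view of the waiting queue at the smallest queue length reported by any node, so all nodes build identical batches. A minimal sketch of the synchronization pattern, assuming a torch.distributed process group is already initialized (the helper name is illustrative, not part of this commit):

    import torch
    import torch.distributed as dist

    def min_waiting_len_across_nodes(local_len: int, group=None) -> int:
        # Every rank contributes its local waiting-queue length; after the
        # all_reduce with ReduceOp.MIN, every rank holds the global minimum.
        t = torch.tensor(local_len, dtype=torch.int32, device="cpu")
        dist.all_reduce(t, op=dist.ReduceOp.MIN, group=group)
        return t.item()

The reduction runs on a CPU tensor, and the resulting minimum is what gets passed to generate_new_batch as limit_router_queue_length.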
@@ -399,7 +400,7 @@ async def loop_for_netio_req(self):
         while True:
             recv_req: GroupReqIndexes = await self.recv_from_httpserver.recv_pyobj()
             if isinstance(recv_req, GroupReqIndexes):
-                await self.add_req(recv_req)
+                self.add_req(recv_req)
             else:
                 assert False, f"Error Req Inf {recv_req}"

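Dropping the await here matches the signature change above: add_req contains no await points, so keeping it a coroutine only added event-loop scheduling overhead. A minimal sketch of the pattern with hypothetical names (not the project's API):

    import asyncio

    def add_req(queue: list, item: int) -> None:
        # Purely synchronous work: no awaits, so no reason to be async.
        queue.append(item)

    async def receive_loop(queue: list) -> None:
        for item in range(3):  # stand-in for an async recv loop
            add_req(queue, item)  # plain call, no await needed

    q = []
    asyncio.run(receive_loop(q))
    assert q == [0, 1, 2]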

@@ -408,7 +409,6 @@ def clean_up(self):


 def start_router_process(args, router_port, detokenization_port, model_rpc_ports, metric_port, pipe_writer):
-
     # register the graceful-exit handler
     graceful_registry(inspect.currentframe().f_code.co_name)
     start_parent_check_thread()

lightllm/server/router/req_queue/base_queue.py

Lines changed: 2 additions & 2 deletions
@@ -69,11 +69,11 @@ def get_batch_dp_req_size(self, current_batch: Batch):

         return len([req for req in current_batch.reqs if req.sample_params.suggested_dp_index == self.dp_index])

-    def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
+    def generate_new_batch(self, current_batch: Batch, limit_router_queue_length: int = None):
         """
         args:
             current_batch: current batch
-            current_waiting_num: the least length of waiting list across all nodes.
+            limit_router_queue_length: the least length of waiting list across all nodes.
             It only works when nnodes > 1 and dp_size == 1.
         return:
             new batch
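
For callers, only the keyword name changes. A hedged usage sketch, where req_queue and running_batch stand for objects created elsewhere in the router:

    new_batch = req_queue.generate_new_batch(running_batch, limit_router_queue_length=8)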

lightllm/server/router/req_queue/chunked_prefill/impl.py

Lines changed: 8 additions & 2 deletions
@@ -56,7 +56,7 @@ def _can_add_new_req(self, req: Req, is_busy, new_batch_first_router_need_tokens
         return False, new_batch_first_router_need_tokens

     # @calculate_time(show=True, min_cost_ms=10)
-    def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
+    def generate_new_batch(self, current_batch: Batch, limit_router_queue_length: int = None):

         # If the number of requests already scheduled exceeds the limit, stop scheduling new ones.
         exist_req_num = self.get_batch_dp_req_size(current_batch) + len(self.pause_req_dict)
@@ -74,7 +74,13 @@ def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
         can_run_list = []
         abort_req_list = []
         aborted_count = 0
-        for req in self.waiting_req_list[:current_waiting_num]:
+
+        if limit_router_queue_length is None:
+            waiting_queue = self.waiting_req_list
+        else:
+            waiting_queue = self.waiting_req_list[:limit_router_queue_length]
+
+        for req in waiting_queue:
             if req.is_aborted and not req.is_paused:
                 # Due to management complexity, only requests that were never scheduled to run can be dropped from the queue directly on abort.
                 # Paused requests must be filtered by the router manager after being resumed. Keep this approach for now, otherwise managed tokens would leak.
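
Behavior is unchanged when no limit is set: Python treats a None slice bound as "no bound", so the old waiting_req_list[:current_waiting_num] already fell back to the full queue. The new branch only states that intent explicitly, as this small check shows:

    reqs = ["a", "b", "c"]
    assert reqs[:None] == reqs       # no limit: the whole queue is considered
    assert reqs[:2] == ["a", "b"]    # limited: only the first two requests

The same rewrite appears in continues_batch/impl.py below.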

lightllm/server/router/req_queue/continues_batch/beam_impl.py

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ def _can_add_new_group_reqs(self, cur_handle_group_reqs: List[Req], is_busy, new
         return False, new_batch_first_router_need_tokens

     # @calculate_time(show=True, min_cost_ms=10)
-    def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
+    def generate_new_batch(self, current_batch: Batch, limit_router_queue_length: int = None):
         # If the number of requests already scheduled exceeds the limit, stop scheduling new ones.
         exist_req_num = self.get_batch_dp_req_size(current_batch) + len(self.pause_req_dict)
         req_is_full = exist_req_num >= self.running_max_req_size

lightllm/server/router/req_queue/continues_batch/impl.py

Lines changed: 8 additions & 2 deletions
@@ -61,7 +61,7 @@ def _can_add_new_req(self, req: Req, is_busy, new_batch_first_router_need_tokens
         return False, new_batch_first_router_need_tokens

     # @calculate_time(show=True, min_cost_ms=10)
-    def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
+    def generate_new_batch(self, current_batch: Batch, limit_router_queue_length: int = None):
         # If the number of requests already scheduled exceeds the limit, stop scheduling new ones.
         exist_req_num = self.get_batch_dp_req_size(current_batch) + len(self.pause_req_dict)
         req_is_full = exist_req_num >= self.running_max_req_size
@@ -76,7 +76,13 @@ def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
         abort_req_list = []
         new_batch_first_router_need_tokens = 0  # mainly caps the token count for large-chunk prefill computation
         aborted_count = 0
-        for req in self.waiting_req_list[:current_waiting_num]:
+
+        if limit_router_queue_length is None:
+            waiting_queue = self.waiting_req_list
+        else:
+            waiting_queue = self.waiting_req_list[:limit_router_queue_length]
+
+        for req in waiting_queue:
             if req.is_aborted and not req.is_paused:
                 # Due to management complexity, only requests that were never scheduled to run can be dropped from the queue directly on abort.
                 # Paused requests must be filtered by the router manager after being resumed. Keep this approach for now, otherwise managed tokens and req objects would leak.

lightllm/server/router/req_queue/continues_batch/pd_decode_impl.py

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ def _init_cache_list(self, current_batch: Batch, is_busy):
         return

     # @calculate_time(show=True, min_cost_ms=10)
-    def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
+    def generate_new_batch(self, current_batch: Batch, limit_router_queue_length: int = None):
         # If the number of requests already scheduled exceeds the limit, stop scheduling new ones.
         exist_req_num = self.get_batch_dp_req_size(current_batch) + len(self.pause_req_dict)
         req_is_full = exist_req_num >= self.running_max_req_size

lightllm/server/router/req_queue/dp_base_queue.py

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ def get_wait_req_num(self):
         return sum(queue.get_wait_req_num() for queue in self.inner_queues)

     # @calculate_time(show=True, min_cost_ms=10)
-    def generate_new_batch(self, current_batch: Batch, current_waiting_num: int = None):
+    def generate_new_batch(self, current_batch: Batch, limit_router_queue_length: int = None):
         batches = [self.inner_queues[dp_index].generate_new_batch(current_batch) for dp_index in range(self.dp_size)]
         return self._merge_batch(batches)

