Commit 6e6092f

Author: wangzaijun
Commit message: fix cpu cache aborted
Parent: 1bc6a65

5 files changed (+14, -7 lines)


lightllm/server/router/req_queue/base_queue.py

Lines changed: 9 additions & 1 deletion
@@ -4,10 +4,11 @@
 from lightllm.server.core.objs import FinishStatus
 from lightllm.common.basemodel.infer_lock import g_router_lock
 from lightllm.utils.config_utils import get_fixed_kv_len
+from lightllm.server.core.objs import StartArgs


 class BaseQueue:
-    def __init__(self, args, router, dp_index, dp_size_in_node) -> None:
+    def __init__(self, args: StartArgs, router, dp_index, dp_size_in_node) -> None:
         self.args = args
         self.dp_index = dp_index
         self.dp_size_in_node = dp_size_in_node

@@ -26,6 +27,13 @@ def __init__(self, args, router, dp_index, dp_size_in_node) -> None:
         self.router_token_ratio = args.router_token_ratio  # ratio to determine whether the router is busy
         self.router_max_new_token_len = args.router_max_new_token_len

+    def free_aborted_req_cpu_cache_pages(self, req: Req):
+        if self.args.enable_cpu_cache:
+            self.router.cpu_cache_client.lock.acquire_sleep1ms()
+            self.router.cpu_cache_client.deref_pages(req.cpu_cache_match_page_indexes.get_all())
+            req.cpu_cache_match_page_indexes.clear()
+            self.router.cpu_cache_client.lock.release()
+
     def extend(self, req_group: List[Req]):
         for req in req_group:
             req.sample_params.suggested_dp_index = self.dp_index
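The helper added above dereferences, under the cpu_cache_client lock, the cpu cache pages that were matched for a request that got aborted before it ever ran. A minimal sketch of the same pattern with an exception-safe lock release follows; the try/finally guard is a suggestion of this note, not part of commit 6e6092f, while the client, lock, and page-index APIs are the names used in the diff above:

    # Sketch only: exception-safe variant of the BaseQueue helper introduced above.
    # The try/finally is an assumption/suggestion, not part of this commit;
    # cpu_cache_client, acquire_sleep1ms, deref_pages and
    # cpu_cache_match_page_indexes are the names shown in the diff.
    def free_aborted_req_cpu_cache_pages(self, req):
        if not self.args.enable_cpu_cache:
            return
        client = self.router.cpu_cache_client
        client.lock.acquire_sleep1ms()
        try:
            # Drop this request's references so the shared cpu kv cache pages
            # can be reclaimed even though the request never reached inference.
            client.deref_pages(req.cpu_cache_match_page_indexes.get_all())
            req.cpu_cache_match_page_indexes.clear()
        finally:
            client.lock.release()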

lightllm/server/router/req_queue/chunked_prefill/beam_impl.py

Lines changed: 1 addition & 0 deletions
@@ -127,6 +127,7 @@ def generate_new_batch(self, current_batch: Batch):
         new_batch = Batch(uuid.uuid4().int, can_run_list, dp_size_in_node=self.dp_size_in_node)

         for req in abort_req_list:
+            self.free_aborted_req_cpu_cache_pages(req)
             self.router.shm_req_manager.put_back_req_obj(req)
         self.waiting_req_list = self.waiting_req_list[len(can_run_list) + aborted_count :]
         return new_batch

lightllm/server/router/req_queue/chunked_prefill/impl.py

Lines changed: 2 additions & 6 deletions
@@ -78,13 +78,8 @@ def generate_new_batch(self, current_batch: Batch):
         aborted_count = 0

         waiting_queue = self.waiting_req_list
-        # When the cpu cache feature is enabled, the multi_level_kv_cache module allocates some cpu kv
-        # cache pages for each request, and those pages are released in the inference process; dropping
-        # an aborted request directly during scheduling would make those pages unrecoverable, so with
-        # cpu cache enabled we defer the page recycling to the inference process instead of doing it here.
-        disable_queue_aborted = get_env_start_args().enable_cpu_cache
         for req in waiting_queue:
-            if req.is_aborted and not disable_queue_aborted:
+            if req.is_aborted:
                 # Because of management complexity, only requests that have never been scheduled can be dropped from the queue directly on abort.
                 # Paused requests must be resumed first and filtered by the router manager. Keep this approach for now, otherwise managed tokens would leak.
                 aborted_count += 1

@@ -101,6 +96,7 @@ def generate_new_batch(self, current_batch: Batch):
         if len(can_run_list) != 0:
             new_batch = Batch(uuid.uuid4().int, can_run_list, dp_size_in_node=self.dp_size_in_node)
             for req in abort_req_list:
+                self.free_aborted_req_cpu_cache_pages(req)
                 self.router.shm_req_manager.put_back_req_obj(req)
             self.waiting_req_list = self.waiting_req_list[len(can_run_list) + aborted_count :]
             return new_batch
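The deleted guard above existed only because, before this commit, the cpu cache pages held by an aborted request were recycled in the inference process, so dropping the request during scheduling would have leaked them. Now that free_aborted_req_cpu_cache_pages releases those pages from the base queue, the scheduling-time decision simplifies; roughly (reconstructed from the deleted and added lines above, with skip_in_queue as an illustrative name):

# Before: aborted requests stayed in the queue whenever cpu cache was enabled,
# deferring page recycling to the inference process.
disable_queue_aborted = get_env_start_args().enable_cpu_cache
skip_in_queue = req.is_aborted and not disable_queue_aborted

# After: aborted, never-scheduled requests are always skipped, and their cpu
# cache page references are dropped via free_aborted_req_cpu_cache_pages when
# the request object is put back (second hunk above).
skip_in_queue = req.is_aborted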

lightllm/server/router/req_queue/chunked_prefill/impl_for_nixl_pd.py

Lines changed: 1 addition & 0 deletions
@@ -87,6 +87,7 @@ def generate_new_batch(self, current_batch: Batch):
         if len(can_run_list) != 0:
             new_batch = Batch(uuid.uuid4().int, can_run_list, dp_size_in_node=self.dp_size_in_node)
             for req in abort_req_list:
+                self.free_aborted_req_cpu_cache_pages(req)
                 self.router.shm_req_manager.put_back_req_obj(req)
             self.waiting_req_list = self.waiting_req_list[len(can_run_list) + aborted_count :]
             return new_batch

lightllm/server/router/req_queue/chunked_prefill/impl_for_pd_decode.py

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@ def generate_new_batch(self, current_batch: Batch):
         if len(can_run_list) != 0:
             new_batch = Batch(uuid.uuid4().int, can_run_list, dp_size_in_node=self.dp_size_in_node)
             for req in abort_req_list:
+                self.free_aborted_req_cpu_cache_pages(req)
                 self.router.shm_req_manager.put_back_req_obj(req)
             self.waiting_req_list = self.waiting_req_list[len(can_run_list) + aborted_count :]
             return new_batch
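All four queue implementations above apply the same one-line change at the point where aborted requests are handed back to the shared request manager; since they presumably inherit the helper from BaseQueue, the shared pattern looks roughly like this (control-flow context assumed from the hunks, variable and method names taken from the diffs):

if len(can_run_list) != 0:
    new_batch = Batch(uuid.uuid4().int, can_run_list, dp_size_in_node=self.dp_size_in_node)
    for req in abort_req_list:
        # New in this commit: release the aborted request's cpu cache page
        # references before the request object is returned to the shared pool.
        self.free_aborted_req_cpu_cache_pages(req)
        self.router.shm_req_manager.put_back_req_obj(req)
    self.waiting_req_list = self.waiting_req_list[len(can_run_list) + aborted_count :]
    return new_batch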
