Skip to content

Commit b619af8

Browse files
Author: wangzaijun
Commit message: "remove extra status"
1 parent bdae2ef commit b619af8

File tree

4 files changed

+49
-65
lines changed

4 files changed

+49
-65
lines changed

lightllm/server/multi_level_kv_cache/cpu_cache_client.py

Lines changed: 37 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -111,67 +111,61 @@ def update_pages_status_to_ready(
111111
):
112112
offload_candidates: List[int] = []
113113
page_items = self.page_items.linked_items
114+
not_exist_none_page = True
114115
for page_index in page_list:
115116
if page_index != -1:
116117
cur_page: _CpuPageStatus = page_items[page_index]
117118
if cur_page.status < _CpuPageStatus.READY:
118119
cur_page.status = _CpuPageStatus.READY
119120

120-
# 全部落盘,已落盘前缀部分会在落盘中自动剔除
121-
if disk_offload_enable:
122-
offload_candidates.append(cur_page.self_index)
123-
124121
if deref:
125122
assert cur_page.ref_count > 0
126123
cur_page.ref_count -= 1
127124

128-
# 进入卸载队列的请求,引用计数加一,等卸载完成后再释放。
129-
if disk_offload_enable:
130-
cur_page.ref_count += 1
125+
# 全部落盘,已落盘前缀部分会在落盘中自动剔除
126+
if disk_offload_enable and not_exist_none_page:
127+
offload_candidates.append(cur_page.self_index)
128+
129+
else:
130+
not_exist_none_page = False
131131

132132
# 控制prompt长度,较短的prompt不进行disk offload
133133
limit_length = get_disk_cache_prompt_limit_length()
134+
134135
if (
135136
disk_offload_enable
136137
and offload_candidates
137-
and len(page_list) * self.args.cpu_cache_token_page_size < limit_length
138+
and len(page_list) * self.args.cpu_cache_token_page_size >= limit_length
138139
):
139-
logger.info(
140-
f"skip disk offload for small page, " f"length = {len(page_list) * self.args.cpu_cache_token_page_size}"
141-
)
142-
self.mark_pages_recyclable(page_list=offload_candidates)
143-
return
144-
145-
if disk_offload_enable and offload_candidates:
140+
# 加引用计数,落盘成功后再减掉
141+
for offload_page_index in offload_candidates:
142+
offload_page_item: _CpuPageStatus = page_items[offload_page_index]
143+
offload_page_item.ref_count += 1
146144
# 写入到 offload_page_indexes 中的数据是分组的,其中
147145
# 开头的元素标记后续多少个元素是一组,便于读取时进行分组处理
148146
# 写入数据为 group_page_size, page_index1, page_index2, ...
149147
self.offload_page_indexes.add_item(len(offload_candidates))
150148
self.offload_page_indexes.add_items(offload_candidates)
151149
return
152150

153-
def mark_pages_recyclable(self, page_list: List[int]):
154-
page_items = self.page_items.linked_items
155-
for page_index in page_list:
156-
if page_index == -1:
157-
continue
158-
cur_page = page_items[page_index]
159-
if cur_page.status >= _CpuPageStatus.READY:
160-
cur_page.status = _CpuPageStatus.READY_RECYCLE
161-
return
162-
163151
def query_one_page(self, hash_key: int) -> Tuple[Optional[int], bool]:
152+
"""
153+
返回的cpu page必然是数据ready可以被复用的。
154+
"""
164155
page_index = self.page_hash_dict.get(hash_key)
165156
if page_index is not None:
166157
page_item: _CpuPageStatus = self.page_items.get_item_by_index(page_index)
167-
page_item.ref_count += 1
168-
# lru 更新
169-
page_item.del_self_from_list()
170-
self.page_items.add_item_to_tail(index=page_index)
171158
if page_item.is_data_ready():
159+
page_item.ref_count += 1
160+
# lru 更新
161+
page_item.del_self_from_list()
162+
self.page_items.add_item_to_tail(index=page_index)
172163
return page_index, True
173164
else:
174-
return page_index, False
165+
# lru 更新
166+
page_item.del_self_from_list()
167+
self.page_items.add_item_to_tail(index=page_index)
168+
return None, False
175169
else:
176170
return None, False
177171

@@ -180,7 +174,7 @@ def check_allpages_ready(self, page_list: List[int]) -> bool:
180174
for page_index in page_list:
181175
if page_index == -1:
182176
continue
183-
page_item = page_items[page_index]
177+
page_item: _CpuPageStatus = page_items[page_index]
184178
if not page_item.is_data_ready():
185179
logger.info("cpu cache page %d not ready, status %d", page_index, page_item.status)
186180
return False
@@ -193,7 +187,7 @@ def deref_pages(self, page_list: List[int]):
193187
page_items = self.page_items.linked_items
194188
for page_index in page_list:
195189
if page_index != -1:
196-
page_item = page_items[page_index]
190+
page_item: _CpuPageStatus = page_items[page_index]
197191
assert page_item.ref_count > 0
198192
page_item.ref_count -= 1
199193
return
@@ -218,46 +212,37 @@ def get_pages_to_offloading(self) -> List[List[int]]:
218212
groups.append(page_list[index + 1 : index + 1 + group_size])
219213
for page_index in groups[-1]:
220214
page_item: _CpuPageStatus = page_items[page_index]
221-
# TODO 这个状态是否存在问题
222-
page_item.status = _CpuPageStatus.OFFLOADING
215+
assert page_item.is_ready()
223216

224217
index = index + 1 + group_size
225218

226219
return groups
227220

228-
def update_pages_status_to_ready_recycle(self, page_list: List[int], deref: bool = True):
229-
page_items = self.page_items.linked_items
230-
for page_index in page_list:
231-
if page_index != -1:
232-
cur_page = page_items[page_index]
233-
cur_page.status = _CpuPageStatus.READY_RECYCLE
234-
if deref:
235-
assert cur_page.ref_count > 0
236-
cur_page.ref_count -= 1
237-
return
238-
239221
def recycle_pages(self, page_list: List[int]):
222+
"""
223+
当从硬盘cache中读取数据失败时,调用此函数回收页面
224+
"""
240225
page_items = self.page_items.linked_items
241226
for page_index in page_list:
242227
if page_index == -1:
243228
continue
244-
cur_page = page_items[page_index]
229+
cur_page: _CpuPageStatus = page_items[page_index]
245230

246231
if cur_page.ref_count > 0:
247232
cur_page.ref_count -= 1
248233

249234
if cur_page.ref_count != 0:
250235
continue
251236

252-
if cur_page.hash_key != 0:
237+
if cur_page.is_loading():
253238
existing_index = self.page_hash_dict.get(cur_page.hash_key)
254239
if existing_index is not None and existing_index == cur_page.self_index:
255240
self.page_hash_dict.remove(cur_page.hash_key)
256241

257-
cur_page.del_self_from_list()
258-
cur_page.hash_key = 0
259-
cur_page.status = _CpuPageStatus.EMPTY
260-
self.page_items.add_item_to_tail(cur_page.self_index)
242+
cur_page.del_self_from_list()
243+
cur_page.hash_key = 0
244+
cur_page.status = _CpuPageStatus.EMPTY
245+
self.page_items.add_item_to_head(cur_page.self_index)
261246
return
262247

263248
def _create_cpu_status_list(self, init_shm_data: bool):
@@ -346,20 +331,11 @@ def is_loading(self):
346331
def is_ready(self):
347332
return self.status == self.READY
348333

349-
def is_offloading(self):
350-
return self.status == self.OFFLOADING
351-
352-
def is_ready_recycle(self):
353-
return self.status == self.READY_RECYCLE
354-
355334
def is_data_ready(self):
356335
"""
357336
判断数据是否是填充ok的,可能包含多种状态下属于数据是可填充的状态。
358337
"""
359338
return self.status >= self.READY
360339

361340
def can_realloc(self, disk_offload_enable: bool):
362-
if disk_offload_enable:
363-
return (self.is_empty() or self.is_ready_recycle()) and self.ref_count == 0
364-
else:
365-
return (self.is_empty() or self.is_data_ready()) and self.ref_count == 0
341+
return (self.is_empty() or self.is_data_ready()) and self.ref_count == 0

lightllm/server/multi_level_kv_cache/disk_cache_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def _persist_pages_to_disk(self, payloads: List[_PagePayload]) -> None:
134134
time.sleep(0.001)
135135

136136
self.cpu_cache_client.lock.acquire_sleep1ms()
137-
self.cpu_cache_client.update_pages_status_to_ready_recycle(page_list=page_indexes, deref=True)
137+
self.cpu_cache_client.deref_pages(page_list=page_indexes)
138138
self.cpu_cache_client.lock.release()
139139

140140
def query_loadable_pages(self, tokens: List[int], start_pos: int) -> int:

lightllm/server/multi_level_kv_cache/manager.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ def __init__(
3434
logger.info(f"send_to_router sendhwm {self.send_to_router.getsockopt(zmq.SNDHWM)}")
3535
self.cpu_cache_client = CpuKvCacheClient(only_create_meta_data=False, init_shm_data=True)
3636
self.shm_req_manager = ShmReqManager()
37+
self.only_cpu_cache_enable = args.enable_cpu_cache and not args.enable_disk_cache
3738
# 磁盘io在NVMe SSD上需要大量并发才能发挥性能
38-
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=500)
39+
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=6 if self.only_cpu_cache_enable else 500)
3940
# 控制进行 cpu cache 页面匹配的时间,超过时间则不再匹配,直接转发。
4041
self.cpu_cache_time_out = 0.5
4142
self.recv_queue = Queue(maxsize=1024)
@@ -128,8 +129,6 @@ def _disk_cache_match(self, token_hash_list: List[int], all_pages: List[int]) ->
128129
deref=False,
129130
disk_offload_enable=False,
130131
)
131-
if self.args.enable_disk_cache:
132-
self.cpu_cache_client.mark_pages_recyclable(new_page_indexes)
133132
self.cpu_cache_client.lock.release()
134133
return all_pages, len(new_page_indexes)
135134

lightllm/server/multi_level_kv_cache/shm_objs.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ def add_item_to_tail(self, index: int):
102102
self.tail.pre_index = item.self_index
103103
return
104104

105+
def add_item_to_head(self, index: int):
106+
item = self.linked_items[index]
107+
next_node = self.linked_items[self.head.next_index]
108+
next_node.pre_index = item.self_index
109+
item.next_index = next_node.self_index
110+
item.pre_index = self.head.self_index
111+
self.head.next_index = item.self_index
112+
return
113+
105114
def get_item_by_index(self, index: int) -> "_LinkedListItem":
106115
item = self.linked_items[index]
107116
return item

0 commit comments

Comments (0)