
Commit 01f8ac6

Author: niushengxiao
Commit message: fix
1 parent 251812d, commit 01f8ac6

7 files changed: +39 / -46 lines

lightllm/server/core/objs/start_args_type.py

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ class StartArgs:
    pd_node_id: int = field(default=-1)
    enable_cpu_cache: bool = field(default=False)
    cpu_cache_storage_size: float = field(default=2)
-    cpu_cache_token_page_size: int = field(default=256)
+    cpu_cache_token_page_size: int = field(default=64)
    enable_disk_cache: bool = field(default=False)
    disk_cache_storage_size: float = field(default=10)
    # zmp ports
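
The functional change here is the smaller default CPU-cache page size (256 -> 64 tokens), which makes offload granularity finer, so shorter prompts can still fill at least one complete page. A rough illustrative sketch of the arithmetic; full_pages below is a hypothetical helper that just mirrors the cur_kv_len // cpu_cache_token_page_size expression used later in this commit:

# Hypothetical helper, for illustration only: how many complete CPU-cache
# pages a sequence of cur_kv_len tokens can be offloaded into.
def full_pages(cur_kv_len: int, cpu_cache_token_page_size: int) -> int:
    return cur_kv_len // cpu_cache_token_page_size

print(full_pages(200, 256))  # 0 -> nothing offloadable with the old default
print(full_pages(200, 64))   # 3 -> three full pages with the new default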

lightllm/server/multi_level_kv_cache/shm_objs.py

Lines changed: 3 additions & 2 deletions
@@ -261,10 +261,11 @@ def init(self):
        self.value = -1


-def _create_shm(name: str, byte_size: int):
+def _create_shm(name: str, byte_size: int, auto_cleanup: bool = False):
    try:
        shm = shared_memory.SharedMemory(name=name, create=True, size=byte_size)
-        register_posix_shm_for_cleanup(name)
+        if auto_cleanup:
+            register_posix_shm_for_cleanup(name)
        logger.info(f"create lock shm {name}")
    except:
        shm = shared_memory.SharedMemory(name=name, create=False, size=byte_size)
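
Cleanup registration becomes opt-in: only a caller that passes auto_cleanup=True registers the segment for the unlink-at-exit fallback. A minimal, simplified sketch of the resulting create-or-link pattern (a stand-in, not the real _create_shm, which also logs through the module logger; the import path is the one used elsewhere in this commit):

from multiprocessing import shared_memory
from lightllm.utils.auto_shm_cleanup import register_posix_shm_for_cleanup

def create_or_attach(name: str, byte_size: int, auto_cleanup: bool = False):
    # Try to create the segment; if it already exists, just attach to it.
    try:
        shm = shared_memory.SharedMemory(name=name, create=True, size=byte_size)
        if auto_cleanup:
            # Only an explicitly opted-in creator registers the segment for
            # last-resort unlink at process exit.
            register_posix_shm_for_cleanup(name)
    except FileExistsError:
        shm = shared_memory.SharedMemory(name=name, create=False, size=byte_size)
    return shm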

lightllm/server/router/dynamic_prompt/shared_arr.py

Lines changed: 0 additions & 22 deletions
@@ -5,7 +5,6 @@
from multiprocessing import shared_memory
from lightllm.utils.log_utils import init_logger
from lightllm.utils.shm_utils import create_or_link_shm
-from lightllm.utils.auto_shm_cleanup import register_posix_shm_for_cleanup

logger = init_logger(__name__)

@@ -15,27 +14,6 @@ def __init__(self, name, shape, dtype):
        dtype_byte_num = np.array([1], dtype=dtype).dtype.itemsize
        dest_size = np.prod(shape) * dtype_byte_num
        self.shm = create_or_link_shm(name, dest_size)
-        try:
-            shm = shared_memory.SharedMemory(name=name, create=True, size=dest_size)
-            logger.info(f"create shm {name}")
-            register_posix_shm_for_cleanup(name)
-        except:
-            shm = shared_memory.SharedMemory(name=name, create=False, size=dest_size)
-            logger.info(f"link shm {name}")
-
-        if shm.size != dest_size:
-            logger.info(f"size not same, unlink shm {name} and create again")
-            shm.unlink()
-            shm.close()
-            try:
-                shm = shared_memory.SharedMemory(name=name, create=True, size=dest_size)
-                logger.info(f"create shm {name}")
-                register_posix_shm_for_cleanup(name)
-            except Exception as e:
-                shm = shared_memory.SharedMemory(name=name, create=False, size=dest_size)
-                logger.info(f"error {str(e)} to link shm {name}")
-
-        self.shm = shm  # the SharedMemory object must be kept referenced, otherwise it will be released
        self.arr = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)

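
With the duplicated create/link/resize logic gone, the constructor reduces to a thin wrapper around create_or_link_shm. Roughly, and only as a sketch (the class name SharedArray and the rest of the class body are assumed, since the diff does not show them):

import numpy as np
from lightllm.utils.shm_utils import create_or_link_shm

class SharedArray:
    def __init__(self, name, shape, dtype):
        dtype_byte_num = np.array([1], dtype=dtype).dtype.itemsize
        dest_size = int(np.prod(shape)) * dtype_byte_num
        # create_or_link_shm now owns the create-vs-link and size-mismatch
        # handling; the class only keeps the reference and wraps the buffer.
        self.shm = create_or_link_shm(name, dest_size)
        self.arr = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)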

lightllm/server/router/model_infer/mode_backend/base_backend.py

Lines changed: 3 additions & 4 deletions
@@ -367,9 +367,9 @@ def _read_reqs_buffer_and_init_reqs(self):
            else:
                assert False, f"error type {type(obj)}"
        if init_reqs:
-            self._init_reqs(reqs=init_reqs)
-            if self.args.enable_cpu_cache:
-                self._fill_cpu_cache_to_reqs(req_ids=init_reqs)
+            req_ids = self._init_reqs(reqs=init_reqs)
+            if self.args.enable_cpu_cache and req_ids:
+                self._fill_cpu_cache_to_reqs(req_ids=req_ids)
        return

    def _read_nixl_trans_io_buffer_and_update_req_status(self):
@@ -403,7 +403,6 @@ def _read_nixl_trans_io_buffer_and_update_req_status(self):
            is_master_in_dp=self.is_master_in_dp,
            nixl_prefill_chuncked_handle_func=None,
        )
-
        return

    # some reusable general-purpose helper functions
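
The old code handed the raw request objects to _fill_cpu_cache_to_reqs; the fix uses the ids returned by _init_reqs and skips the fill when nothing was initialized. A tiny sketch of the assumed contract (helper and attribute names beyond those in the diff are hypothetical):

def _init_reqs(self, reqs):
    # Register each raw request with the engine and return the ids that the
    # CPU-cache fill step expects.
    req_ids = []
    for raw_req in reqs:
        infer_req = self._build_infer_req(raw_req)  # hypothetical helper
        req_ids.append(infer_req.req_id)            # assumed id attribute
    return req_ids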

lightllm/server/router/model_infer/mode_backend/multi_level_kv_cache.py

Lines changed: 9 additions & 10 deletions
@@ -35,14 +35,13 @@ def wait_for_init(self):
        if attach_shm_handle is not None:
            attach_shm_handle.wait()

-    def _compute_full_sequence_hash(self, req: InferReq):
+    def _compute_sequence_hash(self, req: InferReq):
+        # after consideration, cache management only covers the prompt, not decoded content, which differs from the radix cache
+        if not req.shm_req.token_hash_list.is_empty():
+            return req.shm_req.token_hash_list.get_all()
+
        input_tokens = req.shm_req.get_prompt_ids()
-        total_len = req.shm_req.input_len + req.shm_req.shm_cur_output_len
-        if total_len > req.shm_req.input_len:
-            full_sequence = req.shm_req.shm_prompt_ids.arr[:total_len].tolist()
-        else:
-            full_sequence = input_tokens
-        return compute_token_list_hash(full_sequence, self.args.cpu_cache_token_page_size)
+        return compute_token_list_hash(input_tokens, self.args.cpu_cache_token_page_size)

    def handle_finished_reqs(self, finished_reqs: List[InferReq]) -> List[InferReq]:
        """
@@ -98,9 +97,9 @@ def _start_kv_cache_offload_task(
    ) -> Optional["TransTask"]:
        with torch.cuda.stream(cpu_kv_cache_stream):
            if self.backend.is_master_in_dp:
-                all_token_hash_list = self._compute_full_sequence_hash(req)
+                token_hash_list = self._compute_sequence_hash(req)
                block_size = req.cur_kv_len // self.args.cpu_cache_token_page_size
-                move_block_size = min(block_size, len(all_token_hash_list))
+                move_block_size = min(block_size, len(token_hash_list))

                if move_block_size == 0:
                    dist.broadcast_object_list([0], group=self.gloo_group, group_src=0)
@@ -110,7 +109,7 @@
                try:
                    self.cpu_cache_client.lock.acquire_sleep1ms()
                    page_list, ready_list = self.cpu_cache_client.allocate_pages(
-                        all_token_hash_list[:move_block_size],
+                        token_hash_list[:move_block_size],
                        disk_offload_enable=self.args.enable_disk_cache,
                    )
                finally:
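
The hash now covers only the prompt (or the precomputed token_hash_list when one is present) and never decoded tokens, and it is taken page by page so only complete pages are offloaded. compute_token_list_hash is defined elsewhere in lightllm; the stand-in below merely illustrates the page-wise, prefix-chained shape such a hash typically has and may differ from the real implementation:

import hashlib
from typing import List

def page_hashes(token_ids: List[int], page_size: int) -> List[bytes]:
    # One hash per *complete* page; the trailing partial page is ignored,
    # matching block_size = cur_kv_len // cpu_cache_token_page_size above.
    hashes: List[bytes] = []
    prev = b""
    for start in range(0, len(token_ids) - page_size + 1, page_size):
        page = token_ids[start:start + page_size]
        # Chain in the previous hash so a page only matches when its whole
        # prefix matches (an assumption about the real scheme).
        prev = hashlib.sha256(prev + repr(page).encode()).digest()
        hashes.append(prev)
    return hashes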

lightllm/utils/auto_shm_cleanup.py

Lines changed: 14 additions & 1 deletion
@@ -2,6 +2,7 @@
import ctypes
import atexit
import signal
+import threading
import psutil
from multiprocessing import shared_memory
from typing import Set, Optional
@@ -11,6 +12,10 @@


class AutoShmCleanup:
+    """
+    Automatically cleans up System V and POSIX shared memory.
+    shared_memory.SharedMemory has its own automatic cleanup, but it can fail if a process still holds the segment at cleanup time; this class acts as a last-resort fallback.
+    """
    def __init__(self):
        self.libc = None
        self._init_libc()
@@ -19,6 +24,7 @@ def __init__(self):
        self.registered_shm_ids = []
        # POSIX
        self.registered_posix_shm_names = []
+        self.signal_handlers_registered = False
        self._register_handlers_for_cleanup()

    def _init_libc(self):
@@ -34,8 +40,14 @@ def _init_libc(self):

    def _register_handlers_for_cleanup(self):
        atexit.register(self._cleanup)
+        self.register_signal_handlers()
+
+    def register_signal_handlers(self):
+        if self.signal_handlers_registered or not threading.current_thread() is threading.main_thread():
+            return
        for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
            signal.signal(sig, self._signal_cleanup_handler)
+        self.signal_handlers_registered = True

    def _signal_cleanup_handler(self, signum, frame):
        self._cleanup()
@@ -110,6 +122,7 @@ def get_auto_cleanup() -> AutoShmCleanup:
    global _auto_cleanup
    if _auto_cleanup is None:
        _auto_cleanup = AutoShmCleanup()
+    _auto_cleanup.register_signal_handlers()
    return _auto_cleanup


@@ -118,4 +131,4 @@ def register_sysv_shm_for_cleanup(key: int, shmid: Optional[int] = None):


def register_posix_shm_for_cleanup(name: str):
-    get_auto_cleanup().register_posix_shm(name)
+    get_auto_cleanup().register_posix_shm(name)
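
The new guard exists because signal.signal() may only be called from the main thread; constructing the singleton in a worker thread would otherwise raise ValueError. Deferring registration and retrying it from get_auto_cleanup() means a later call on the main thread still installs the handlers exactly once. A minimal illustration of the failure mode the guard avoids (not part of the commit):

import signal
import threading

def install_sigterm_handler(handler) -> bool:
    # signal.signal() raises ValueError when invoked off the main thread,
    # so only attempt installation when we are actually on it.
    if threading.current_thread() is not threading.main_thread():
        return False
    signal.signal(signal.SIGTERM, handler)
    return True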

lightllm/utils/shm_utils.py

Lines changed: 9 additions & 6 deletions
@@ -1,11 +1,12 @@
from multiprocessing import shared_memory
from filelock import FileLock
from lightllm.utils.log_utils import init_logger
+from lightllm.utils.auto_shm_cleanup import register_posix_shm_for_cleanup

logger = init_logger(__name__)


-def create_or_link_shm(name, expected_size, force_mode=None):
+def create_or_link_shm(name, expected_size, force_mode=None, auto_cleanup=False):
    """
    Args:
        name: name of the shared memory
@@ -26,15 +27,15 @@ def create_or_link_shm(name, expected_size, force_mode=None):

    if force_mode == "create":
        with FileLock(lock_name):
-            return _force_create_shm(name, expected_size)
+            return _force_create_shm(name, expected_size, auto_cleanup)
    elif force_mode == "link":
        return _force_link_shm(name, expected_size)
    else:
        with FileLock(lock_name):
-            return _smart_create_or_link_shm(name, expected_size)
+            return _smart_create_or_link_shm(name, expected_size, auto_cleanup)


-def _force_create_shm(name, expected_size):
+def _force_create_shm(name, expected_size, auto_cleanup):
    """Force-create a new shared memory segment"""
    try:
        existing_shm = shared_memory.SharedMemory(name=name)
@@ -45,6 +46,8 @@ def _force_create_shm(name, expected_size):

    # create the new shared memory segment
    shm = shared_memory.SharedMemory(name=name, create=True, size=expected_size)
+    if auto_cleanup:
+        register_posix_shm_for_cleanup(name)
    return shm


@@ -62,12 +65,12 @@ def _force_link_shm(name, expected_size):
        raise e


-def _smart_create_or_link_shm(name, expected_size):
+def _smart_create_or_link_shm(name, expected_size, auto_cleanup):
    """Prefer linking to an existing segment; create it if it does not exist"""
    try:
        shm = _force_link_shm(name=name, expected_size=expected_size)
        return shm
    except:
        pass

-    return _force_create_shm(name=name, expected_size=expected_size)
+    return _force_create_shm(name=name, expected_size=expected_size, auto_cleanup=auto_cleanup)
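
A hedged usage sketch of the extended entry point: the creating caller opts into the fallback cleanup, while linking callers keep the default auto_cleanup=False and never register anything (segment name and size below are made up):

from lightllm.utils.shm_utils import create_or_link_shm

# Hypothetical segment name and size, for illustration only.
shm = create_or_link_shm("demo_kv_meta", expected_size=4096, auto_cleanup=True)
shm.buf[:5] = b"hello"  # use the mapping like any SharedMemory buffer
shm.close()             # close the local mapping; the unlink fallback runs at exit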
