Skip to content

Commit bf7ae96

Browse files
authored
[FIX]add shm lock (#905)
1 parent 4948cea commit bf7ae96

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

lightllm/server/core/objs/atomic_array_lock.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import atomics
23
from multiprocessing import shared_memory
34
from lightllm.utils.log_utils import init_logger
@@ -41,18 +42,40 @@ class AtomicLockItem:
4142
def __init__(self, context: AtomicShmArrayLock, index: int):
    """Bind this item to one slot of the lock array owned by ``context``.

    Args:
        context: The lock array object; its ``shm.buf`` buffer is sliced here.
        index: Slot number; slot ``index`` covers bytes
            ``[index * 4, index * 4 + 4)`` of that buffer.
    """
    self.index = index
    self.context = context
    # Slice the 4-byte window once so that every later atomic operation
    # reuses the same view instead of re-slicing the buffer.
    start = index * 4
    self._buf = context.shm.buf[start : start + 4]
46+
47+
def try_acquire(self) -> bool:
    """Make a single, non-blocking attempt to take the lock.

    The slot holds 0 when free and 1 when held; the attempt atomically
    swaps 0 -> 1.

    Returns:
        True if this call changed the slot from 0 to 1, False if the slot
        was not 0.
    """
    with atomics.atomicview(buffer=self._buf, atype=atomics.INT) as a:
        # Use the strong compare-exchange: a weak one is allowed to fail
        # spuriously, which is harmless inside a retry loop but would make
        # a one-shot attempt report "busy" for a lock that is actually free.
        # bool() turns the library's result object into the plain bool that
        # the annotation promises.
        return bool(a.cmpxchg_strong(0, 1))
50+
51+
def release(self):
    """Mark the slot as free by unconditionally storing 0 into it."""
    view = atomics.atomicview(buffer=self._buf, atype=atomics.INT)
    with view as flag:
        flag.store(0)
4454

4555
def __enter__(self):
    """Block until the lock is taken, busy-waiting on the slot.

    Repeats the atomic 0 -> 1 swap until it succeeds. Nothing is returned,
    so ``with item as x`` binds ``x`` to None.
    """
    with atomics.atomicview(buffer=self._buf, atype=atomics.INT) as flag:
        while True:
            # A failed swap means the slot was not 0 (or the weak CAS failed
            # spuriously); either way, just try again.
            if flag.cmpxchg_weak(0, 1):
                break
5159

5260
def __exit__(self, exc_type, exc_val, exc_tb):
    """Release the lock by atomically swapping the slot from 1 back to 0.

    The swap is retried until it succeeds. Returns False so that an
    exception raised inside the ``with`` body is propagated, not swallowed.
    """
    with atomics.atomicview(buffer=self._buf, atype=atomics.INT) as flag:
        released = flag.cmpxchg_weak(1, 0)
        while not released:
            released = flag.cmpxchg_weak(1, 0)
    return False
65+
66+
67+
class AsyncLock:
    """Async context manager that polls a lock item without blocking the loop.

    The wrapped ``lock_item`` only needs two methods: ``try_acquire()``,
    returning a truthy value when the lock was obtained, and ``release()``.
    While the lock is unavailable the coroutine sleeps via ``asyncio.sleep``
    so other tasks on the same event loop keep running.

    Args:
        lock_item: Object exposing ``try_acquire()`` and ``release()``.
        base_delay: Seconds to sleep after the first failed attempt.
        max_delay: Optional cap for exponential backoff. When None (the
            default) every retry sleeps exactly ``base_delay``; otherwise
            the sleep doubles after each failed attempt, capped at
            ``max_delay``.
    """

    def __init__(self, lock_item, base_delay=0.01, max_delay=None):
        self._item = lock_item
        self._base = base_delay
        self._max = max_delay

    async def __aenter__(self):
        """Wait until the lock is acquired and return this ``AsyncLock``."""
        delay = self._base
        # Try to take the lock; on success fall through immediately.
        while not self._item.try_acquire():
            await asyncio.sleep(delay)
            if self._max is not None:
                # Opt-in backoff: poll less often the longer we wait.
                delay = min(delay * 2, self._max)
        return self

    async def __aexit__(self, exc_t, exc, tb):
        """Release the lock; return False so exceptions propagate."""
        self._item.release()
        return False

lightllm/server/httpserver/manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from lightllm.server.core.objs import SamplingParams
2525
from lightllm.server.core.objs.io_objs import GroupReqObjs
2626
from lightllm.server.core.objs.shm_req_manager import ShmReqManager
27+
from lightllm.server.core.objs.atomic_array_lock import AtomicShmArrayLock, AsyncLock, AtomicLockItem
2728
from lightllm.server.router.dynamic_prompt.shared_arr import SharedInt
2829
from lightllm.utils.log_utils import init_logger
2930
from lightllm.server.metrics.manager import MetricClient
@@ -52,7 +53,8 @@ def __init__(
5253

5354
self.multinode_req_manager = None
5455
self.nnodes = args.nnodes
55-
self._resource_lock = asyncio.Lock()
56+
self._shm_lock_pool = AtomicShmArrayLock("lightllm_resource_lock", 1)
57+
self._resource_lock = AsyncLock(self._shm_lock_pool.get_lock_context(0))
5658
self.node_rank = args.node_rank
5759
self.transfer_lock = asyncio.Lock() # the lock for transfer to next module in multi node mode.
5860
self.disable_abort = args.nnodes > 1 and args.dp == 1 # multinode dp=1 mode, disable abort

0 commit comments

Comments
 (0)