Skip to content

Commit df1d500

Browse files
committed
fix
1 parent ff7fa9c commit df1d500

File tree

1 file changed

+9
-17
lines changed

1 file changed

+9
-17
lines changed

lightllm/server/multi_level_kv_cache/manager.py

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import uvloop
22
import asyncio
3-
import collections
43

54
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
65
import zmq
@@ -9,7 +8,7 @@
98
import time
109
import threading
1110
import concurrent.futures
12-
from typing import List, Deque
11+
from queue import Queue
1312
from lightllm.server.core.objs import ShmReqManager, Req, StartArgs
1413
from lightllm.server.core.objs.io_objs import GroupReqIndexes
1514
from lightllm.utils.graceful_utils import graceful_registry
@@ -30,7 +29,7 @@ def __init__(
3029
self.zmq_recv_socket.bind(f"{args.zmq_mode}127.0.0.1:{args.multi_level_kv_cache_port}")
3130

3231
self.send_to_router = context.socket(zmq.PUSH)
33-
self.send_to_router.bind(f"{args.zmq_mode}127.0.0.1:{args.router_port}")
32+
self.send_to_router.connect(f"{args.zmq_mode}127.0.0.1:{args.router_port}")
3433
logger.info(f"send_to_router sendhwm {self.send_to_router.getsockopt(zmq.SNDHWM)}")
3534
self.cpu_cache_client = CpuKvCacheClient(init_shm_data=True)
3635
self.shm_req_manager = ShmReqManager()
@@ -39,22 +38,15 @@ def __init__(
3938
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)
4039
# 控制 cpu cache time out的时间,如果超过这个时间无法获取信号量则直接转发。
4140
self.cpu_cache_time_out = 0.3
42-
# lock 用于控制对 recv_queue 和 transfer_queue 的访问。
43-
self.queue_lock = threading.Lock()
44-
self.recv_queue: Deque[GroupReqIndexes] = collections.deque()
41+
self.recv_queue = Queue(maxsize=1024)
4542
self.cpu_cache_thread = threading.Thread(target=self.cpu_cache_hanle_loop, daemon=True)
4643
self.cpu_cache_thread.start()
4744
return
4845

4946
def cpu_cache_hanle_loop(self):
5047
while True:
5148
try:
52-
if len(self.recv_queue) == 0:
53-
time.sleep(0.003)
54-
continue
55-
56-
with self.queue_lock:
57-
current_group_req = self.recv_queue.popleft()
49+
current_group_req = self.recv_queue.get()
5850

5951
self.executor.submit(self._handle_group_req_cpu_cache_match, current_group_req, time.time())
6052
except BaseException as e:
@@ -75,8 +67,7 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
7567

7668
if self.semaphore.acquire(blocking=False):
7769
break
78-
else:
79-
time.sleep(0.005)
70+
time.sleep(0.005)
8071

8172
reqs_shm_index = group_req_indexes.shm_req_indexes
8273
reqs = [self.shm_req_manager.get_req_obj_by_index(index) for index in reqs_shm_index]
@@ -141,10 +132,11 @@ def recv_loop(self):
141132
# 当队列已经开始清空的时候,将一次接受的数量下调
142133
recv_max_count = 128
143134

144-
with self.queue_lock:
145-
self.recv_queue.extend(recv_objs)
135+
for recv_obj in recv_objs:
136+
self.recv_queue.put(recv_obj)
146137

147-
time.sleep(0.003)
138+
if len(recv_objs) == 0:
139+
time.sleep(0.01)
148140

149141
except Exception as e:
150142
logger.exception(f"detoken process has exception {str(e)}")

0 commit comments

Comments
 (0)