Skip to content

Commit 2711e8b

Browse files
author
liujiacheng
committed
fix
1 parent 151333f commit 2711e8b

File tree

1 file changed

+2
-23
lines changed

1 file changed

+2
-23
lines changed

lightllm/server/multi_level_kv_cache/manager.py

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ def __init__(
4242
# lock 用于控制对 recv_queue 和 transfer_queue 的访问。
4343
self.queue_lock = threading.Lock()
4444
self.recv_queue: Deque[GroupReqIndexes] = collections.deque()
45-
self.transfer_queue: List[GroupReqIndexes] = []
46-
self.transfer_thread = threading.Thread(target=self.transfer_loop, daemon=True)
47-
self.transfer_thread.start()
4845
self.cpu_cache_thread = threading.Thread(target=self.cpu_cache_hanle_loop, daemon=True)
4946
self.cpu_cache_thread.start()
5047
return
@@ -73,8 +70,7 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
7370
while True:
7471
current_time = time.time()
7572
if current_time - start_time >= self.cpu_cache_time_out:
76-
with self.queue_lock:
77-
self.transfer_queue.append(group_req_indexes)
73+
self.send_to_router.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL)
7874
return
7975

8076
if self.semaphore.acquire(blocking=False):
@@ -84,7 +80,6 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
8480

8581
reqs_shm_index = group_req_indexes.shm_req_indexes
8682
reqs = [self.shm_req_manager.get_req_obj_by_index(index) for index in reqs_shm_index]
87-
req: Req = reqs[0]
8883

8984
# 对每个请求进行cpu cache page 的匹配操作。
9085
for req in reqs:
@@ -118,23 +113,7 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
118113
# 释放信号量
119114
self.semaphore.release()
120115

121-
# 将请求放入转发队列
122-
with self.queue_lock:
123-
self.transfer_queue.append(group_req_indexes)
124-
return
125-
126-
def transfer_loop(self):
127-
while True:
128-
try:
129-
if len(self.transfer_queue) != 0:
130-
with self.queue_lock:
131-
for e in self.transfer_queue:
132-
self.send_to_router.send_pyobj(e, protocol=pickle.HIGHEST_PROTOCOL)
133-
self.transfer_queue.clear()
134-
else:
135-
time.sleep(0.005)
136-
except BaseException as e:
137-
logger.exception(str(e))
116+
self.send_to_router.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL)
138117
return
139118

140119
def recv_loop(self):

0 commit comments

Comments
 (0)