11import uvloop
22import asyncio
3+ import collections
34
45asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
56import zmq
89import time
910import threading
1011import concurrent .futures
11- from typing import List
12+ from typing import List , Deque
1213from lightllm .server .core .objs import ShmReqManager , Req , StartArgs
1314from lightllm .server .core .objs .io_objs import GroupReqIndexes
1415from lightllm .utils .graceful_utils import graceful_registry
2122class MultiLevelKVCacheManager :
2223 def __init__ (
2324 self ,
24- args ,
25- detokenization_port ,
26- router_port ,
25+ args : StartArgs ,
2726 ):
2827 self .args : StartArgs = args
2928 context = zmq .Context (2 )
30- self .recv_from_pre_module = context .socket (zmq .PULL )
31- self .recv_from_pre_module .bind (f"{ args .zmq_mode } 127.0.0.1:{ detokenization_port } " )
29+ self .zmq_recv_socket = context .socket (zmq .PULL )
30+ self .zmq_recv_socket .bind (f"{ args .zmq_mode } 127.0.0.1:{ args . multi_level_kv_cache_port } " )
3231
3332 self .send_to_router = context .socket (zmq .PUSH )
34- self .send_to_router .bind (f"{ args .zmq_mode } 127.0.0.1:{ router_port } " )
35- logger .info (f"pub_to_httpserver sendhwm { self .send_to_router .getsockopt (zmq .SNDHWM )} " )
33+ self .send_to_router .bind (f"{ args .zmq_mode } 127.0.0.1:{ args . router_port } " )
34+ logger .info (f"send_to_router sendhwm { self .send_to_router .getsockopt (zmq .SNDHWM )} " )
3635 self .cpu_cache_client = CpuKvCacheClient (init_shm_data = True )
3736 self .shm_req_manager = ShmReqManager ()
3837 # 控制同时进行cpu cache 匹配操作的数量。
@@ -42,7 +41,7 @@ def __init__(
4241 self .cpu_cache_time_out = 0.3
4342 # lock 用于控制对 recv_queue 和 transfer_queue 的访问。
4443 self .queue_lock = threading .Lock ()
45- self .recv_queue : List [GroupReqIndexes ] = []
44+ self .recv_queue : Deque [GroupReqIndexes ] = collections . deque ()
4645 self .transfer_queue : List [GroupReqIndexes ] = []
4746 self .transfer_thread = threading .Thread (target = self .transfer_loop , daemon = True )
4847 self .transfer_thread .start ()
@@ -58,8 +57,7 @@ def cpu_cache_hanle_loop(self):
5857 continue
5958
6059 with self .queue_lock :
61- current_group_req = self .recv_queue [0 ]
62- self .recv_queue = self .recv_queue [1 :]
60+ current_group_req = self .recv_queue .popleft ()
6361
6462 self .executor .submit (self ._handle_group_req_cpu_cache_match , current_group_req , time .time ())
6563 except BaseException as e :
@@ -146,7 +144,7 @@ def recv_loop(self):
146144 try :
147145 # 一次最多从 zmq 中取 recv_max_count 个请求,防止 zmq 队列中请求数量过多导致阻塞了主循环。
148146 for _ in range (recv_max_count ):
149- recv_obj : GroupReqIndexes = self .recv_from_pre_module .recv_pyobj (zmq .NOBLOCK )
147+ recv_obj : GroupReqIndexes = self .zmq_recv_socket .recv_pyobj (zmq .NOBLOCK )
150148 assert isinstance (recv_obj , GroupReqIndexes )
151149 recv_objs .append (recv_obj )
152150
@@ -166,15 +164,13 @@ def recv_loop(self):
166164 return
167165
168166
169- def start_detokenization_process (args , detokenization_port , router_port , pipe_writer ):
167+ def start_multi_level_kv_cache_manager (args , pipe_writer ):
170168 # 注册graceful 退出的处理
171169 graceful_registry (inspect .currentframe ().f_code .co_name )
172170
173171 try :
174172 manager = MultiLevelKVCacheManager (
175173 args = args ,
176- detokenization_port = detokenization_port ,
177- router_port = router_port ,
178174 )
179175 except Exception as e :
180176 pipe_writer .send (str (e ))
0 commit comments