@@ -33,11 +33,9 @@ def __init__(
3333 logger .info (f"send_to_router sendhwm { self .send_to_router .getsockopt (zmq .SNDHWM )} " )
3434 self .cpu_cache_client = CpuKvCacheClient (init_shm_data = True )
3535 self .shm_req_manager = ShmReqManager ()
36- # 控制同时进行cpu cache 匹配操作的数量。
37- self .semaphore = threading .Semaphore (3 )
3836 self .executor = concurrent .futures .ThreadPoolExecutor (max_workers = 6 )
39- # 控制 cpu cache time out的时间,如果超过这个时间无法获取信号量则直接转发 。
40- self .cpu_cache_time_out = 0.3
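37+ # Timeout in seconds for cpu cache page matching: if a request has already waited longer than this since its start time, matching is skipped and the request is forwarded directly.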
38+ self .cpu_cache_time_out = 0.5
4139 self .recv_queue = Queue (maxsize = 1024 )
4240 self .cpu_cache_thread = threading .Thread (target = self .cpu_cache_hanle_loop , daemon = True )
4341 self .cpu_cache_thread .start ()
@@ -57,17 +55,14 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
5755 """
5856 match cpu cache pages
5957 """
60- # 进行超时判定,如果太长时间拿不到信号量,则说明匹配任务繁忙,
61- # 放弃进行 cpu cache page 的匹配。
62- while True :
63- current_time = time .time ()
64- if current_time - start_time >= self .cpu_cache_time_out :
65- self .send_to_router .send_pyobj (group_req_indexes , protocol = pickle .HIGHEST_PROTOCOL )
66- return
67-
68- if self .semaphore .acquire (blocking = False ):
69- break
70- time .sleep (0.005 )
58+ # If the timeout has already elapsed, give up cpu cache page matching and forward the request unchanged.
59+ current_time = time .time ()
60+ if current_time - start_time >= self .cpu_cache_time_out :
61+ self .send_to_router .send_pyobj (group_req_indexes , protocol = pickle .HIGHEST_PROTOCOL )
62+ logger .warning (
63+ f"cpu cache match time out { current_time - start_time } s, group_req_id: { group_req_indexes .group_req_id } "
64+ )
65+ return
7166
7267 reqs_shm_index = group_req_indexes .shm_req_indexes
7368 reqs = [self .shm_req_manager .get_req_obj_by_index (index ) for index in reqs_shm_index ]
@@ -77,6 +72,7 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
7772 # diverse_mode 只有主请求一个初始化 cpu cache 信息。
7873 if self .args .diverse_mode and req .request_id != req .group_req_id :
7974 continue
75+
8076 if req .is_aborted :
8177 continue
8278
@@ -92,7 +88,7 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
9288 break
9389 self .cpu_cache_client .lock .release ()
9490
95- # 等待所有的cpu cache 页面ready
91+ # Poll until all of the matched cpu cache pages are ready.
9692 while not self .cpu_cache_client .check_allpages_ready (finded_page_indexes ):
9793 time .sleep (0.01 )
9894
@@ -101,9 +97,6 @@ def _handle_group_req_cpu_cache_match(self, group_req_indexes: GroupReqIndexes,
10197 for req in reqs :
10298 self .shm_req_manager .put_back_req_obj (req )
10399
104- # 释放信号量
105- self .semaphore .release ()
106-
107100 self .send_to_router .send_pyobj (group_req_indexes , protocol = pickle .HIGHEST_PROTOCOL )
108101 return
109102
0 commit comments