@@ -85,10 +85,14 @@ def run(self) -> None:
8585 payload_groups = self ._gather_offload_payloads ()
8686 if not payload_groups :
8787 continue
88+ tasks_and_pages = []
8889 for payloads in payload_groups :
8990 if not payloads :
9091 continue
91- self ._persist_pages_to_disk (payloads )
92+ result = self ._persist_pages_to_disk (payloads )
93+ if result is not None :
94+ tasks_and_pages .append (result )
95+ self ._wait_and_update_pages (tasks_and_pages )
9296
9397 def _gather_offload_payloads (self ) -> List [List [_PagePayload ]]:
9498 self .cpu_cache_client .lock .acquire_sleep1ms ()
@@ -111,13 +115,13 @@ def _gather_offload_payloads(self) -> List[List[_PagePayload]]:
111115 return payload_groups
112116
113117 # 数据写入磁盘
114- def _persist_pages_to_disk (self , payloads : List [_PagePayload ]) -> None :
118+ def _persist_pages_to_disk (self , payloads : List [_PagePayload ]):
115119 if not payloads :
116- return
120+ return None
117121 page_indexes = [payload .index for payload in payloads ]
118122 tokens = [payload .hash_key for payload in payloads ]
119123 if not page_indexes :
120- return
124+ return None
121125
122126 kv_indexer = torch .tensor (page_indexes , dtype = torch .int32 , device = "cpu" )
123127 query_result = self .service .query (tokens )
@@ -129,13 +133,16 @@ def _persist_pages_to_disk(self, payloads: List[_PagePayload]) -> None:
129133 time .sleep (0.001 )
130134
131135 task = self .service .create (tokens = tokens , kv_page_indexer = kv_indexer , mode = "w" )
132- # 数据安全即可结束等待,无需写入完成
136+ return (task , page_indexes )
137+ return None
138+
139+ def _wait_and_update_pages (self , tasks_and_pages : List ) -> None :
140+ for task , page_indexes in tasks_and_pages :
133141 while not task .data_safe ():
134142 time .sleep (0.001 )
135-
136- self .cpu_cache_client .lock .acquire_sleep1ms ()
137- self .cpu_cache_client .update_pages_status_to_ready_recycle (page_list = page_indexes , deref = True )
138- self .cpu_cache_client .lock .release ()
143+ self .cpu_cache_client .lock .acquire_sleep1ms ()
144+ self .cpu_cache_client .update_pages_status_to_ready_recycle (page_list = page_indexes , deref = True )
145+ self .cpu_cache_client .lock .release ()
139146
140147 def query_loadable_pages (self , tokens : List [int ], start_pos : int ) -> int :
141148 """
0 commit comments