@@ -96,50 +96,46 @@ def offload_finished_reqs_to_cpu_cache(self, finished_reqs: List[InferReq]) -> L
9696 """
9797 将满足cpu kv cache 卸载条件的请求进行处理, 并返回真的满足退出条件的请求list。
9898 """
99+ # 如果开启了cpu cache,将达到finished状态的请求开启将gpu kv cache 卸载到 cpu cache中的操作。
100+ # 当 kv cache 卸载完成后,才会进行请求的真实退出操作。
101+ true_finished_reqs = []
102+ cpu_stream = g_infer_context .get_cpu_kv_cache_stream ()
103+ for req in finished_reqs :
104+ # 只有 group_req_id 和 request_id 相同的请求才会被卸载到 cpu cache 中。
105+ # 这个限制是为了兼容 diverse 模式下的请求处理。
106+ if req .shm_req .group_req_id != req .shm_req .request_id :
107+ true_finished_reqs .append (req )
108+ continue
109+
110+ # 过滤不适合进行 kv 卸载到 cpu cache 的请求。
111+ if req .cur_kv_len < self .args .cpu_cache_token_page_size :
112+ true_finished_reqs .append (req )
113+ continue
114+
115+ # 如果请求已经完成了 cpu cache 的任务,则满足了退出条件
116+ if req .cpu_cache_task_status .is_finished ():
117+ true_finished_reqs .append (req )
118+ elif req .cpu_cache_task_status .is_running ():
119+ # 如果请求已经发起过卸载任务,则在当前轮不进行处理
120+ continue
121+ else :
122+ assert req .cpu_cache_task_status .is_not_started ()
123+ # 必须等待 overlap stream 上的计算任务完成,不然会崩溃
124+ if g_infer_context .overlap_stream is not None :
125+ cpu_stream .wait_stream (g_infer_context .overlap_stream )
126+ else :
127+ cpu_stream .wait_stream (torch .cuda .current_stream ())
99128
100- if self .args .enable_cpu_cache :
101- # 如果开启了cpu cache,将达到finished状态的请求开启将gpu kv cache 卸载到 cpu cache中的操作。
102- # 当 kv cache 卸载完成后,才会进行请求的真实退出操作。
103- true_finished_reqs = []
104- cpu_stream = g_infer_context .get_cpu_kv_cache_stream ()
105- for req in finished_reqs :
106- # 只有 group_req_id 和 request_id 相同的请求才会被卸载到 cpu cache 中。
107- # 这个限制是为了兼容 diverse 模式下的请求处理。
108- if req .shm_req .group_req_id != req .shm_req .request_id :
109- true_finished_reqs .append (req )
110- continue
111-
112- # 过滤不适合进行 kv 卸载到 cpu cache 的请求。
113- if req .cur_kv_len < self .args .cpu_cache_token_page_size :
129+ # 发起将请求的 kv cache 卸载到 cpu cache 中的任务
130+ trans_task = self ._start_kv_cache_offload_task (req = req , cpu_kv_cache_stream = cpu_stream )
131+ if trans_task is not None :
132+ self .cpu_cache_handle_queue .append (trans_task )
133+ else :
114134 true_finished_reqs .append (req )
115- continue
116135
117- # 如果请求已经完成了 cpu cache 的任务,则满足了退出条件
118- if req .cpu_cache_task_status .is_finished ():
119- true_finished_reqs .append (req )
120- elif req .cpu_cache_task_status .is_running ():
121- # 如果请求已经发起过卸载任务,则在当前轮不进行处理
122- continue
123- else :
124- assert req .cpu_cache_task_status .is_not_started ()
125- # 必须等待 overlap stream 上的计算任务完成,不然会崩溃
126- if g_infer_context .overlap_stream is not None :
127- cpu_stream .wait_stream (g_infer_context .overlap_stream )
128- else :
129- cpu_stream .wait_stream (torch .cuda .current_stream ())
130-
131- # 发起将请求的 kv cache 卸载到 cpu cache 中的任务
132- trans_task = self ._start_kv_cache_offload_task (req = req , cpu_kv_cache_stream = cpu_stream )
133- if trans_task is not None :
134- self .cpu_cache_handle_queue .append (trans_task )
135- else :
136- true_finished_reqs .append (req )
137-
138- # 必须在这里同步,不然会崩溃
139- cpu_stream .synchronize ()
140- return true_finished_reqs
141- else :
142- return finished_reqs
136+ # 必须在这里同步,不然会崩溃
137+ cpu_stream .synchronize ()
138+ return true_finished_reqs
143139
144140 def _start_kv_cache_offload_task (
145141 self , req : InferReq , cpu_kv_cache_stream : torch .cuda .Stream
0 commit comments