@@ -102,39 +102,47 @@ def offload_finished_reqs_to_cpu_cache(self, finished_reqs: List[InferReq]) -> L
102102 cpu_stream = g_infer_context .get_cpu_kv_cache_stream ()
103103 for req in finished_reqs :
104104 # 只有 group_req_id 和 request_id 相同的请求才会被卸载到 cpu cache 中。
105- # 这个限制是为了兼容 diverse 模式下的请求处理。
105+ # 这个限制是为了兼容 diverse 模式下的请求处理, 只有主请求才 offload kv 到 cpu
106+ # cache 中
106107 if req .shm_req .group_req_id != req .shm_req .request_id :
107108 true_finished_reqs .append (req )
108109 continue
109110
110111 # 过滤不适合进行 kv 卸载到 cpu cache 的请求。
111- if req .cur_kv_len < self .args .cpu_cache_token_page_size :
112+ if (
113+ req .cur_kv_len < self .args .cpu_cache_token_page_size
114+ or req .shm_req .input_len <= self .args .cpu_cache_token_page_size
115+ ):
112116 true_finished_reqs .append (req )
113117 continue
114118
115119 # 如果请求已经完成了 cpu cache 的任务,则满足了退出条件
116120 if req .cpu_cache_task_status .is_finished ():
117121 true_finished_reqs .append (req )
118- elif req .cpu_cache_task_status .is_running ():
119- # 如果请求已经发起过卸载任务,则在当前轮不进行处理
120122 continue
121- else :
122- assert req .cpu_cache_task_status .is_not_started ()
123- # 必须等待 overlap stream 上的计算任务完成,不然会崩溃
124- if g_infer_context .overlap_stream is not None :
125- cpu_stream .wait_stream (g_infer_context .overlap_stream )
126- else :
127- cpu_stream .wait_stream (torch .cuda .current_stream ())
128123
129- # 发起将请求的 kv cache 卸载到 cpu cache 中的任务
130- trans_task = self ._start_kv_cache_offload_task (req = req , cpu_kv_cache_stream = cpu_stream )
131- if trans_task is not None :
132- self .cpu_cache_handle_queue .append (trans_task )
133- else :
134- true_finished_reqs .append (req )
124+ # 如果请求已经发起过卸载任务且正在卸载过程中,则在当前轮不进行处理
125+ if req .cpu_cache_task_status .is_running ():
126+ continue
127+
128+ assert req .cpu_cache_task_status .is_not_started ()
129+ # # 必须等待 overlap stream 上的计算任务完成,不然会崩溃
130+ # if g_infer_context.overlap_stream is not None:
131+ # cpu_stream.wait_stream(g_infer_context.overlap_stream)
132+ # else:
133+ # cpu_stream.wait_stream(torch.cuda.current_stream())
134+
135+ # 发起将请求的 kv cache 卸载到 cpu cache 中的任务
136+ trans_task = self ._start_kv_cache_offload_task (req = req , cpu_kv_cache_stream = cpu_stream )
137+
138+ # 根据是否成功创建了卸载任务,决定是否将请求加入到处理队列中
139+ if trans_task is not None :
140+ self .cpu_cache_handle_queue .append (trans_task )
141+ else :
142+ true_finished_reqs .append (req )
135143
136144 # 必须在这里同步,不然会崩溃
137- cpu_stream .synchronize ()
145+ # cpu_stream.synchronize()
138146 return true_finished_reqs
139147
140148 def _start_kv_cache_offload_task (
0 commit comments