Commit f75697c

[Feature] support clear data (#4185)
* fix
* fix
* fix
* [Feature] support clear data
* update
* fix
* fix
* fix
* fix
1 parent 1e86418 commit f75697c

File tree: 11 files changed (+70 −1 lines)

fastdeploy/engine/common_engine.py

Lines changed: 12 additions & 0 deletions
@@ -751,6 +751,18 @@ def start_cache_service(self, device_ids, ipc_signal_suffix):
    def check_and_free_block_tables(self):
        self.resource_manager.check_and_free_block_tables()

+    def clear_data(self):
+        try:
+            llm_logger.info("Clear Data: Start")
+            self.token_processor.clear_data()
+            self.engine_worker_queue.clear_data()
+            self.zmq_server.req_dict.clear()
+            llm_logger.info("Clear Data: Successfully")
+            return True
+        except Exception as e:
+            llm_logger.error(f"Clear data error: {e}")
+            return False
+
    def _exit_sub_services(self):
        """
        exit sub services
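
Read top-down, the new hook drains every consumer of in-flight work before dropping shared state: token_processor.clear_data() force-finishes active tasks, engine_worker_queue.clear_data() empties the shared task queue, and the pending zmq request dict is discarded. A minimal sketch of a call site, assuming an already-initialized engine object exposing the attributes used above:

    # Sketch of a call site; engine construction is omitted and assumed.
    if not engine.clear_data():
        llm_logger.error("clear_data failed; scheduler state may be stale")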

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 4 additions & 0 deletions
@@ -548,3 +548,7 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
                del self.requests[req_id]
        except Exception as e:
            llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}")
+
+    def clear_data(self):
+        self.waiting: deque[Request] = deque()
+        self.to_be_rescheduled_request_id_set = set()
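
A small note on scope: this only empties the scheduler's queued side; requests already running in a slot are force-finished by TokenProcessor.clear_data() further down. An annotation-free equivalent, as a sketch:

    # Equivalent effect (sketch): queued work is dropped, not completed.
    self.waiting = deque()
    self.to_be_rescheduled_request_id_set = set()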

fastdeploy/entrypoints/engine_client.py

Lines changed: 3 additions & 0 deletions
@@ -359,3 +359,6 @@ def clear_load_weight(self, timeout=300):
                return False, "clear model weight timeout"
            time.sleep(1)
        return True, ""
+
+    def check_model_weight_status(self):
+        return self.model_weights_status_signal.value[0] < 0
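
The helper encodes the signal convention this commit relies on: slot 0 of the shared model_weights_status signal goes negative while weights are being cleared. The serving loops below poll it before each blocking queue read; a condensed sketch, with names taken from those hunks:

    # Condensed guard pattern used by the serving hunks below (sketch).
    while num_choices > 0:
        if self.engine_client.check_model_weight_status():
            raise ValueError("Engine is clearing model weight")  # fail fast
        response = await asyncio.wait_for(response_queue.get(), timeout=10)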

fastdeploy/entrypoints/openai/api_server.py

Lines changed: 1 addition & 0 deletions
@@ -478,6 +478,7 @@ def reset_scheduler():

    if llm_engine is None:
        return Response("Engine not loaded", status_code=500)
+    llm_engine.engine.clear_data()
    llm_engine.engine.scheduler.reset()
    return Response("Scheduler Reset Successfully", status_code=200)
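
Once the server is up, the handler can be exercised over HTTP. The route decorator and HTTP verb sit outside this hunk, so the URL and method below are assumptions:

    # Hypothetical call; route, verb, and port are all assumptions, since
    # the decorator is not part of this hunk.
    import requests
    resp = requests.get("http://127.0.0.1:8000/reset_scheduler")
    print(resp.status_code, resp.text)  # expect 200 / "Scheduler Reset Successfully"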

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 5 additions & 0 deletions
@@ -210,6 +210,8 @@ async def chat_completion_stream_generator(
            decoder_base_url=self.tokenizer_base_url,
        )
        while num_choices > 0:
+            if self.engine_client.check_model_weight_status():
+                raise ValueError("Engine is clearing model weight")
            try:
                response = await asyncio.wait_for(response_queue.get(), timeout=10)
                current_waiting_time = 0
@@ -425,6 +427,8 @@ async def chat_completion_full_generator(
            decoder_base_url=self.tokenizer_base_url,
        )
        while True:
+            if self.engine_client.check_model_weight_status():
+                return ErrorResponse(code=400, message="Model weight cleared")
            try:
                response = await asyncio.wait_for(response_queue.get(), timeout=10)
                current_waiting_time = 0
@@ -513,6 +517,7 @@ async def chat_completion_full_generator(

            if final_res.get("error_msg") is not None and "Recover" in final_res["error_msg"]:
                choice.finish_reason = "recover_stop"
+
            choices.append(choice)

        num_prompt_tokens = len(prompt_token_ids)
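
Note the two surfacing styles: the streaming generator raises, leaving its caller to turn the exception into a terminal error event, while the unary generator returns a structured error. A unified view of the branch, as a sketch — the streaming flag here is hypothetical, since the real code lives in two separate generators:

    # Hypothetical unified view of the two guards above (sketch).
    if self.engine_client.check_model_weight_status():
        if streaming:
            raise ValueError("Engine is clearing model weight")         # stream path
        return ErrorResponse(code=400, message="Model weight cleared")  # unary path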

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 4 additions & 1 deletion
@@ -216,6 +216,8 @@ async def completion_full_generator(
        completion_batched_token_ids = [[] for _ in range(num_choices)]
        current_waiting_time = 0
        while num_choices > 0:
+            if self.engine_client.check_model_weight_status():
+                return ErrorResponse(message="Model weight cleared", code=400)
            try:
                response = await asyncio.wait_for(response_queue.get(), timeout=10)
                current_waiting_time = 0
@@ -270,7 +272,6 @@ async def completion_full_generator(
            return res
        except Exception as e:
            api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
-            raise
        finally:
            self.engine_client.semaphore.release()
            if dealer is not None:
@@ -333,6 +334,8 @@ async def completion_stream_generator(
        )
        current_waiting_time = 0
        while num_choices > 0:
+            if self.engine_client.check_model_weight_status():
+                raise ValueError("Engine is clearing model weight")
            try:
                response = await asyncio.wait_for(response_queue.get(), timeout=10)
                current_waiting_time = 0
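
Besides the guard, this file also drops the re-raise in completion_full_generator's error path: the exception is now logged and swallowed, and the finally block becomes the only guaranteed cleanup. The resulting shape, in sketch form:

    # Error-path shape after the change (sketch): log, don't re-raise.
    try:
        ...
    except Exception as e:
        api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
    finally:
        self.engine_client.semaphore.release()  # always runs
        if dealer is not None:
            ...                                 # connection cleanup (elided)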

fastdeploy/inter_communicator/engine_worker_queue.py

Lines changed: 7 additions & 0 deletions
@@ -392,6 +392,13 @@ def get_disaggregated_tasks(self):
            llm_logger.debug("get tasks from queue success")
        return item

+    def clear_data(self):
+        self.lock.acquire()
+        self.tasks[:] = list()
+        self.client_read_flag[:] = [1] * self.num_client
+        self.lock.release()
+        llm_logger.info("clear data for engine worker queue")
+
    def cleanup(self):
        """
        Exit the worker queue gracefully.
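
One hedged observation: the new method acquires and releases the lock manually, so an exception between the two calls would leave it held. An exception-safe sketch of the same clearing logic, using the lock as a context manager:

    # Sketch: same clearing logic, but the lock is released even on error.
    def clear_data(self):
        with self.lock:
            self.tasks[:] = list()
            self.client_read_flag[:] = [1] * self.num_client
        llm_logger.info("clear data for engine worker queue")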

fastdeploy/output/token_processor.py

Lines changed: 25 additions & 0 deletions
@@ -516,6 +516,31 @@ def _record_speculative_decoding_mertics(self, accept_num):
                single_head_acceptance_rate
            )

+    def clear_data(self):
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            self.resource_manager.clear_data()
+        for i in range(self.cfg.max_num_seqs):
+            if self.resource_manager.stop_flags[i]:
+                continue
+            task = self.resource_manager.tasks_list[i]
+            result = RequestOutput(
+                request_id=task.request_id,
+                outputs=CompletionOutput(
+                    index=i,
+                    send_idx=self.tokens_counter[task.request_id],
+                    token_ids=task.eos_token_ids,
+                    draft_token_ids=[],
+                ),
+                finished=True,
+                metrics=RequestMetrics(
+                    arrival_time=time.time(),
+                    request_start_time=task.arrival_time,
+                ),
+            )
+            is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill"
+            self._recycle_resources(task.request_id, i, task, result, is_prefill)
+            llm_logger.warning(f"clear data for task {task.request_id}")
+

class WarmUpTokenProcessor(TokenProcessor):
    """

fastdeploy/rl/dynamic_weight_manager.py

Lines changed: 1 addition & 0 deletions
@@ -256,6 +256,7 @@ def check_model_weights_status(model_weights_status, model_runner, pid):
            model_runner.update_parameters(pid)
        elif model_weights_status.value[0] == -1:
            logger.info("infer engine stopped! start to clear checkpoint...")
+            model_runner.clear_requests()
            model_runner.clear_parameters(pid)

        while True:

fastdeploy/worker/gcu_model_runner.py

Lines changed: 4 additions & 0 deletions
@@ -1199,6 +1199,10 @@ def clear_parameters(self, pid):
        paddle.device.cuda.empty_cache()
        self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory")

+    def clear_requests(self):
+        """Dynamic model loader use to clear requests use for RL"""
+        self.share_inputs["stop_flags"][:] = True
+
    def update_parameters(self, pid):
        """ " Dynamic model loader use to update parameters use for RL"""
        self.dynamic_weight_manager.update_parameters(pid)
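
Taken with the dynamic_weight_manager hunk above, the ordering is deliberate: clear_requests() flags every slot stopped before clear_parameters(pid) frees the weights, so no slot keeps decoding against a freed checkpoint. A minimal stand-in using numpy to show the in-place broadcast (the real buffer is a paddle tensor, so this is an assumption about its slicing semantics only):

    import numpy as np
    stop_flags = np.zeros(8, dtype=bool)  # stand-in for share_inputs["stop_flags"]
    stop_flags[:] = True                  # in-place broadcast: every slot stops
    assert stop_flags.all()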
