Skip to content

Commit 83720da

Browse files
authored
[Feature] support clear data (#3601)
* [Feature] support clear data (squashed with one follow-up update and several fix commits)
1 parent 772f015 commit 83720da

File tree

12 files changed

+85
-1
lines changed

12 files changed

+85
-1
lines changed

fastdeploy/engine/common_engine.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,18 @@ def start_cache_service(self, device_ids, ipc_signal_suffix):
934934
def check_and_free_block_tables(self):
935935
self.resource_manager.check_and_free_block_tables()
936936

937+
def clear_data(self):
    """Reset the engine's in-flight request state after a weight clear.

    Delegates to each sub-component in order: token processor first (so no
    further outputs are produced), then the shared worker queue, then the
    ZMQ server's request bookkeeping.

    Returns:
        bool: True if every component was cleared, False if any step raised.
    """
    try:
        llm_logger.info("Clear Data: Start")
        # Order matters: stop output production before flushing queues.
        steps = (
            self.token_processor.clear_data,
            self.engine_worker_queue.clear_data,
            self.zmq_server.req_dict.clear,
        )
        for step in steps:
            step()
        llm_logger.info("Clear Data: Successfully")
        return True
    except Exception as e:
        # Best-effort: report failure to the caller instead of propagating.
        llm_logger.error(f"Clear data error: {e}")
        return False
948+
937949
def _exit_sub_services(self):
938950
"""
939951
exit sub services

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,3 +752,7 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
752752
del self.req_dict[req_id]
753753
except Exception as e:
754754
llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}")
755+
756+
def clear_data(self):
    """Reset scheduler bookkeeping: empty the waiting queue and the
    set of requests pending reschedule."""
    # Fresh containers rather than in-place .clear(): matches the original,
    # which rebinds both attributes.
    self.waiting, self.to_be_rescheduled_request_id_set = deque(), set()

fastdeploy/entrypoints/engine_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,3 +346,6 @@ def clear_load_weight(self, timeout=300):
346346
return False, "clear model weight timeout"
347347
time.sleep(1)
348348
return True, ""
349+
350+
def check_model_weight_status(self):
    """Return True when the shared weight-status signal is negative,
    i.e. the model weights have been (or are being) cleared."""
    status = self.model_weights_status_signal.value[0]
    return status < 0

fastdeploy/entrypoints/openai/api_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ def reset_scheduler():
495495

496496
if llm_engine is None:
497497
return Response("Engine not loaded", status_code=500)
498+
499+
llm_engine.engine.clear_data()
498500
llm_engine.engine.scheduler.reset()
499501
return Response("Scheduler Reset Successfully", status_code=200)
500502

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ async def chat_completion_stream_generator(
220220
decoder_base_url=self.tokenizer_base_url,
221221
)
222222
while num_choices > 0:
223+
if self.engine_client.check_model_weight_status():
224+
raise ValueError("Engine is clearing model weight")
223225
try:
224226
response = await asyncio.wait_for(response_queue.get(), timeout=10)
225227
current_waiting_time = 0
@@ -435,6 +437,14 @@ async def chat_completion_full_generator(
435437
decoder_base_url=self.tokenizer_base_url,
436438
)
437439
while True:
440+
if self.engine_client.check_model_weight_status():
441+
return ErrorResponse(
442+
error=ErrorInfo(
443+
message="Model weight cleared",
444+
code=ErrorCode.INVALID_VALUE,
445+
type=ErrorType.INVALID_REQUEST_ERROR,
446+
)
447+
)
438448
try:
439449
response = await asyncio.wait_for(response_queue.get(), timeout=10)
440450
current_waiting_time = 0
@@ -523,6 +533,7 @@ async def chat_completion_full_generator(
523533

524534
if final_res.get("error_msg") is not None and "Recover" in final_res["error_msg"]:
525535
choice.finish_reason = "recover_stop"
536+
526537
choices.append(choice)
527538

528539
num_prompt_tokens = len(prompt_token_ids)

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,14 @@ async def completion_full_generator(
227227
completion_batched_token_ids = [[] for _ in range(num_choices)]
228228
current_waiting_time = 0
229229
while num_choices > 0:
230+
if self.engine_client.check_model_weight_status():
231+
return ErrorResponse(
232+
error=ErrorInfo(
233+
message="Model weight cleared",
234+
code=ErrorCode.INVALID_VALUE,
235+
type=ErrorType.INVALID_REQUEST_ERROR,
236+
)
237+
)
230238
try:
231239
response = await asyncio.wait_for(response_queue.get(), timeout=10)
232240
current_waiting_time = 0
@@ -281,7 +289,6 @@ async def completion_full_generator(
281289
return res
282290
except Exception as e:
283291
api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
284-
raise
285292
finally:
286293
self.engine_client.semaphore.release()
287294
if dealer is not None:
@@ -360,6 +367,8 @@ async def completion_stream_generator(
360367
)
361368
current_waiting_time = 0
362369
while num_choices > 0:
370+
if self.engine_client.check_model_weight_status():
371+
raise ValueError("Engine is clearing model weight")
363372
try:
364373
response = await asyncio.wait_for(response_queue.get(), timeout=10)
365374
current_waiting_time = 0
@@ -447,6 +456,7 @@ async def completion_stream_generator(
447456
choices[-1].finish_reason = self.calc_finish_reason(
448457
request.max_tokens, output_tokens[idx], output, tool_called[idx]
449458
)
459+
450460
send_idx = output.get("send_idx")
451461
# 只有当 send_idx 明确为 0 时才记录日志
452462
if send_idx == 0 and not request.return_token_ids:

fastdeploy/inter_communicator/engine_worker_queue.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,13 @@ def get_disaggregated_tasks(self):
503503
llm_logger.debug("get tasks from queue success")
504504
return item
505505

506+
def clear_data(self):
    """Drop all queued tasks and mark every client slot as already read.

    Runs under the shared queue lock so concurrent producers/consumers see
    a consistent empty state, then logs the reset.
    """
    # Use the lock as a context manager: the original acquire()/release()
    # pair would leak the lock if one of the shared-memory assignments
    # raised, deadlocking every other queue user.
    with self.lock:
        self.tasks[:] = list()
        self.client_read_flag[:] = [1] * self.num_client
    llm_logger.info("clear data for engine worker queue")
512+
506513
def cleanup(self):
507514
"""
508515
Exit the worker queue gracefully.

fastdeploy/output/token_processor.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,31 @@ def _record_speculative_decoding_mertics(self, accept_num):
677677
single_head_acceptance_rate
678678
)
679679

680+
def clear_data(self):
    """Force-finish every active request slot and recycle its resources.

    For each slot whose stop flag is not set, emits a finished
    RequestOutput carrying the task's EOS token ids, then recycles the
    slot via ``_recycle_resources`` and logs a warning for the request.
    """
    if envs.ENABLE_V1_KVCACHE_SCHEDULER:
        self.resource_manager.clear_data()
    for slot in range(self.cfg.max_num_seqs):
        # A raised stop flag means the slot holds no active request.
        if self.resource_manager.stop_flags[slot]:
            continue
        task = self.resource_manager.tasks_list[slot]
        output = CompletionOutput(
            index=slot,
            send_idx=self.tokens_counter[task.request_id],
            token_ids=task.eos_token_ids,
            draft_token_ids=[],
        )
        metrics = RequestMetrics(
            arrival_time=time.time(),
            request_start_time=task.arrival_time,
        )
        result = RequestOutput(
            request_id=task.request_id,
            outputs=output,
            finished=True,
            metrics=metrics,
        )
        disagg = task.disaggregate_info
        is_prefill = disagg is not None and disagg["role"] == "prefill"
        self._recycle_resources(task.request_id, slot, task, result, is_prefill)
        llm_logger.warning(f"clear data for task {task.request_id}")
704+
680705

681706
class WarmUpTokenProcessor(TokenProcessor):
682707
"""

fastdeploy/rl/dynamic_weight_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ def check_model_weights_status(model_weights_status, model_runner, pid):
259259
model_runner.update_parameters(pid)
260260
elif model_weights_status.value[0] == -1:
261261
logger.info("infer engine stopped! start to clear checkpoint...")
262+
model_runner.clear_requests()
262263
model_runner.clear_parameters(pid)
263264

264265
while True:

fastdeploy/worker/gcu_model_runner.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,10 @@ def clear_parameters(self, pid):
12321232
paddle.device.cuda.empty_cache()
12331233
self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory")
12341234

1235+
def clear_requests(self):
    """Dynamic model loader use to clear requests use for RL.

    Aborts every in-flight request by raising all stop flags in the
    shared inputs (slice-assign broadcasts True across the buffer).
    """
    stop_flags = self.share_inputs["stop_flags"]
    stop_flags[:] = True
1238+
12351239
def update_parameters(self, pid):
12361240
""" " Dynamic model loader use to update parameters use for RL"""
12371241
self.dynamic_weight_manager.update_parameters(pid)

0 commit comments

Comments
 (0)