Skip to content

Commit 3cef851

Browse files
authored
[Bug fix] Fix bug for running ep (#4245)
* fix bug for ep * fix bug
1 parent 17e00d9 commit 3cef851

File tree

5 files changed

+54
-18
lines changed

5 files changed

+54
-18
lines changed

fastdeploy/cache_manager/cache_messager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def prefill_layerwise_send_cache_thread(self):
218218
try:
219219
prefilled_step_idx_data = np.zeros(shape=[1], dtype=np.int32)
220220
prefilled_layer_idx_data = np.zeros(shape=[1], dtype=np.int32)
221-
prefilled_layer_name = f"splitwise_complete_prefilled_step_{self.rank_id}.{self.gpu_id}"
221+
prefilled_layer_name = f"splitwise_complete_prefilled_layer_{self.rank_id}.{self.gpu_id}"
222222
prefilled_step_name = f"splitwise_complete_prefilled_step_{self.rank_id}.{self.gpu_id}"
223223
step_shm_value = IPCSignal(
224224
name=f"splitwise_complete_prefilled_step_{self.rank_id}",

fastdeploy/engine/common_engine.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import zmq
3131
from opentelemetry import trace
3232

33-
from fastdeploy.engine.request import Request, RequestOutput
33+
from fastdeploy.engine.request import Request, RequestOutput, RequestType
3434
from fastdeploy.engine.resource_manager import ResourceManager
3535
from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
3636
from fastdeploy.inter_communicator import (
@@ -77,6 +77,7 @@ def __init__(self, cfg, start_queue=True):
7777
self.llm_logger = llm_logger
7878

7979
self.scheduler = cfg.scheduler_config.scheduler()
80+
self.enable_decode_cache_task = envs.FD_ENABLE_CACHE_TASK == "1"
8081

8182
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
8283
self.resource_manager = ResourceManagerV1(
@@ -623,7 +624,7 @@ def _fetch_request():
623624
for tmp_task in need_delete_tasks:
624625
tasks.remove(tmp_task)
625626
# release resource in P
626-
self.resource_manager.prerelease_resource(task)
627+
self.resource_manager.prerelease_resource(tmp_task)
627628
if self.cfg.scheduler_config.splitwise_role == "prefill":
628629
# to send cache info to cache messager
629630
if tasks:
@@ -673,6 +674,21 @@ def _fetch_request():
673674
tasks = self.resource_manager.schedule()
674675
# 3. Send to engine
675676
if tasks:
677+
if self.cfg.scheduler_config.splitwise_role == "decode":
678+
for task in tasks:
679+
if task.task_type == RequestType.PREEMPTED:
680+
msg = f"{task.request_id} decode not enough blocks, need to be rescheduled."
681+
self.llm_logger.error(msg)
682+
self.scheduler.put_results(
683+
[
684+
RequestOutput(
685+
request_id=task.request_id,
686+
finished=True,
687+
error_code=500,
688+
error_msg=msg,
689+
)
690+
]
691+
)
676692
self.resource_manager.get_real_bsz()
677693
self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
678694
else:

fastdeploy/engine/engine.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,8 @@ def launch_components(self):
651651
role = self.cfg.scheduler_config.splitwise_role
652652
host_ip = self.cfg.host_ip
653653
disaggregate = self.cfg.disaggregate_info
654+
request_queues_for_dp_ipc = None
655+
result_queue_for_dp_ipc = None
654656
if self.cfg.scheduler_config.name == "splitwise":
655657
self.engine.scheduler.start(role, host_ip, disaggregate)
656658
elif self.cfg.scheduler_config.name == "dp":

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,23 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
137137
preempted_req = self.running.pop()
138138
preempted_req.status = RequestStatus.PREEMPTED
139139
preempted_req.num_computed_tokens = 0
140-
self._free_blocks(preempted_req)
141-
preempted_req.cached_block_num = 0
142-
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
140+
if self.config.scheduler_config.splitwise_role == "decode":
141+
self.tasks_list[preempted_req.idx] = None
142+
self.stop_flags[preempted_req.idx] = True
143+
if preempted_req.request_id in self.requests:
144+
del self.requests[preempted_req.request_id]
145+
if preempted_req.request_id in self.req_dict:
146+
del self.req_dict[preempted_req.request_id]
147+
self._free_blocks(preempted_req)
148+
main_process_metrics.num_requests_running.dec(1)
149+
else:
150+
self._free_blocks(preempted_req)
151+
preempted_req.cached_block_num = 0
152+
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
153+
main_process_metrics.num_requests_waiting.inc(1)
154+
main_process_metrics.num_requests_running.dec(1)
143155
preempted_reqs.append(preempted_req)
144156
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
145-
main_process_metrics.num_requests_waiting.inc(1)
146-
main_process_metrics.num_requests_running.dec(1)
147157
if preempted_req == request:
148158
# No more request to preempt.
149159
can_schedule = False
@@ -588,8 +598,10 @@ def prerelease_resource(self, request: Request):
588598
with self.lock:
589599
self.tasks_list[request.idx] = None
590600
self.stop_flags[request.idx] = True
591-
del self.requests[request.request_id]
592-
del self.req_dict[request.request_id]
601+
if request.request_id in self.requests:
602+
del self.requests[request.request_id]
603+
if request.request_id in self.req_dict:
604+
del self.req_dict[request.request_id]
593605
self._free_blocks(request)
594606

595607
def add_request_in_p(self, requests: list[Request]):

fastdeploy/splitwise/splitwise_connector.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -387,14 +387,20 @@ def send_cache_infos(self, tasks, current_id):
387387
f"{tasks[i].disaggregate_info['cache_info']['rdma']['ip']}:"
388388
+ f"{tasks[i].disaggregate_info['cache_info']['rdma']['port']}"
389389
)
390-
cache_info = {
391-
"request_id": tasks[i].request_id,
392-
"device_ids": self.cfg.device_ids.split(","),
393-
"ip": self.cfg.host_ip,
394-
"rdma_ports": self.cfg.disaggregate_info["cache_info"]["rdma"]["rdma_port"],
395-
"transfer_protocol": "rdma",
396-
"dest_block_ids": tasks[i].disaggregate_info["block_tables"],
397-
}
390+
if tasks[i].get("error_msg", None) is not None:
391+
cache_info = {
392+
"request_id": tasks[i].request_id,
393+
"error_msg": tasks[i].get("error_msg"),
394+
}
395+
else:
396+
cache_info = {
397+
"request_id": tasks[i].request_id,
398+
"device_ids": self.cfg.device_ids.split(","),
399+
"ip": self.cfg.host_ip,
400+
"rdma_ports": self.cfg.disaggregate_info["cache_info"]["rdma"]["rdma_port"],
401+
"transfer_protocol": "rdma",
402+
"dest_block_ids": tasks[i].disaggregate_info["block_tables"],
403+
}
398404
if addr not in temp_cache_info:
399405
temp_cache_info[addr] = []
400406

0 commit comments

Comments
 (0)