
Commit 10768a4

[NewFeature] add ep rollout model init and update/clear ep buffer (#3927)

* add ep rollout model init && add deep update/clear
* fix test

1 parent: c64ceac · commit 10768a4

13 files changed: +359 −299 lines

fastdeploy/config.py

Lines changed: 10 additions & 7 deletions
@@ -337,11 +337,12 @@ def __init__(
         else:
             self.pd_disaggregation_mode = "None"

-    def set_tp_group(self):
+    def set_communicate_group(self):
         # different tp group id
         # prevent different tp_groups using the same group_id
         tp_gid_offset = envs.FD_TP_GROUP_GID_OFFSET
         dist.collective._set_custom_gid(self.data_parallel_rank + tp_gid_offset)
+
         self.tp_group = dist.new_group(
             range(
                 self.data_parallel_rank * self.tensor_parallel_size,
@@ -350,8 +351,11 @@ def set_tp_group(self):
         )
         dist.collective._set_custom_gid(None)
         # same ep group id
-        dist.collective._set_custom_gid(self.data_parallel_size + tp_gid_offset)
-        self.ep_group = dist.new_group(range(self.expert_parallel_size))
+        if self.enable_expert_parallel:
+            dist.collective._set_custom_gid(self.data_parallel_size + tp_gid_offset)
+            self.ep_group = dist.new_group(range(self.expert_parallel_size))
+            dist.collective._set_custom_gid(None)
+
         logger.info(
             f"data_parallel_size: {self.data_parallel_size}, tensor_parallel_size: {self.tensor_parallel_size}, expert_parallel_size: {self.expert_parallel_size}, data_parallel_rank: {self.data_parallel_rank}, tensor_parallel_rank: {self.tensor_parallel_rank}, expert_parallel_rank: {self.expert_parallel_rank}, tp_group: {self.tp_group}."
         )
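
Note: the rename from set_tp_group to set_communicate_group reflects that this method now owns all communication groups, and the EP group is only created when enable_expert_parallel is set, with the custom gid reset afterwards. A minimal illustrative sketch (not part of the commit) of the resulting group-id layout, assuming FD_TP_GROUP_GID_OFFSET = 100 and, purely for the example, that the EP group spans all data_parallel_size * tensor_parallel_size ranks:

# Sketch only: how per-DP-rank TP group ids and the shared EP group id avoid
# colliding. The offset value and sizes below are made up for illustration.
tp_gid_offset = 100
data_parallel_size = 2
tensor_parallel_size = 4

for dp_rank in range(data_parallel_size):
    tp_gid = dp_rank + tp_gid_offset            # 100, 101 -> one gid per TP group
    tp_ranks = list(
        range(dp_rank * tensor_parallel_size, (dp_rank + 1) * tensor_parallel_size)
    )
    print(f"tp_group gid={tp_gid} ranks={tp_ranks}")

# The EP group uses the gid right after all TP groups and (in this sketch)
# contains every rank; it is only built when expert parallelism is enabled.
ep_gid = data_parallel_size + tp_gid_offset     # 102
ep_ranks = list(range(data_parallel_size * tensor_parallel_size))
print(f"ep_group gid={ep_gid} ranks={ep_ranks}")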
@@ -830,6 +834,7 @@ class LoadConfig:
         load_strategy: Specifies the weight loading method when enabled:
         - 'ipc': Real-time IPC streaming with automatic resharding
         - 'ipc_snapshot': Load from disk snapshot of IPC weights
+        - 'meta': Only model meta messages
         - None: No dynamic loading
     """

@@ -840,7 +845,7 @@ def __init__(
         self.load_choices: Union[str, LoadChoices] = LoadChoices.DEFAULT.value
         self.use_fastsafetensor = int(envs.FD_USE_FASTSAFETENSOR) == 1
         self.dynamic_load_weight: bool = False
-        self.load_strategy: Optional[Literal["ipc", "ipc_snapshot"]] = None
+        self.load_strategy: Optional[Literal["ipc", "ipc_snapshot", "meta", "normal"]] = "normal"
         for key, value in args.items():
             if hasattr(self, key):
                 setattr(self, key, value)
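
Note: LoadConfig now accepts "meta" (model meta information only, with weights supplied later through the dynamic update path) and "normal" (the new default, ordinary loading) alongside the existing IPC strategies. A hypothetical usage sketch, assuming the dict-of-overrides constructor implied by the args.items() loop above and that "meta" is paired with dynamic_load_weight; neither detail is confirmed by this diff:

# Sketch only: selecting the strategies added by this commit.
from fastdeploy.config import LoadConfig

# Default behaviour after this commit: plain (non-dynamic) weight loading.
static_cfg = LoadConfig({})                 # load_strategy == "normal"

# Rollout-style startup: build the model from meta information only and fill
# the weights in later via the dynamic update path (assumed combination).
meta_cfg = LoadConfig({"dynamic_load_weight": True, "load_strategy": "meta"})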
@@ -1198,12 +1203,10 @@ def __init__(

         num_ranks = self.parallel_config.tensor_parallel_size * self.parallel_config.data_parallel_size
         self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
-        if num_ranks > self.max_chips_per_node:
+        if num_ranks > self.max_chips_per_node and self.load_config.load_strategy != "meta":
             self.worker_num_per_node = self.max_chips_per_node
             nnode = ceil_div(num_ranks, self.worker_num_per_node)
             assert nnode == self.nnode, f"nnode: {nnode}, but got {self.nnode}"
-
-            # assert nnode == self.nnode, f"nnode: {nnode}, but got {self.nnode}"
         else:
             self.worker_num_per_node = num_ranks
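
Note: the extra load_strategy != "meta" condition means a meta-only startup may exceed the per-node chip count; the multi-node consistency check only applies to real device loads. A worked example of the branch with made-up numbers (ceil_div re-implemented here only for the sketch):

# Sketch only: the branch above with illustrative values.
def ceil_div(a, b):                # stand-in for FastDeploy's ceil_div helper
    return -(-a // b)

num_ranks = 32                     # tensor_parallel_size * data_parallel_size
max_chips_per_node = 8
load_strategy = "meta"             # hypothetical setting

if num_ranks > max_chips_per_node and load_strategy != "meta":
    worker_num_per_node = max_chips_per_node           # 8 workers per node
    nnode = ceil_div(num_ranks, worker_num_per_node)   # 4 nodes expected
else:
    # For "meta" loading the per-node chip limit is not enforced, so all
    # ranks are assigned to the current node.
    worker_num_per_node = num_ranks                    # 32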

fastdeploy/engine/args_utils.py

Lines changed: 1 addition & 1 deletion
@@ -134,7 +134,7 @@ class EngineArgs:
     """
     dynamic load weight
     """
-    load_strategy: str = "ipc_snapshot"
+    load_strategy: str = "normal"
     """
     dynamic load weight strategy
     """
