Conversation

Skywalker-EP

What this PR does / why we need it?

Topology-Aware Expert Load Balancing (TA-ELB) optimization for EPLB

Co-authored-by: shiyuan680 [email protected]
Co-authored-by: yechao237 [email protected]
Co-authored-by: walterchenchn [email protected]
Co-authored-by: jiishy [email protected]

EPLB commits #1943

Does this PR introduce any user-facing change?

How was this patch tested?


Signed-off-by: Skywalker-EP [email protected]

gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces topology-aware expert load balancing (TA-ELB) optimizations for EPLB. The changes are extensive, touching policy logic, the EPLB updater, and MoE operations. While the goal of topology awareness is beneficial, the implementation has several issues that need addressing. I've identified a critical bug in fused_moe.py that will cause a runtime error, along with several high-severity issues related to hardcoded values, overly complex and difficult-to-maintain logic, performance inefficiencies from repeated buffer allocations, and the presence of dead code. Addressing these points will significantly improve the robustness and maintainability of the new functionality.

@@ -1200,7 +1200,11 @@ def __init__(
             expert_load_balancer.get_rank_placement_map(
                 self.moe_instance_id, self.ep_rank))
         self.log2phy = expert_load_balancer.get_rank_log2phy_map(
-            self.moe_instance_id, self.ep_rank).npu()
+            self.moe_instance_id, get_ep_group().rank_in_group).npu()

critical

The method get_rank_log2phy_map now returns a tuple of tensors (log2phy_map, num_experts). Calling .npu() on this tuple will raise an AttributeError. The .npu() call should be removed from this line. The subsequent lines correctly move the individual tensors from the tuple to the NPU device.

Suggested change
-            self.moe_instance_id, get_ep_group().rank_in_group).npu()
+            self.moe_instance_id, get_ep_group().rank_in_group)
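
For reference, a minimal sketch of the call-site pattern the bot describes, assuming the returned tuple is `(log2phy_map, num_experts)` as stated above (the attribute name for the second value is illustrative, not taken from the PR):

```python
# Sketch only: unpack the tuple first, then move the tensors to the NPU
# individually instead of calling .npu() on the tuple itself.
log2phy_map, num_experts = expert_load_balancer.get_rank_log2phy_map(
    self.moe_instance_id, get_ep_group().rank_in_group)
self.log2phy = log2phy_map.npu()
self.log2phy_num_experts = num_experts.npu()  # illustrative attribute name
```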

Comment on lines +110 to +113
if (expert_in_hosts[item_id][host_id] > expert_in_hosts[item_id][check_pair[host_id][0]]
        or expert_in_hosts[item_id][host_id] > expert_in_hosts[item_id][check_pair[host_id][1]]
        or (max_count != min_count and expert_in_hosts[item_id][host_id] != min_count)
        or item_id in boxes[i]):

high

The condition to filter eligible boxes (cards) for expert placement is very complex and difficult to understand. This level of complexity can make the code hard to maintain and debug. Consider refactoring this logic for clarity and simplicity. The core goal seems to be balancing experts across hosts. A simpler condition might be to only allow placing an expert on a host that has the minimum number of instances of that expert.
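
To make that suggestion concrete, a hedged sketch of the simpler filter, reusing the names from the snippet above; the surrounding loop and the `continue` control flow are assumptions:

```python
# Sketch only: a card is eligible only when its host currently holds the
# minimum number of replicas of this expert (min_count, as already computed
# in the snippet) and the expert is not already placed on this card.
if expert_in_hosts[item_id][host_id] != min_count or item_id in boxes[i]:
    continue  # not eligible, try the next card
```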

Comment on lines +169 to +182
with torch.npu.stream(self.compute_moe_load_async):
    self.world_size = dist.get_world_size()
    self.device = local_load.device
    if self._gather_buffer is None:
        shape = (self.world_size, *local_load.shape)
        self._gather_buffer = torch.empty(shape,
                                          dtype=local_load.dtype,
                                          device=self.device)

    dist.all_gather_into_tensor(self._gather_buffer, local_load)

    moe_load = self._gather_buffer.permute(1, 0, 2)
    self.shared_dict["moe_load"] = moe_load.cpu()
    logger.debug(f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}")

high

The logic within this with block relies on self._gather_buffer for caching, but it is reset to None on every call to compute_and_set_moe_load at line 167. This leads to inefficient re-allocation of the buffer because the if self._gather_buffer is None: check at line 172 will always be true. To fix this, self._gather_buffer should be initialized as an instance attribute in init_eplb and not reset within this function.
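
A possible shape of the fix, sketched as methods on the existing updater class (the class name and the surrounding attributes such as `compute_moe_load_async` and `shared_dict` stand in for what the PR already defines; only the placement of the `self._gather_buffer` initialization is the point):

```python
import torch
import torch.distributed as dist


class EplbUpdater:  # stands in for the existing updater class
    def init_eplb(self):
        # Initialize the cache attribute once; never reset it per call.
        self._gather_buffer = None

    def compute_and_set_moe_load(self, local_load):
        # Note: no `self._gather_buffer = None` here, so the buffer allocated
        # on the first call is reused by every subsequent all-gather.
        with torch.npu.stream(self.compute_moe_load_async):
            if self._gather_buffer is None:
                shape = (dist.get_world_size(), *local_load.shape)
                self._gather_buffer = torch.empty(shape,
                                                  dtype=local_load.dtype,
                                                  device=local_load.device)
            dist.all_gather_into_tensor(self._gather_buffer, local_load)
            moe_load = self._gather_buffer.permute(1, 0, 2)
            self.shared_dict["moe_load"] = moe_load.cpu()
```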

Comment on lines +83 to +112
def update_expert_map(self, expert_loc, log2phy_map, max_num_dups, rank_id):
    ep_size = get_ep_group().world_size
    redundancy_shared_expert_num = self.get_global_redundant_expert_num()
    n_total_experts = self.global_expert_num + redundancy_shared_expert_num

    for i in range(self.global_expert_num):
        same_rank_candidates = []
        same_node_candidates = []
        experts_per_device = n_total_experts // ep_size
        all_candidates = []
        phy_list = expert_loc[i]
        current_device = rank_id
        for phy in phy_list:
            phy_device = phy // experts_per_device
            if phy_device == current_device:
                same_rank_candidates.append(phy)
            elif (phy_device // self.ranks_num) == (current_device // self.ranks_num):
                same_node_candidates.append(phy)
            else:
                all_candidates.append(phy)
        tmp_expert_loc_map = torch.zeros([max_num_dups], dtype=torch.int32)

        if same_rank_candidates:
            expert_loc[i] = same_rank_candidates
        elif same_node_candidates:
            expert_loc[i] = same_node_candidates
        tmp_expert_loc_map[: len(expert_loc[i])] = torch.tensor(expert_loc[i], dtype=torch.int32)

        log2phy_map[i] = tmp_expert_loc_map


high

The function update_expert_map appears to be dead code, as its only call site is commented out. It also seems to contain a bug where self.ranks_num is used to determine the node, which is likely incorrect. This unused and potentially buggy code should be removed to improve maintainability and reduce clutter.


def update_expert_loc_map_v1(self, expert_loc, current_rank):

    device_per_host = 16

high

The device_per_host is hardcoded to 16. This makes the logic less flexible and difficult to adapt to different hardware configurations. This value should be passed in as a parameter or read from a configuration to improve portability.
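
One way the value could be threaded through, sketched with an assumed constructor parameter (the class and parameter names are illustrative; the default of 16 only mirrors the current hardcoding and would normally come from the deployment config):

```python
class DynamicEplbPolicy:  # stands in for the policy class that owns this method
    def __init__(self, device_per_host: int = 16):
        # Take the per-host device count from configuration / constructor
        # instead of hardcoding it inside the placement logic.
        self.device_per_host = device_per_host

    def update_expert_loc_map_v1(self, expert_loc, current_rank):
        device_per_host = self.device_per_host
        # ... placement logic continues as in the PR, unchanged
```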

Comment on lines +135 to +151
is_imbalanced = False
if num_replicas > num_hosts and num_replicas % num_hosts != 0:
    replica_per_node = {}
    for phy in phy_list:
        phy_device = phy // experts_per_device
        phy_node = phy_device // device_per_host
        local_rank = phy_device % device_per_host
        if phy_node not in replica_per_node:
            replica_per_node[phy_node] = []
        replica_per_node[phy_node].append(local_rank)
    base_replicas_per_host = num_replicas // num_hosts
    if len(replica_per_node[current_node]) == base_replicas_per_host:
        available_ranks = list(set(range(device_per_host)) - set(replica_per_node[current_node]))
        expected_load = round(device_per_host / (base_replicas_per_host + 1))
        if current_rank_in_node in available_ranks:
            if available_ranks.index(current_rank_in_node) >= (expected_load - 1) * base_replicas_per_host:
                is_imbalanced = True

high

The logic to determine is_imbalanced is very complex and difficult to understand, which impacts maintainability. The purpose of these calculations is not clear from the code alone. Please consider refactoring this section for simplicity and adding comments to explain the heuristic being implemented for handling imbalanced expert replica distributions.
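
To illustrate the kind of documentation being requested, here is the same heuristic restated with comments naming each intermediate quantity. The logic is unchanged from the snippet; the comments are one reading of it and should be verified (or corrected) by the authors:

```python
is_imbalanced = False
# The heuristic only matters when replicas cannot be spread evenly across
# hosts: more replicas than hosts, with a non-zero remainder.
if num_replicas > num_hosts and num_replicas % num_hosts != 0:
    # Collect, per host, the local ranks that already hold a replica.
    replica_per_node = {}
    for phy in phy_list:
        phy_device = phy // experts_per_device
        phy_node = phy_device // device_per_host
        local_rank = phy_device % device_per_host
        replica_per_node.setdefault(phy_node, []).append(local_rank)

    base_replicas_per_host = num_replicas // num_hosts
    # Only a host holding exactly the base quota is a candidate for imbalance.
    if len(replica_per_node[current_node]) == base_replicas_per_host:
        # Local ranks on this host that do not yet hold a replica.
        available_ranks = list(set(range(device_per_host)) - set(replica_per_node[current_node]))
        # Roughly how many ranks each replica would serve if one more replica
        # were placed on this host.
        expected_load = round(device_per_host / (base_replicas_per_host + 1))
        if current_rank_in_node in available_ranks:
            # Only the trailing portion of the free ranks is flagged as imbalanced.
            if available_ranks.index(current_rank_in_node) >= (expected_load - 1) * base_replicas_per_host:
                is_imbalanced = True
```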


Yikun commented Aug 28, 2025

v0.9.1-dev is code-frozen; can you confirm whether this is still needed, or should it just move to the main branch? Thanks.


Skywalker-EP commented Aug 28, 2025

> v0.9.1-dev is code-frozen; can you confirm whether this is still needed, or should it just move to the main branch? Thanks.

We can move TA-ELB to the main branch later.
