Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalance
for (Resource resource : wagedRebalancedResourceMap.values()) {
IdealState is = newIdealStates.get(resource.getResourceName());
// Check if the WAGED rebalancer has calculated the result for this resource or not.
if (is != null && checkBestPossibleStateCalculation(is)) {
if (is != null && checkBestPossibleStateCalculation(is, resource, currentStateOutput)) {
// The WAGED rebalancer calculates a valid result, record in the output
updateBestPossibleStateOutput(output, resource, is);
} else {
Expand Down Expand Up @@ -540,7 +540,7 @@ private boolean computeSingleResourceBestPossibleState(ClusterEvent event,
rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);

// Check if calculation is done successfully
if (!checkBestPossibleStateCalculation(idealState)) {
if (!checkBestPossibleStateCalculation(idealState, resource, currentStateOutput)) {
LogUtil.logWarn(logger, _eventId,
"The calculated idealState is not valid, resource: " + resourceName);
return false;
Expand Down Expand Up @@ -577,28 +577,62 @@ private boolean computeSingleResourceBestPossibleState(ClusterEvent event,
return false;
}

private boolean checkBestPossibleStateCalculation(IdealState idealState) {
private boolean checkBestPossibleStateCalculation(IdealState idealState, Resource resource,
CurrentStateOutput currentStateOutput) {
// If replicas is 0, indicate the resource is not fully initialized or ready to be rebalanced
if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO && !idealState
.getReplicas().equals("0")) {
Map<String, List<String>> preferenceLists = idealState.getPreferenceLists();
if (preferenceLists == null || preferenceLists.isEmpty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will the preference list be null?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this end up in a situation where rebalancer could fail to calculate preference list due to some other issue and we end up dropping all partitions which could be a risky behaviour?

return false;
// Empty preference lists: allow only when there are existing replicas to clean up
// (e.g., all nodes disabled). Reject when resource is not initialized (no current state).
return hasCurrentStateForResource(resource, currentStateOutput);
}
int emptyListCount = 0;
for (List<String> preferenceList : preferenceLists.values()) {
if (preferenceList.isEmpty()) {
emptyListCount++;
}
}
// If all lists are empty, rebalance fails completely
return emptyListCount != preferenceLists.values().size();
if (emptyListCount == preferenceLists.values().size()) {
// All lists empty: allow only when there are replicas to clean up (all nodes disabled).
return hasCurrentStateForResource(resource, currentStateOutput);
}
// Some but not all lists empty: this is valid when maxPartitionsPerInstance limits capacity.
// Only reject when maxPartitionsPerInstance is NOT set and we have inconsistent empty lists.
if (emptyListCount > 0 && idealState.getMaxPartitionsPerInstance() > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirm the default value for getMaxPartitionsPerInstance once.

// Empty lists are expected when capacity is limited by maxPartitionsPerInstance
return true;
}
// No maxPartitionsPerInstance configured: empty lists indicate inconsistent state, reject.
return emptyListCount == 0;
} else {
// For non FULL_AUTO RebalanceMode, rebalancing is not controlled by Helix
return true;
}
}

/**
* Returns true if the resource has at least one partition with existing replicas in current state.
* Used to distinguish "all nodes disabled" (replicas exist, need cleanup) from "resource not
* initialized" (no replicas, should skip).
*/
private boolean hasCurrentStateForResource(Resource resource,
CurrentStateOutput currentStateOutput) {
if (resource == null || currentStateOutput == null) {
return false;
}
String resourceName = resource.getResourceName();
for (Partition partition : resource.getPartitions()) {
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition);
if (currentStateMap != null && !currentStateMap.isEmpty()) {
return true;
}
}
return false;
}

private Rebalancer<ResourceControllerDataProvider> getCustomizedRebalancer(
String rebalancerClassName, String resourceName) {
Rebalancer<ResourceControllerDataProvider> customizedRebalancer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixDefinedState;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState.RebalanceMode;
Expand Down Expand Up @@ -156,6 +158,150 @@ public void testAutoEnterMaintenanceWhenExceedingOfflineNodes() {
.get("localhost_2"));
}

/**
* Tests that when all instances are disabled, the pipeline continues and computes DROPPED
* transitions for existing replicas. This verifies the fix for the bug where the last instance
* would stay stuck as LEADER when all nodes are disabled.
*/
@Test
public void testAllNodesDisabledComputesDroppedForExistingReplicas() {
String[] resources = new String[]{"resource_1"};
int numInstances = 3;
int numPartitions = 1;

setupIdealState(numInstances, resources, numPartitions, 1, RebalanceMode.FULL_AUTO,
BuiltInStateModelDefinitions.LeaderStandby.name(),
DelayedAutoRebalancer.class.getName());
setupInstances(numInstances);
List<String> liveInstances = setupLiveInstances(numInstances);
setupStateModel();

// Short delay so disabled instances are immediately inactive (no delay window)
ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
clusterConfig.setRebalanceDelayTime(0);
clusterConfig.setDelayRebalaceEnabled(true);
setClusterConfig(clusterConfig);

Map<String, Resource> resourceMap =
getResourceMap(resources, numPartitions, BuiltInStateModelDefinitions.LeaderStandby.name());
CurrentStateOutput currentStateOutput = new CurrentStateOutput();

// Simulate existing replica: localhost_2 holds LEADER for resource_1_0
Partition partition = new Partition("resource_1_0");
currentStateOutput.setCurrentState("resource_1", partition, "localhost_2", "LEADER");

// Disable ALL instances
for (String instance : liveInstances) {
admin.enableInstance(_clusterName, instance, false);
}

event.addAttribute(AttributeName.helixmanager.name(), manager);
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());

runStage(event, new ReadClusterDataStage());
runStage(event, new BestPossibleStateCalcStage());

BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());

// Pipeline should continue and compute DROPPED for the instance that had LEADER
Assert.assertTrue(output.containsResource("resource_1"),
"Resource should be in output when all nodes disabled but replicas exist");
Assert.assertEquals(
output.getInstanceStateMap("resource_1", partition).get("localhost_2"),
HelixDefinedState.DROPPED.name(),
"Instance with LEADER should transition to DROPPED when all nodes are disabled");
}

/**
* Tests that when all instances are disabled AND no current state exists (resource not
* initialized), the pipeline correctly rejects and does not add the resource to output.
*/
@Test
public void testAllNodesDisabledRejectsWhenNoCurrentState() {
String[] resources = new String[]{"resource_1"};
int numInstances = 3;
int numPartitions = 1;

setupIdealState(numInstances, resources, numPartitions, 1, RebalanceMode.FULL_AUTO,
BuiltInStateModelDefinitions.LeaderStandby.name(),
DelayedAutoRebalancer.class.getName());
setupInstances(numInstances);
List<String> liveInstances = setupLiveInstances(numInstances);
setupStateModel();

ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
clusterConfig.setRebalanceDelayTime(0);
clusterConfig.setDelayRebalaceEnabled(true);
setClusterConfig(clusterConfig);

Map<String, Resource> resourceMap =
getResourceMap(resources, numPartitions, BuiltInStateModelDefinitions.LeaderStandby.name());
// No current state - resource not initialized
CurrentStateOutput currentStateOutput = new CurrentStateOutput();

// Disable ALL instances
for (String instance : liveInstances) {
admin.enableInstance(_clusterName, instance, false);
}

event.addAttribute(AttributeName.helixmanager.name(), manager);
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());

runStage(event, new ReadClusterDataStage());
runStage(event, new BestPossibleStateCalcStage());

BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());

// Resource should NOT be in output when no current state exists (not initialized)
Assert.assertFalse(output.containsResource("resource_1"),
"Resource should not be in output when all nodes disabled and no replicas exist");
}

/**
* Tests that SEMI_AUTO mode is unaffected - empty preference lists are allowed
* (rebalancing is not controlled by Helix) and the pipeline continues.
*/
@Test
public void testSemiAutoModeUnaffectedByEmptyPreferenceList() {
String[] resources = new String[]{"resource_1"};
int numInstances = 3;
int numPartitions = 1;

setupIdealState(numInstances, resources, numPartitions, 1, RebalanceMode.SEMI_AUTO,
BuiltInStateModelDefinitions.MasterSlave.name());
setupInstances(numInstances);
setupLiveInstances(numInstances);
setupStateModel();

Map<String, Resource> resourceMap =
getResourceMap(resources, numPartitions, BuiltInStateModelDefinitions.MasterSlave.name());
CurrentStateOutput currentStateOutput = new CurrentStateOutput();

event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());

runStage(event, new ReadClusterDataStage());
runStage(event, new BestPossibleStateCalcStage());

BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
// SEMI_AUTO uses preference list from IdealState; pipeline should complete
Assert.assertNotNull(output, "Output should not be null");
}

/**
* Test parallel computation with multiple FULL_AUTO resources.
* Verifies that parallel computation using StageThreadPoolHelper produces correct results.
Expand Down
Loading