Skip to content

Commit 02a170c

Browse files
Prevent controller from dropping replicas when best possible fails to calculate assignment (#3034)
* prevent DROPPED messages when mapping cannot be computed * add test for CRUSHED resource * add message generation phase test * fix test * respond to feedback
1 parent 62f0e9c commit 02a170c

File tree

3 files changed

+171
-1
lines changed

3 files changed

+171
-1
lines changed

helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,13 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
154154
// resourceStateMap. This instance may not have been dropped by the rebalance strategy.
155155
// This check is required to ensure that the instances removed from the ideal state stateMap
156156
// are properly dropped.
157+
// This should only solve for the instance operation case where the instance is removed from the statemap but there
158+
// are still valid assignments in the mapping. We should not consider the case where there is no mapping at all for
159+
// the resource, which can occur on a rebalance failure. If the resource has been removed, the partition has
160+
// been removed, or the replication factor has been reduced, the BP will contain the DROPPED states. See method
161+
// AbstractRebalancer.computeBestPossibleMap - drops replica that is in current state but not in preference list.
157162
for (String instance : currentStateMap.keySet()) {
158-
if (!instanceStateMap.containsKey(instance)) {
163+
if (!instanceStateMap.isEmpty() && !instanceStateMap.containsKey(instance)) {
159164
instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
160165
}
161166
}

helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
3838
import org.apache.helix.controller.pipeline.Pipeline;
3939
import org.apache.helix.controller.pipeline.StageException;
40+
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
41+
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
4042
import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
4143
import org.apache.helix.integration.manager.ClusterControllerManager;
4244
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -645,6 +647,71 @@ public void testNoMessageSentOnControllerLeadershipLoss() throws Exception {
645647
deleteCluster(clusterName);
646648
}
647649

650+
@Test
651+
public void testNoMessagesSentOnNoResourceMapping() throws Exception {
652+
String methodName = TestHelper.getTestMethodName();
653+
String clusterName = _className + "_" + methodName;
654+
HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
655+
656+
admin.addCluster(clusterName);
657+
658+
final String resourceName = "testResource_" + methodName;
659+
final String partitionName = resourceName + "_0";
660+
661+
setupStateModel(clusterName);
662+
setupInstances(clusterName, new int[]{0});
663+
List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
664+
0
665+
});
666+
int numPartition = 3;
667+
_gSetupTool.addResourceToCluster(clusterName, resourceName, numPartition, "LeaderStandby",
668+
IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
669+
IdealState idealStateOne =
670+
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, resourceName);
671+
idealStateOne.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
672+
_gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, resourceName, idealStateOne);
673+
_gSetupTool.rebalanceStorageCluster(clusterName, resourceName, 1);
674+
675+
HelixDataAccessor accessor =
676+
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
677+
DummyClusterManager manager =
678+
new DummyClusterManager(clusterName, accessor, Long.toHexString(_gZkClient.getSessionId()));
679+
ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.OnDemandRebalance);
680+
event.addAttribute(AttributeName.helixmanager.name(), manager);
681+
event.addAttribute(AttributeName.ControllerDataProvider.name(),
682+
new ResourceControllerDataProvider());
683+
684+
// cluster data cache refresh pipeline
685+
Pipeline dataRefresh = new Pipeline();
686+
dataRefresh.addStage(new ReadClusterDataStage());
687+
688+
// rebalance pipeline
689+
Pipeline rebalancePipeline = new Pipeline();
690+
rebalancePipeline.addStage(new ResourceComputationStage());
691+
rebalancePipeline.addStage(new CurrentStateComputationStage());
692+
// Add empty best possible output to mimic no calculations being made
693+
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), new BestPossibleStateOutput());
694+
rebalancePipeline.addStage(new MessageGenerationPhase());
695+
rebalancePipeline.addStage(new MessageSelectionStage());
696+
rebalancePipeline.addStage(new IntermediateStateCalcStage());
697+
rebalancePipeline.addStage(new MessageThrottleStage());
698+
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
699+
700+
// Set currentState
701+
setCurrentState(clusterName, "localhost_0", resourceName, partitionName,
702+
liveInstances.get(0).getEphemeralOwner(), "LEADER");
703+
704+
runPipeline(event, dataRefresh, false);
705+
706+
runPipeline(event, rebalancePipeline, true);
707+
708+
// Assert no messages are being sent
709+
MessageOutput msgThrottleOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
710+
List<Message> messages =
711+
msgThrottleOutput.getMessages(resourceName, new Partition(partitionName));
712+
Assert.assertTrue(messages.isEmpty());
713+
}
714+
648715
protected void setCurrentState(String clusterName, String instance, String resourceGroupName,
649716
String resourceKey, String sessionId, String state) {
650717
setCurrentState(clusterName, instance, resourceGroupName, resourceKey, sessionId, state, false);
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package org.apache.helix.integration;
2+
3+
import org.apache.helix.ConfigAccessor;
4+
import org.apache.helix.TestHelper;
5+
import org.apache.helix.common.ZkTestBase;
6+
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
7+
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
8+
import org.apache.helix.integration.manager.ClusterControllerManager;
9+
import org.apache.helix.model.ClusterConfig;
10+
import org.apache.helix.model.ExternalView;
11+
import org.apache.helix.model.IdealState;
12+
import org.apache.helix.model.InstanceConfig;
13+
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
14+
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
15+
import org.testng.Assert;
16+
import org.testng.annotations.BeforeClass;
17+
import org.testng.annotations.Test;
18+
19+
20+
public class TestPreserveAssignmentsOnRebalanceFailure extends ZkTestBase {
21+
22+
private final String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
23+
public final int PARTICIPANT_COUNT = 3;
24+
private ClusterControllerManager _controller;
25+
private ConfigAccessor _configAccessor;
26+
private StrictMatchExternalViewVerifier _externalViewVerifier;
27+
private BestPossibleExternalViewVerifier _bestPossibleVerifier;
28+
29+
@BeforeClass
30+
public void setup() {
31+
System.out.println("Start test " + TestHelper.getTestClassName());
32+
_configAccessor = new ConfigAccessor(_gZkClient);
33+
_gSetupTool.addCluster(CLUSTER_NAME, true);
34+
for (int i = 0; i < PARTICIPANT_COUNT; i++) {
35+
String instanceName = "localhost_" + i;
36+
addParticipant(CLUSTER_NAME, instanceName);
37+
InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
38+
instanceConfig.setDomain("zone=zone" + i);
39+
_configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
40+
}
41+
42+
// Enable topology aware rebalance and set expcted topology
43+
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
44+
clusterConfig.setFaultZoneType("zone");
45+
clusterConfig.setTopology("/zone");
46+
clusterConfig.setTopologyAwareEnabled(true);
47+
clusterConfig.setPersistBestPossibleAssignment(true);
48+
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
49+
50+
String controllerName = CONTROLLER_PREFIX + "_0";
51+
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
52+
_controller.syncStart();
53+
54+
_externalViewVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
55+
.setDeactivatedNodeAwareness(true)
56+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
57+
.build();
58+
_bestPossibleVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
59+
.setZkAddr(ZK_ADDR)
60+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
61+
.build();
62+
}
63+
64+
// This test verifies that when a mapping cannot be generated for a resource (failureResources in
65+
// BestPossibleStateCalcStage), the replicas are not dropped
66+
@Test
67+
public void testPreserveAssignmentsOnRebalanceFailure() {
68+
System.out.println("Start test: " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName());
69+
70+
// Create a CRUSHED resource
71+
int numPartition = 3;
72+
String firstDB = "firstDB";
73+
_gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
74+
IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
75+
IdealState idealStateOne =
76+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
77+
idealStateOne.setMinActiveReplicas(2);
78+
idealStateOne.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
79+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
80+
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
81+
82+
// Wait for cluster to converge and take a snapshot of the ExternalView
83+
Assert.assertTrue(_bestPossibleVerifier.verifyByPolling());
84+
ExternalView oldEV = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB);
85+
86+
// Add an instance with no domain set to the cluster, this will cause the topology aware assignment to fail
87+
String badInstance = "bad_instance";
88+
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, badInstance);
89+
90+
// Assert EV = IS
91+
Assert.assertTrue(_externalViewVerifier.verifyByPolling());
92+
93+
// Check that the new EV (after bad instance added) is the same as the old EV (before bad instance added)
94+
ExternalView newEV = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB);
95+
Assert.assertEquals(oldEV, newEV);
96+
System.out.println("End test: " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName());
97+
}
98+
}

0 commit comments

Comments
 (0)