Skip to content

Commit 7f2d30f

Browse files
Fix endless creation of best possible nodes triggered by unintended modification of cached best possible map. (#2970)
The baseline and bestPossible maps are type Map<String, ResourceAssignment>. When this map is stored in the cache by persistBaseline() and persistBestPossibleAssignment(), the ResourceAssignment objects can still be accessed and modified after calling the relevant persist method. This change creates a deep copy of the assignment map with new ResourceAssignment objects to prevent unintended modification of the cached assignment after the call to persist.
1 parent 892fc27 commit 7f2d30f

File tree

2 files changed

+109
-4
lines changed

2 files changed

+109
-4
lines changed

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.Map;
2626

27+
import java.util.stream.Collectors;
2728
import org.apache.helix.BucketDataAccessor;
2829
import org.apache.helix.HelixException;
2930
import org.apache.helix.HelixProperty;
@@ -133,11 +134,15 @@ private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> ne
133134
* @param globalBaseline
134135
*/
135136
public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
137+
// Create defensive copy so that the in-memory assignment is not modified after it is persisted
138+
Map<String, ResourceAssignment> baselineCopy = globalBaseline.entrySet().stream().collect(
139+
Collectors.toMap(Map.Entry::getKey,
140+
entry -> new ResourceAssignment(entry.getValue().getRecord())));
136141
// write to metadata store
137-
persistAssignmentToMetadataStore(globalBaseline, _baselinePath, BASELINE_KEY);
142+
persistAssignmentToMetadataStore(baselineCopy, _baselinePath, BASELINE_KEY);
138143
// write to memory
139144
getBaseline().clear();
140-
getBaseline().putAll(globalBaseline);
145+
getBaseline().putAll(baselineCopy);
141146
}
142147

143148
/**
@@ -146,11 +151,15 @@ public synchronized void persistBaseline(Map<String, ResourceAssignment> globalB
146151
* @param bestPossibleAssignment
147152
*/
148153
public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
154+
// Create defensive copy so that the in-memory assignment is not modified after it is persisted
155+
Map<String, ResourceAssignment> bestPossibleAssignmentCopy = bestPossibleAssignment.entrySet().stream().collect(
156+
Collectors.toMap(Map.Entry::getKey,
157+
entry -> new ResourceAssignment(entry.getValue().getRecord())));
149158
// write to metadata store
150-
persistAssignmentToMetadataStore(bestPossibleAssignment, _bestPossiblePath, BEST_POSSIBLE_KEY);
159+
persistAssignmentToMetadataStore(bestPossibleAssignmentCopy, _bestPossiblePath, BEST_POSSIBLE_KEY);
151160
// write to memory
152161
getBestPossibleAssignment().clear();
153-
getBestPossibleAssignment().putAll(bestPossibleAssignment);
162+
getBestPossibleAssignment().putAll(bestPossibleAssignmentCopy);
154163
_bestPossibleVersion++;
155164
_lastPersistedBestPossibleVersion = _bestPossibleVersion;
156165
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package org.apache.helix.integration;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collections;
5+
import java.util.List;
6+
import org.apache.helix.ConfigAccessor;
7+
import org.apache.helix.TestHelper;
8+
import org.apache.helix.common.ZkTestBase;
9+
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
10+
import org.apache.helix.integration.manager.ClusterControllerManager;
11+
import org.apache.helix.integration.manager.MockParticipantManager;
12+
import org.apache.helix.model.ClusterConfig;
13+
import org.apache.helix.model.IdealState;
14+
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
15+
import org.testng.Assert;
16+
import org.testng.annotations.BeforeClass;
17+
import org.testng.annotations.Test;
18+
19+
20+
public class TestEndlessBestPossibleNodes extends ZkTestBase {
21+
22+
public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
23+
public static int PARTICIPANT_COUNT = 12;
24+
public static List<MockParticipantManager> _participants = new ArrayList<>();
25+
public static ClusterControllerManager _controller;
26+
public static ConfigAccessor _configAccessor;
27+
28+
@BeforeClass
29+
public void beforeClass() {
30+
System.out.println("Start test " + TestHelper.getTestClassName());
31+
_gSetupTool.addCluster(CLUSTER_NAME, true);
32+
for (int i = 0; i < PARTICIPANT_COUNT; i++) {
33+
addParticipant("localhost_" + i);
34+
}
35+
36+
String controllerName = CONTROLLER_PREFIX + "_0";
37+
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
38+
_controller.syncStart();
39+
40+
_configAccessor = new ConfigAccessor(_gZkClient);
41+
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
42+
clusterConfig.setInstanceCapacityKeys(Collections.singletonList("partcount"));
43+
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap("partcount", 10000));
44+
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap("partcount", 1));
45+
clusterConfig.setDelayRebalaceEnabled(true);
46+
clusterConfig.setRebalanceDelayTime(57600000);
47+
clusterConfig.setPersistBestPossibleAssignment(true);
48+
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
49+
}
50+
51+
// This test was constructed to capture the bug described in issue 2971
52+
// https://github.com/apache/helix/issues/2971
53+
@Test
54+
public void testEndlessBestPossibleNodes() throws Exception {
55+
int numPartition = 10;
56+
BestPossibleExternalViewVerifier verifier =
57+
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
58+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
59+
60+
// Create 1 WAGED Resource
61+
String firstDB = "InPlaceMigrationTestDB3";
62+
_gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
63+
IdealState.RebalanceMode.FULL_AUTO.name(), null);
64+
IdealState idealStateOne =
65+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
66+
idealStateOne.setMinActiveReplicas(2);
67+
idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
68+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
69+
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
70+
Assert.assertTrue(verifier.verifyByPolling());
71+
72+
// Disable instances so delay rebalance overwrite is required due to min active
73+
for (int i = 0; i < PARTICIPANT_COUNT/2; i++) {
74+
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participants.get(i).getInstanceName(), false);
75+
}
76+
77+
// Add new instance to cause partial rebalance to calculate a new best possible
78+
addParticipant("newInstance_0");
79+
80+
// Sleep to let pipeline run and ZK writes occur
81+
Thread.sleep(5000);
82+
83+
// There should only be 2 best possibles created (children will be 0, 1, LAST_WRITE, and LAST_SUCCESSFUL_WRITE)
84+
int childCount = _gZkClient.getChildren("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/BEST_POSSIBLE").size();
85+
Assert.assertTrue(childCount > 0);
86+
Assert.assertTrue(childCount < 5, "Child count was " + childCount);
87+
}
88+
89+
public MockParticipantManager addParticipant(String instanceName) {
90+
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
91+
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
92+
participant.syncStart();
93+
_participants.add(participant);
94+
return participant;
95+
}
96+
}

0 commit comments

Comments
 (0)