
Commit 51bad1e

[remove datanode] Accelerate GCR load balancing implementation (#15535)
1 parent c28e50f commit 51bad1e

File tree

2 files changed (+148, -134 lines)


iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java

Lines changed: 122 additions & 131 deletions
@@ -29,7 +29,6 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;

@@ -42,7 +41,7 @@
 public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator {
 
   private static final Random RANDOM = new Random();
-  private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100;
+  private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 10;
 
   private int replicationFactor;
   // Sorted available DataNodeIds

@@ -57,14 +56,31 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
   private Map<String, int[]> initialDbLoad;
 
   // First Key: the sum of Regions at the DataNodes in the allocation result is minimal
-  int optimalRegionSum;
+  private int optimalRegionSum;
   // Second Key: the sum of Regions at the DataNodes within the same Database
   // in the allocation result is minimal
-  int optimalDatabaseRegionSum;
+  private int optimalDatabaseRegionSum;
   // Third Key: the sum of overlapped 2-Region combination Regions with
   // other allocated RegionGroups is minimal
-  int optimalCombinationSum;
-  List<int[]> optimalReplicaSets;
+  private int optimalCombinationSum;
+  private List<int[]> optimalReplicaSets;
+
+  // Pre-calculation, scatterDelta[i][j] means the scatter increment between region i and the old
+  // replica set when region i is placed on node j
+  private int[][] scatterDelta;
+  // For each region, the allowed candidate destination node IDs.
+  private Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap;
+  // A list of regions that need to be migrated.
+  private List<TConsensusGroupId> dfsRegionKeys;
+  // A mapping from each region identifier to its corresponding database name.
+  private Map<TConsensusGroupId, String> regionDatabaseMap;
+  // Buffer holding best assignment arrays.
+  private int[] bestAssignment;
+  // An int array holding the best metrics found so far: [maxGlobalLoad, maxDatabaseLoad,
+  // scatterValue].
+  private int[] bestMetrics;
+  // dfsRemoveNodeReplica batch size
+  private static final int BATCH_SIZE = 12;
 
   private static class DataNodeEntry {
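
The scatterDelta table declared above carries the main speed-up: for batch region i and candidate node j it caches the scatter increment, i.e. the sum of combinationCounter entries between j and the nodes still holding replicas of that region. With it, the DFS can maintain the scatter metric with O(1) work per step instead of rescanning every replica set once a complete assignment is reached. A minimal standalone sketch of the precomputation, using simplified types (plain int region and node IDs instead of TConsensusGroupId and TDataNodeLocation; all names here are illustrative, not the allocator's):

import java.util.List;
import java.util.Map;

public class ScatterDeltaSketch {

  // combination[a][b] counts the already-allocated RegionGroups that place replicas
  // on both node a and node b (mirrors combinationCounter in the allocator).
  static int[][] precomputeScatterDelta(
      List<Integer> batchRegions,
      Map<Integer, List<Integer>> candidatesOf,
      Map<Integer, List<Integer>> remainNodesOf,
      int[][] combination,
      int nodeCount) {
    int[][] delta = new int[batchRegions.size()][nodeCount];
    for (int r = 0; r < batchRegions.size(); r++) {
      int region = batchRegions.get(r);
      for (int node : candidatesOf.get(region)) {
        int inc = 0;
        // Placing the region on `node` adds one co-location pair per remaining replica holder.
        for (int peer : remainNodesOf.get(region)) {
          inc += combination[node][peer];
        }
        delta[r][node] = inc;
      }
    }
    return delta;
  }
}

During the search, descending one level then costs a single addition, with the matching subtraction on backtrack, as the new currentScatter += / -= scatterDelta[...] lines in dfsRemoveNodeReplica further down show.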

@@ -146,15 +162,14 @@ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
       // availableDataNodeMap
       // excluding those already in the remain replica set.
       List<TConsensusGroupId> regionKeys = new ArrayList<>(remainReplicasMap.keySet());
-      Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap = new HashMap<>();
+      allowedCandidatesMap = new HashMap<>();
+      this.regionDatabaseMap = regionDatabaseMap;
       for (TConsensusGroupId regionId : regionKeys) {
         TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId);
-        Set<Integer> notAllowedNodes = new HashSet<>();
-
-        // Exclude nodes already present in the remain replica set
-        for (TDataNodeLocation location : remainReplicaSet.getDataNodeLocations()) {
-          notAllowedNodes.add(location.getDataNodeId());
-        }
+        Set<Integer> notAllowedNodes =
+            remainReplicaSet.getDataNodeLocations().stream()
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toSet());
 
         // Allowed candidates are the nodes not in the exclusion set
         List<Integer> candidates =

@@ -163,12 +178,12 @@ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
             .sorted(
                 (a, b) -> {
                   int cmp = Integer.compare(regionCounter[a], regionCounter[b]);
-                  if (cmp == 0) {
-                    cmp = Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]);
-                  }
-                  return cmp;
+                  return (cmp != 0)
+                      ? cmp
+                      : Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]);
                 })
             .collect(Collectors.toList());
+        Collections.shuffle(candidates);
 
         // Sort candidates in ascending order of current global load (regionCounter)
         allowedCandidatesMap.put(regionId, candidates);
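
The rewritten comparator is a plain two-key ascending order: global region count first, per-database count as the tie-break. The same ordering can be expressed with the Comparator combinators from java.util; a self-contained sketch on made-up counter arrays (not the allocator's fields):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CandidateOrderSketch {
  public static void main(String[] args) {
    int[] regionCounter = {3, 1, 3, 0};          // global load per node
    int[] databaseRegionCounter = {1, 2, 0, 5};  // per-database load per node

    List<Integer> candidates = new ArrayList<>(List.of(0, 1, 2, 3));
    // Ascending global load, then ascending per-database load on ties.
    candidates.sort(
        Comparator.<Integer>comparingInt(n -> regionCounter[n])
            .thenComparingInt(n -> databaseRegionCounter[n]));

    System.out.println(candidates); // [3, 1, 2, 0]
  }
}

Note that the added Collections.shuffle discards the order the sort just established; with the reworked DFS keeping a single best assignment per batch and replacing it only on strictly better metrics, randomizing the candidate visit order is presumably what restores the randomness the deleted optimalAssignments buffer used to provide.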

@@ -178,44 +193,63 @@ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
       // first)
       regionKeys.sort(Comparator.comparingInt(id -> allowedCandidatesMap.get(id).size()));
 
-      int n = regionKeys.size();
-      // Each element holds the candidate nodeId chosen for the region at that index
-      int[] currentAssignment = new int[n];
-      // additionalLoad holds the number of extra regions assigned to each node in this migration
-      // solution.
-      int[] additionalLoad = new int[regionCounter.length];
+      // 3. Batch DFS
+      Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
 
-      // 3. Create a buffer for candidate solutions
-      List<int[]> optimalAssignments = new ArrayList<>();
-      // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue].
-      // Initialize to high values.
-      int[] bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE};
+      for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) {
+        dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE, regionKeys.size()));
+        int batchSize = dfsRegionKeys.size();
+
+        // Initialize buffer
+        bestAssignment = new int[batchSize];
+        // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue].
+        bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE};
+        // currentAssignment holds the candidate nodeId chosen for the region at that index
+        int[] currentAssignment = new int[batchSize];
+        // additionalLoad holds the number of extra regions assigned to each node in this migration
+        // solution.
+        int[] additionalLoad = new int[regionCounter.length];
+
+        scatterDelta = new int[batchSize][regionCounter.length];
+        for (int r = 0; r < batchSize; r++) {
+          TConsensusGroupId regionId = dfsRegionKeys.get(r);
+          for (int nodeId : allowedCandidatesMap.get(regionId)) {
+            int inc = 0;
+            for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) {
+              inc += combinationCounter[nodeId][loc.getDataNodeId()];
+            }
+            scatterDelta[r][nodeId] = inc;
+          }
+        }
 
-      dfsRemoveNodeReplica(
-          0,
-          regionKeys,
-          allowedCandidatesMap,
-          currentAssignment,
-          additionalLoad,
-          optimalAssignments,
-          bestMetrics,
-          remainReplicasMap,
-          regionDatabaseMap);
-
-      // 4. Randomly select one solution from the candidate buffer
-      if (optimalAssignments.isEmpty()) {
-        // This should not happen if there is at least one valid assignment
-        return Collections.emptyMap();
-      }
-      Collections.shuffle(optimalAssignments);
-      int[] bestAssignment = optimalAssignments.get(0);
+        int currentMaxGlobalLoad = 0;
+        for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+          currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, regionCounter[nodeId]);
+        }
 
-      // 5. Build and return the result mapping: region -> chosen destination TDataNodeConfiguration
-      Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
-      for (int i = 0; i < n; i++) {
-        TConsensusGroupId regionId = regionKeys.get(i);
-        int chosenNodeId = bestAssignment[i];
-        result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+        dfsRemoveNodeReplica(0, currentMaxGlobalLoad, 0, currentAssignment, additionalLoad);
+
+        if (bestMetrics[0] == Integer.MAX_VALUE) {
+          // This should not happen if there is at least one valid assignment
+          return Collections.emptyMap();
+        }
+
+        for (int i = 0; i < batchSize; i++) {
+          TConsensusGroupId regionId = dfsRegionKeys.get(i);
+          int chosenNodeId = bestAssignment[i];
+          result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+
+          regionCounter[chosenNodeId]++;
+          String db = regionDatabaseMap.get(regionId);
+          if (db != null) {
+            int[] dbLoad = initialDbLoad.computeIfAbsent(db, k -> new int[regionCounter.length]);
+            dbLoad[chosenNodeId]++;
+          }
+          for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) {
+            combinationCounter[chosenNodeId][loc.getDataNodeId()]++;
+            combinationCounter[loc.getDataNodeId()][chosenNodeId]++;
+          }
+        }
       }
       return result;
     } finally {
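
Batching is what bounds the search space. An exhaustive DFS over n regions with up to k candidates each visits O(k^n) assignments; cutting the region list into BATCH_SIZE = 12 chunks reduces that to O(ceil(n / B) * k^B), with each batch optimized against counters that already include the choices of earlier batches. A minimal sketch of the driver shape (solveBatch is a hypothetical stand-in for dfsRemoveNodeReplica plus its bookkeeping, not a method in the patch):

import java.util.List;

public class BatchDriverSketch {
  static final int BATCH_SIZE = 12;

  // Placeholder for the per-batch DFS; in the patch this is dfsRemoveNodeReplica
  // plus the loop that folds the chosen batch back into the counters.
  static void solveBatch(List<String> batch) {
    System.out.println("solving batch " + batch);
  }

  static void solveInBatches(List<String> regionKeys) {
    for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) {
      int end = Math.min(start + BATCH_SIZE, regionKeys.size());
      // subList is a view over the backing list; the DFS only reads it, so no copy is needed.
      solveBatch(regionKeys.subList(start, end));
    }
  }

  public static void main(String[] args) {
    solveInBatches(List.of("r1", "r2", "r3"));
  }
}

In the patch itself, each iteration additionally folds the chosen batch back into regionCounter, initialDbLoad, and combinationCounter, so later batches see the load added by earlier ones; the trade for speed is that the result is per-batch optimal rather than globally optimal.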

@@ -244,99 +278,64 @@ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
    * <p>DFS search is pruned if the optimalAssignments buffer reaches CAPACITY.
    *
    * @param index Current DFS level, corresponding to regionKeys.get(index)
-   * @param regionKeys A list of regions that need to be migrated.
-   * @param allowedCandidatesMap For each region, the allowed candidate destination node IDs.
+   * @param currentMaxGlobalLoad The maximum global load across all data nodes.
+   * @param currentScatter The scatter value for the complete assignment.
    * @param currentAssignment Current partial assignment; its length equals the number of regions.
    * @param additionalLoad Extra load currently assigned to each node.
-   * @param optimalAssignments Buffer holding candidate assignment arrays.
-   * @param bestMetrics An int array holding the best metrics found so far: [maxGlobalLoad,
-   *     maxDatabaseLoad, scatterValue].
-   * @param remainReplicasMap Mapping from region to its current remain replica set.
    */
   private void dfsRemoveNodeReplica(
       int index,
-      List<TConsensusGroupId> regionKeys,
-      Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap,
+      int currentMaxGlobalLoad,
+      int currentScatter,
       int[] currentAssignment,
-      int[] additionalLoad,
-      List<int[]> optimalAssignments,
-      int[] bestMetrics,
-      Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap,
-      Map<TConsensusGroupId, String> regionDatabaseMap) {
-    int n = regionKeys.size();
-    if (index == n) {
-      // A complete assignment has been generated.
-      // Compute metrics for this complete migration assignment.
-
-      // Compute the scatter value for the complete assignment.
-      int currentScatter = 0;
-      // For each region, calculate the scatter based on the combinationCounter among all nodes
-      // in the full replica set (which includes the nodes in the remain replica and the new
-      // candidate).
-      for (int r = 0; r < n; r++) {
-        TConsensusGroupId regionId = regionKeys.get(r);
-        for (TDataNodeLocation location : remainReplicasMap.get(regionId).getDataNodeLocations()) {
-          int nodeA = currentAssignment[r];
-          int nodeB = location.getDataNodeId();
-          currentScatter += combinationCounter[nodeA][nodeB];
-        }
+      int[] additionalLoad) {
+    // Compute the maximum global load and maximum database load among all nodes that received
+    // additional load.
+    int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter, currentAssignment);
+    // Lexicographically compare currentMetrics with bestMetrics.
+    // If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate
+    // buffer.
+    boolean isBetter = false;
+    boolean isEqual = true;
+    for (int i = 0; i < 3; i++) {
+      if (currentMetrics[i] < bestMetrics[i]) {
+        isBetter = true;
+        isEqual = false;
+        break;
+      } else if (currentMetrics[i] > bestMetrics[i]) {
+        isEqual = false;
+        break;
       }
+    }
+    if (!isBetter && !isEqual) {
+      return;
+    }
 
-      // Compute the maximum global load and maximum database load among all nodes that received
-      // additional load.
-      int[] currentMetrics =
-          getCurrentMetrics(
-              additionalLoad, currentScatter, regionKeys, regionDatabaseMap, currentAssignment);
-
-      // Lexicographically compare currentMetrics with bestMetrics.
-      // If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate
-      // buffer.
-      boolean isBetter = false;
-      boolean isEqual = true;
-      for (int i = 0; i < 3; i++) {
-        if (currentMetrics[i] < bestMetrics[i]) {
-          isBetter = true;
-          isEqual = false;
-          break;
-        } else if (currentMetrics[i] > bestMetrics[i]) {
-          isEqual = false;
-          break;
-        }
-      }
+    if (index == dfsRegionKeys.size()) {
       if (isBetter) {
         bestMetrics[0] = currentMetrics[0];
         bestMetrics[1] = currentMetrics[1];
         bestMetrics[2] = currentMetrics[2];
-        optimalAssignments.clear();
-        optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
-      } else if (isEqual) {
-        optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
-        // Prune search if we already have enough candidate solutions
-        if (optimalAssignments.size() >= GCR_MAX_OPTIMAL_PLAN_NUM) {
-          return;
-        }
+        System.arraycopy(currentAssignment, 0, bestAssignment, 0, dfsRegionKeys.size());
       }
       return;
     }
 
     // Process the region at the current index.
-    TConsensusGroupId regionId = regionKeys.get(index);
+    TConsensusGroupId regionId = dfsRegionKeys.get(index);
     List<Integer> candidates = allowedCandidatesMap.get(regionId);
     for (Integer candidate : candidates) {
       currentAssignment[index] = candidate;
+      currentScatter += scatterDelta[index][currentAssignment[index]];
       additionalLoad[candidate]++;
+      int nextMaxGlobalLoad =
+          Math.max(currentMaxGlobalLoad, regionCounter[candidate] + additionalLoad[candidate]);
+
       dfsRemoveNodeReplica(
-          index + 1,
-          regionKeys,
-          allowedCandidatesMap,
-          currentAssignment,
-          additionalLoad,
-          optimalAssignments,
-          bestMetrics,
-          remainReplicasMap,
-          regionDatabaseMap);
+          index + 1, nextMaxGlobalLoad, currentScatter, currentAssignment, additionalLoad);
       // Backtrack
       additionalLoad[candidate]--;
+      currentScatter -= scatterDelta[index][currentAssignment[index]];
     }
   }
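
Two changes matter in the DFS body. First, getCurrentMetrics now runs at every search node rather than only at complete assignments; because the max-global-load and scatter keys can only grow as more regions are assigned (scatterDelta entries are non-negative), a partial assignment that already compares worse than bestMetrics can be cut off immediately: the `if (!isBetter && !isEqual) return;` branch is a branch-and-bound style prune. Second, the hand-rolled isBetter/isEqual loop is a lexicographic three-way comparison; on Java 9+ the same test can be written with Arrays.compare, as this small sketch with made-up metric values illustrates:

import java.util.Arrays;

public class LexCompareSketch {
  public static void main(String[] args) {
    int[] best = {4, 10, 7};    // [maxGlobalLoad, dbLoadSquaredSum, scatterValue]
    int[] current = {4, 9, 50}; // a smaller second key wins despite the larger scatter

    // Arrays.compare performs the same lexicographic comparison as the
    // isBetter/isEqual loop in dfsRemoveNodeReplica.
    int cmp = Arrays.compare(current, best);
    if (cmp < 0) {
      System.out.println("better: adopt current as bestMetrics"); // printed here
    } else if (cmp > 0) {
      System.out.println("worse: prune this branch");
    } else {
      System.out.println("equal: keep searching");
    }
  }
}

Arrays.compare is shown only for comparison; the patch keeps the explicit loop.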

@@ -411,20 +410,12 @@ private int computeDatabaseLoadSquaredSum(
    * @param additionalLoad an array representing the additional load assigned to each node during
    *     migration.
    * @param currentScatter the current scatter value metric.
-   * @param regionKeys a list of region identifiers (TConsensusGroupId) for which migration is being
-   *     computed.
-   * @param regionDatabaseMap a mapping from each region identifier to its corresponding database
-   *     name.
    * @param currentAssignment an array where each element is the nodeId assigned for the
    *     corresponding region in {@code regionKeys}.
    * @return an integer array of size 3: [maxGlobalLoad, databaseLoadSquaredSum, scatterValue].
    */
   private int[] getCurrentMetrics(
-      int[] additionalLoad,
-      int currentScatter,
-      List<TConsensusGroupId> regionKeys,
-      Map<TConsensusGroupId, String> regionDatabaseMap,
-      int[] currentAssignment) {
+      int[] additionalLoad, int currentScatter, int[] currentAssignment) {
     int currentMaxGlobalLoad = 0;
     // Calculate the maximum global load across all data nodes.
     for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {

@@ -433,7 +424,7 @@ private int[] getCurrentMetrics(
     }
     // Compute the database load squared sum using the helper method.
     int dbLoadSquaredSum =
-        computeDatabaseLoadSquaredSum(currentAssignment, regionKeys, regionDatabaseMap);
+        computeDatabaseLoadSquaredSum(currentAssignment, dfsRegionKeys, regionDatabaseMap);
     // Build current metrics in order [maxGlobalLoad, maxDatabaseLoad, scatterValue]
     return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter};
   }
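
A note on the second key: getCurrentMetrics reports per-database balance as a squared sum rather than a maximum. For a fixed total load, a sum of squares is smallest exactly when the load is spread evenly, so minimizing it lexicographically steers assignments toward balanced databases. A tiny numeric illustration (the load vectors are invented, and computeDatabaseLoadSquaredSum's exact accounting in the allocator may differ in detail):

public class SquaredSumSketch {
  static int squaredSum(int[] loadPerNode) {
    int sum = 0;
    for (int load : loadPerNode) {
      sum += load * load;
    }
    return sum;
  }

  public static void main(String[] args) {
    // Six regions of one database spread over three nodes, two ways:
    System.out.println(squaredSum(new int[] {2, 2, 2})); // balanced -> 12
    System.out.println(squaredSum(new int[] {4, 1, 1})); // skewed   -> 18
  }
}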
