Skip to content

Commit c3ff711

Browse files
JeremyDahlgrenandreidan
authored andcommitted
Accumulate compute() calls and iterations between convergences in DesiredBalanceComputer (elastic#126008)
Add tracking of the number of compute() calls and total iterations between convergences in the DesiredBalanceComputer, along with the time since the last convergence. These are included in the log message when the computer doesn't converge. Closes elastic#100850.
1 parent 49fab57 commit c3ff711

File tree

3 files changed

+255
-22
lines changed

3 files changed

+255
-22
lines changed

docs/changelog/126008.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126008
2+
summary: Accumulate compute() calls and iterations between convergences
3+
area: Allocation
4+
type: enhancement
5+
issues:
6+
- 100850

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,20 @@ public class DesiredBalanceComputer {
7272

7373
private TimeValue progressLogInterval;
7474
private long maxBalanceComputationTimeDuringIndexCreationMillis;
75+
private long numComputeCallsSinceLastConverged;
76+
private long numIterationsSinceLastConverged;
77+
private long lastConvergedTimeMillis;
78+
private long lastNotConvergedLogMessageTimeMillis;
79+
private Level convergenceLogMsgLevel;
7580

7681
public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) {
7782
this.delegateAllocator = delegateAllocator;
7883
this.timeProvider = timeProvider;
84+
this.numComputeCallsSinceLastConverged = 0;
85+
this.numIterationsSinceLastConverged = 0;
86+
this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis();
87+
this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis;
88+
this.convergenceLogMsgLevel = Level.DEBUG;
7989
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
8090
clusterSettings.initializeAndWatch(
8191
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
@@ -89,6 +99,7 @@ public DesiredBalance compute(
8999
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
90100
Predicate<DesiredBalanceInput> isFresh
91101
) {
102+
numComputeCallsSinceLastConverged += 1;
92103
if (logger.isTraceEnabled()) {
93104
logger.trace(
94105
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
@@ -276,7 +287,7 @@ public DesiredBalance compute(
276287
final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
277288
final long timeWarningInterval = progressLogInterval.millis();
278289
final long computationStartedTime = timeProvider.relativeTimeInMillis();
279-
long nextReportTime = computationStartedTime + timeWarningInterval;
290+
long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval;
280291

281292
int i = 0;
282293
boolean hasChanges = false;
@@ -321,22 +332,46 @@ public DesiredBalance compute(
321332
}
322333
}
323334

324-
i++;
335+
i += 1;
336+
numIterationsSinceLastConverged += 1;
325337
final int iterations = i;
326338
final long currentTime = timeProvider.relativeTimeInMillis();
327339
final boolean reportByTime = nextReportTime <= currentTime;
328-
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
340+
final boolean reportByIterationCount = numIterationsSinceLastConverged % iterationCountReportInterval == 0;
329341
if (reportByTime || reportByIterationCount) {
330342
nextReportTime = currentTime + timeWarningInterval;
331343
}
332344

333345
if (hasChanges == false && hasEnoughIterations(i)) {
334-
logger.debug(
335-
"Desired balance computation for [{}] converged after [{}] and [{}] iterations",
336-
desiredBalanceInput.index(),
337-
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
338-
i
339-
);
346+
if (numComputeCallsSinceLastConverged > 1) {
347+
logger.log(
348+
convergenceLogMsgLevel,
349+
() -> Strings.format(
350+
"""
351+
Desired balance computation for [%d] converged after [%s] and [%d] iterations, \
352+
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
353+
desiredBalanceInput.index(),
354+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
355+
numIterationsSinceLastConverged,
356+
numComputeCallsSinceLastConverged,
357+
iterations,
358+
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
359+
)
360+
);
361+
} else {
362+
logger.log(
363+
convergenceLogMsgLevel,
364+
() -> Strings.format(
365+
"Desired balance computation for [%d] converged after [%s] and [%d] iterations",
366+
desiredBalanceInput.index(),
367+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
368+
numIterationsSinceLastConverged
369+
)
370+
);
371+
}
372+
numComputeCallsSinceLastConverged = 0;
373+
numIterationsSinceLastConverged = 0;
374+
lastConvergedTimeMillis = currentTime;
340375
break;
341376
}
342377
if (isFresh.test(desiredBalanceInput) == false) {
@@ -368,15 +403,37 @@ public DesiredBalance compute(
368403
break;
369404
}
370405

371-
logger.log(
372-
reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE,
373-
() -> Strings.format(
374-
"Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
375-
desiredBalanceInput.index(),
376-
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
377-
iterations
378-
)
379-
);
406+
final var logLevel = reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE;
407+
if (numComputeCallsSinceLastConverged > 1) {
408+
logger.log(
409+
logLevel,
410+
() -> Strings.format(
411+
"""
412+
Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \
413+
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
414+
desiredBalanceInput.index(),
415+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
416+
numIterationsSinceLastConverged,
417+
numComputeCallsSinceLastConverged,
418+
iterations,
419+
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
420+
)
421+
);
422+
} else {
423+
logger.log(
424+
logLevel,
425+
() -> Strings.format(
426+
"Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
427+
desiredBalanceInput.index(),
428+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
429+
numIterationsSinceLastConverged
430+
)
431+
);
432+
}
433+
434+
if (reportByIterationCount || reportByTime) {
435+
lastNotConvergedLogMessageTimeMillis = currentTime;
436+
}
380437
}
381438
iterations.inc(i);
382439

@@ -471,4 +528,21 @@ private static int computeIterationCountReportInterval(RoutingAllocation allocat
471528
}
472529
return iterations;
473530
}
531+
532+
// Package-level getters for testing.
533+
long getNumComputeCallsSinceLastConverged() {
534+
return numComputeCallsSinceLastConverged;
535+
}
536+
537+
long getNumIterationsSinceLastConverged() {
538+
return numIterationsSinceLastConverged;
539+
}
540+
541+
long getLastConvergedTimeMillis() {
542+
return lastConvergedTimeMillis;
543+
}
544+
545+
void setConvergenceLogMsgLevel(Level level) {
546+
convergenceLogMsgLevel = level;
547+
}
474548
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

Lines changed: 157 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@
4343
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
4444
import org.elasticsearch.common.Randomness;
4545
import org.elasticsearch.common.UUIDs;
46+
import org.elasticsearch.common.settings.ClusterSettings;
4647
import org.elasticsearch.common.settings.Settings;
4748
import org.elasticsearch.common.time.TimeProvider;
4849
import org.elasticsearch.common.time.TimeProviderUtils;
4950
import org.elasticsearch.common.unit.ByteSizeValue;
5051
import org.elasticsearch.common.util.Maps;
52+
import org.elasticsearch.core.Strings;
53+
import org.elasticsearch.core.TimeValue;
5154
import org.elasticsearch.index.IndexVersion;
5255
import org.elasticsearch.index.shard.ShardId;
5356
import org.elasticsearch.repositories.IndexId;
@@ -69,6 +72,10 @@
6972
import java.util.concurrent.atomic.AtomicBoolean;
7073
import java.util.concurrent.atomic.AtomicInteger;
7174
import java.util.concurrent.atomic.AtomicLong;
75+
import java.util.concurrent.atomic.AtomicReference;
76+
import java.util.function.Consumer;
77+
import java.util.function.Function;
78+
import java.util.function.Predicate;
7279

7380
import static java.util.stream.Collectors.toMap;
7481
import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting;
@@ -1231,7 +1238,7 @@ public void testShouldLogComputationIteration() {
12311238
"Should report long computation based on time",
12321239
DesiredBalanceComputer.class.getCanonicalName(),
12331240
Level.INFO,
1234-
"Desired balance computation for [*] is still not converged after [1m] and [60] iterations"
1241+
"Desired balance computation for [*] is still not converged after [1m] and [59] iterations"
12351242
)
12361243
);
12371244
}
@@ -1273,28 +1280,174 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
12731280
}
12741281
});
12751282

1276-
assertThatLogger(() -> {
1283+
assertLoggerExpectationsFor(() -> {
12771284
var iteration = new AtomicInteger(0);
12781285
desiredBalanceComputer.compute(
12791286
DesiredBalance.BECOME_MASTER_INITIAL,
12801287
createInput(createInitialClusterState(3)),
12811288
queue(),
12821289
input -> iteration.incrementAndGet() < iterations
12831290
);
1284-
}, DesiredBalanceComputer.class, expectation);
1291+
}, expectation);
1292+
}
1293+
1294+
private void assertLoggerExpectationsFor(Runnable action, MockLog.LoggingExpectation... expectations) {
1295+
assertThatLogger(action, DesiredBalanceComputer.class, expectations);
1296+
}
1297+
1298+
public void testLoggingOfComputeCallsAndIterationsSinceConvergence() {
1299+
final var clusterSettings = new ClusterSettings(
1300+
Settings.builder().put(DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(5L)).build(),
1301+
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
1302+
);
1303+
final var timeInMillis = new AtomicLong(-1L);
1304+
final var iterationCounter = new AtomicInteger(0);
1305+
final var requiredIterations = new AtomicInteger(2);
1306+
final var desiredBalance = new AtomicReference<DesiredBalance>(DesiredBalance.BECOME_MASTER_INITIAL);
1307+
final var indexSequence = new AtomicLong(0);
1308+
final var clusterState = createInitialClusterState(1, 1, 0);
1309+
1310+
final var computer = new DesiredBalanceComputer(
1311+
clusterSettings,
1312+
TimeProviderUtils.create(timeInMillis::incrementAndGet),
1313+
new BalancedShardsAllocator(Settings.EMPTY)
1314+
) {
1315+
@Override
1316+
boolean hasEnoughIterations(int currentIteration) {
1317+
iterationCounter.incrementAndGet();
1318+
return currentIteration >= requiredIterations.get();
1319+
}
1320+
};
1321+
computer.setConvergenceLogMsgLevel(Level.INFO);
1322+
1323+
record ExpectedLastConvergenceInfo(int numComputeCalls, int numTotalIterations, long timestampMillis) {}
1324+
1325+
Consumer<ExpectedLastConvergenceInfo> assertLastConvergenceInfo = data -> {
1326+
assertEquals(data.numComputeCalls(), computer.getNumComputeCallsSinceLastConverged());
1327+
assertEquals(data.numTotalIterations(), computer.getNumIterationsSinceLastConverged());
1328+
assertEquals(data.timestampMillis(), computer.getLastConvergedTimeMillis());
1329+
};
1330+
1331+
final Function<Predicate<DesiredBalanceInput>, Runnable> getComputeRunnableForIsFreshPredicate = isFreshFunc -> {
1332+
final var input = new DesiredBalanceInput(indexSequence.incrementAndGet(), routingAllocationOf(clusterState), List.of());
1333+
return () -> desiredBalance.set(computer.compute(desiredBalance.get(), input, queue(), isFreshFunc));
1334+
};
1335+
1336+
record LogExpectationData(
1337+
boolean isConverged,
1338+
String timeSinceConverged,
1339+
int totalIterations,
1340+
int totalComputeCalls,
1341+
int currentIterations,
1342+
String currentDuration
1343+
) {
1344+
LogExpectationData(boolean isConverged, String timeSinceConverged, int totalIterations) {
1345+
this(isConverged, timeSinceConverged, totalIterations, 0, 0, "");
1346+
}
1347+
}
1348+
1349+
Function<LogExpectationData, MockLog.SeenEventExpectation> getLogExpectation = data -> {
1350+
final var singleComputeCallMsg = "Desired balance computation for [%d] "
1351+
+ (data.isConverged ? "" : "is still not ")
1352+
+ "converged after [%s] and [%d] iterations";
1353+
return new MockLog.SeenEventExpectation(
1354+
"expected a " + (data.isConverged ? "converged" : "not converged") + " log message",
1355+
DesiredBalanceComputer.class.getCanonicalName(),
1356+
Level.INFO,
1357+
(data.totalComputeCalls > 1
1358+
? Strings.format(
1359+
singleComputeCallMsg + ", resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago",
1360+
indexSequence.get(),
1361+
data.timeSinceConverged,
1362+
data.totalIterations,
1363+
data.totalComputeCalls,
1364+
data.currentIterations,
1365+
data.currentDuration
1366+
)
1367+
: Strings.format(singleComputeCallMsg, indexSequence.get(), data.timeSinceConverged, data.totalIterations))
1368+
);
1369+
};
1370+
1371+
final Consumer<DesiredBalance.ComputationFinishReason> assertFinishReason = reason -> {
1372+
assertEquals(reason, desiredBalance.get().finishReason());
1373+
if (DesiredBalance.ComputationFinishReason.CONVERGED == reason) {
1374+
// Verify the number of compute() calls and total iterations have been reset after converging.
1375+
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get()));
1376+
}
1377+
};
1378+
1379+
// No compute() calls yet, last convergence timestamp is the startup time.
1380+
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get()));
1381+
1382+
// Converges right away, verify the debug level convergence message.
1383+
assertLoggerExpectationsFor(
1384+
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
1385+
getLogExpectation.apply(new LogExpectationData(true, "3ms", 2))
1386+
);
1387+
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);
1388+
final var lastConvergenceTimestampMillis = computer.getLastConvergedTimeMillis();
1389+
1390+
// Test a series of compute() calls that don't converge.
1391+
iterationCounter.set(0);
1392+
requiredIterations.set(10);
1393+
// This INFO is triggered from the interval since last convergence timestamp.
1394+
assertLoggerExpectationsFor(
1395+
getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 6),
1396+
getLogExpectation.apply(new LogExpectationData(false, "5ms", 4))
1397+
);
1398+
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT);
1399+
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(1, 6, lastConvergenceTimestampMillis));
1400+
1401+
iterationCounter.set(0);
1402+
// The next INFO is triggered from the interval since last INFO message logged, and then another after the interval period.
1403+
assertLoggerExpectationsFor(
1404+
getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 8),
1405+
getLogExpectation.apply(new LogExpectationData(false, "10ms", 8, 2, 2, "2ms")),
1406+
getLogExpectation.apply(new LogExpectationData(false, "15ms", 13, 2, 7, "7ms"))
1407+
);
1408+
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT);
1409+
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(2, 14, lastConvergenceTimestampMillis));
1410+
1411+
assertLoggerExpectationsFor(
1412+
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
1413+
getLogExpectation.apply(new LogExpectationData(false, "20ms", 17, 3, 3, "3ms")),
1414+
getLogExpectation.apply(new LogExpectationData(false, "25ms", 22, 3, 8, "8ms")),
1415+
getLogExpectation.apply(new LogExpectationData(true, "27ms", 24, 3, 10, "10ms"))
1416+
);
1417+
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);
1418+
1419+
// First INFO is triggered from interval since last converged, second is triggered from the inverval since the last INFO log.
1420+
assertLoggerExpectationsFor(
1421+
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
1422+
getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)),
1423+
getLogExpectation.apply(new LogExpectationData(false, "10ms", 9)),
1424+
getLogExpectation.apply(new LogExpectationData(true, "11ms", 10))
1425+
);
1426+
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);
1427+
1428+
// Verify the final assignment mappings after converging.
1429+
final var index = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID).index(TEST_INDEX).getIndex();
1430+
final var expectedAssignmentsMap = Map.of(new ShardId(index, 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0));
1431+
assertDesiredAssignments(desiredBalance.get(), expectedAssignmentsMap);
12851432
}
12861433

12871434
private static ShardId findShardId(ClusterState clusterState, String name) {
12881435
return clusterState.getRoutingTable().index(name).shard(0).shardId();
12891436
}
12901437

12911438
static ClusterState createInitialClusterState(int dataNodesCount) {
1439+
return createInitialClusterState(dataNodesCount, 2, 1);
1440+
}
1441+
1442+
static ClusterState createInitialClusterState(int dataNodesCount, int numShards, int numReplicas) {
12921443
var discoveryNodes = DiscoveryNodes.builder().add(newNode("master", Set.of(DiscoveryNodeRole.MASTER_ROLE)));
12931444
for (int i = 0; i < dataNodesCount; i++) {
12941445
discoveryNodes.add(newNode("node-" + i, Set.of(DiscoveryNodeRole.DATA_ROLE)));
12951446
}
12961447

1297-
var indexMetadata = IndexMetadata.builder(TEST_INDEX).settings(indexSettings(IndexVersion.current(), 2, 1)).build();
1448+
var indexMetadata = IndexMetadata.builder(TEST_INDEX)
1449+
.settings(indexSettings(IndexVersion.current(), numShards, numReplicas))
1450+
.build();
12981451

12991452
return ClusterState.builder(ClusterName.DEFAULT)
13001453
.nodes(discoveryNodes.masterNodeId("master").localNodeId("master"))

0 commit comments

Comments
 (0)