diff --git a/docs/changelog/126008.yaml b/docs/changelog/126008.yaml new file mode 100644 index 0000000000000..e905bde32e433 --- /dev/null +++ b/docs/changelog/126008.yaml @@ -0,0 +1,6 @@ +pr: 126008 +summary: Accumulate compute() calls and iterations between convergences +area: Allocation +type: enhancement +issues: + - 100850 diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java index 03630c284fa30..a0e103edf97c7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java @@ -72,10 +72,20 @@ public class DesiredBalanceComputer { private TimeValue progressLogInterval; private long maxBalanceComputationTimeDuringIndexCreationMillis; + private long numComputeCallsSinceLastConverged; + private long numIterationsSinceLastConverged; + private long lastConvergedTimeMillis; + private long lastNotConvergedLogMessageTimeMillis; + private Level convergenceLogMsgLevel; public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) { this.delegateAllocator = delegateAllocator; this.timeProvider = timeProvider; + this.numComputeCallsSinceLastConverged = 0; + this.numIterationsSinceLastConverged = 0; + this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis(); + this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis; + this.convergenceLogMsgLevel = Level.DEBUG; clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value); clusterSettings.initializeAndWatch( MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING, @@ -89,6 +99,7 @@ public DesiredBalance compute( Queue> pendingDesiredBalanceMoves, Predicate isFresh ) 
{ + numComputeCallsSinceLastConverged += 1; if (logger.isTraceEnabled()) { logger.trace( "Recomputing desired balance for [{}]: {}, {}, {}, {}", @@ -276,7 +287,7 @@ public DesiredBalance compute( final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation); final long timeWarningInterval = progressLogInterval.millis(); final long computationStartedTime = timeProvider.relativeTimeInMillis(); - long nextReportTime = computationStartedTime + timeWarningInterval; + long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval; int i = 0; boolean hasChanges = false; @@ -321,22 +332,46 @@ public DesiredBalance compute( } } - i++; + i += 1; + numIterationsSinceLastConverged += 1; final int iterations = i; final long currentTime = timeProvider.relativeTimeInMillis(); final boolean reportByTime = nextReportTime <= currentTime; - final boolean reportByIterationCount = i % iterationCountReportInterval == 0; + final boolean reportByIterationCount = numIterationsSinceLastConverged % iterationCountReportInterval == 0; if (reportByTime || reportByIterationCount) { nextReportTime = currentTime + timeWarningInterval; } if (hasChanges == false && hasEnoughIterations(i)) { - logger.debug( - "Desired balance computation for [{}] converged after [{}] and [{}] iterations", - desiredBalanceInput.index(), - TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), - i - ); + if (numComputeCallsSinceLastConverged > 1) { + logger.log( + convergenceLogMsgLevel, + () -> Strings.format( + """ + Desired balance computation for [%d] converged after [%s] and [%d] iterations, \ + resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged, + numComputeCallsSinceLastConverged, + iterations, + 
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString() + ) + ); + } else { + logger.log( + convergenceLogMsgLevel, + () -> Strings.format( + "Desired balance computation for [%d] converged after [%s] and [%d] iterations", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged + ) + ); + } + numComputeCallsSinceLastConverged = 0; + numIterationsSinceLastConverged = 0; + lastConvergedTimeMillis = currentTime; break; } if (isFresh.test(desiredBalanceInput) == false) { @@ -368,15 +403,37 @@ public DesiredBalance compute( break; } - logger.log( - reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE, - () -> Strings.format( - "Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations", - desiredBalanceInput.index(), - TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), - iterations - ) - ); + final var logLevel = reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? 
Level.DEBUG : Level.TRACE; + if (numComputeCallsSinceLastConverged > 1) { + logger.log( + logLevel, + () -> Strings.format( + """ + Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \ + resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged, + numComputeCallsSinceLastConverged, + iterations, + TimeValue.timeValueMillis(currentTime - computationStartedTime).toString() + ) + ); + } else { + logger.log( + logLevel, + () -> Strings.format( + "Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged + ) + ); + } + + if (reportByIterationCount || reportByTime) { + lastNotConvergedLogMessageTimeMillis = currentTime; + } } iterations.inc(i); @@ -471,4 +528,21 @@ private static int computeIterationCountReportInterval(RoutingAllocation allocat } return iterations; } + + // Package-private accessors for testing. 
+ long getNumComputeCallsSinceLastConverged() { + return numComputeCallsSinceLastConverged; + } + + long getNumIterationsSinceLastConverged() { + return numIterationsSinceLastConverged; + } + + long getLastConvergedTimeMillis() { + return lastConvergedTimeMillis; + } + + void setConvergenceLogMsgLevel(Level level) { + convergenceLogMsgLevel = level; + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 80e2a1e237e02..6e496b85ede97 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -43,11 +43,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.TimeProvider; import org.elasticsearch.common.time.TimeProviderUtils; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Strings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; @@ -69,6 +72,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; import static java.util.stream.Collectors.toMap; import static 
org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting; @@ -1231,7 +1238,7 @@ public void testShouldLogComputationIteration() { "Should report long computation based on time", DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [*] is still not converged after [1m] and [60] iterations" + "Desired balance computation for [*] is still not converged after [1m] and [59] iterations" ) ); } @@ -1273,7 +1280,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } }); - assertThatLogger(() -> { + assertLoggerExpectationsFor(() -> { var iteration = new AtomicInteger(0); desiredBalanceComputer.compute( DesiredBalance.BECOME_MASTER_INITIAL, @@ -1281,7 +1288,147 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing queue(), input -> iteration.incrementAndGet() < iterations ); - }, DesiredBalanceComputer.class, expectation); + }, expectation); + } + + private void assertLoggerExpectationsFor(Runnable action, MockLog.LoggingExpectation... 
expectations) { + assertThatLogger(action, DesiredBalanceComputer.class, expectations); + } + + public void testLoggingOfComputeCallsAndIterationsSinceConvergence() { + final var clusterSettings = new ClusterSettings( + Settings.builder().put(DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(5L)).build(), + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS + ); + final var timeInMillis = new AtomicLong(-1L); + final var iterationCounter = new AtomicInteger(0); + final var requiredIterations = new AtomicInteger(2); + final var desiredBalance = new AtomicReference(DesiredBalance.BECOME_MASTER_INITIAL); + final var indexSequence = new AtomicLong(0); + final var clusterState = createInitialClusterState(1, 1, 0); + + final var computer = new DesiredBalanceComputer( + clusterSettings, + TimeProviderUtils.create(timeInMillis::incrementAndGet), + new BalancedShardsAllocator(Settings.EMPTY) + ) { + @Override + boolean hasEnoughIterations(int currentIteration) { + iterationCounter.incrementAndGet(); + return currentIteration >= requiredIterations.get(); + } + }; + computer.setConvergenceLogMsgLevel(Level.INFO); + + record ExpectedLastConvergenceInfo(int numComputeCalls, int numTotalIterations, long timestampMillis) {} + + Consumer assertLastConvergenceInfo = data -> { + assertEquals(data.numComputeCalls(), computer.getNumComputeCallsSinceLastConverged()); + assertEquals(data.numTotalIterations(), computer.getNumIterationsSinceLastConverged()); + assertEquals(data.timestampMillis(), computer.getLastConvergedTimeMillis()); + }; + + final Function, Runnable> getComputeRunnableForIsFreshPredicate = isFreshFunc -> { + final var input = new DesiredBalanceInput(indexSequence.incrementAndGet(), routingAllocationOf(clusterState), List.of()); + return () -> desiredBalance.set(computer.compute(desiredBalance.get(), input, queue(), isFreshFunc)); + }; + + record LogExpectationData( + boolean isConverged, + String timeSinceConverged, + int totalIterations, 
+ int totalComputeCalls, + int currentIterations, + String currentDuration + ) { + LogExpectationData(boolean isConverged, String timeSinceConverged, int totalIterations) { + this(isConverged, timeSinceConverged, totalIterations, 0, 0, ""); + } + } + + Function getLogExpectation = data -> { + final var singleComputeCallMsg = "Desired balance computation for [%d] " + + (data.isConverged ? "" : "is still not ") + + "converged after [%s] and [%d] iterations"; + return new MockLog.SeenEventExpectation( + "expected a " + (data.isConverged ? "converged" : "not converged") + " log message", + DesiredBalanceComputer.class.getCanonicalName(), + Level.INFO, + (data.totalComputeCalls > 1 + ? Strings.format( + singleComputeCallMsg + ", resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago", + indexSequence.get(), + data.timeSinceConverged, + data.totalIterations, + data.totalComputeCalls, + data.currentIterations, + data.currentDuration + ) + : Strings.format(singleComputeCallMsg, indexSequence.get(), data.timeSinceConverged, data.totalIterations)) + ); + }; + + final Consumer assertFinishReason = reason -> { + assertEquals(reason, desiredBalance.get().finishReason()); + if (DesiredBalance.ComputationFinishReason.CONVERGED == reason) { + // Verify the number of compute() calls and total iterations have been reset after converging. + assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get())); + } + }; + + // No compute() calls yet, last convergence timestamp is the startup time. + assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get())); + + // Converges right away, verify the convergence message (DEBUG by default, raised to INFO for this test). 
+ assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> true), + getLogExpectation.apply(new LogExpectationData(true, "3ms", 2)) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); + final var lastConvergenceTimestampMillis = computer.getLastConvergedTimeMillis(); + + // Test a series of compute() calls that don't converge. + iterationCounter.set(0); + requiredIterations.set(10); + // This INFO is triggered from the interval since last convergence timestamp. + assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 6), + getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT); + assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(1, 6, lastConvergenceTimestampMillis)); + + iterationCounter.set(0); + // The next INFO is triggered from the interval since last INFO message logged, and then another after the interval period. 
+ assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 8), + getLogExpectation.apply(new LogExpectationData(false, "10ms", 8, 2, 2, "2ms")), + getLogExpectation.apply(new LogExpectationData(false, "15ms", 13, 2, 7, "7ms")) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT); + assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(2, 14, lastConvergenceTimestampMillis)); + + assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> true), + getLogExpectation.apply(new LogExpectationData(false, "20ms", 17, 3, 3, "3ms")), + getLogExpectation.apply(new LogExpectationData(false, "25ms", 22, 3, 8, "8ms")), + getLogExpectation.apply(new LogExpectationData(true, "27ms", 24, 3, 10, "10ms")) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); + + // First INFO is triggered from the interval since last converged, second is triggered from the interval since the last INFO log. + assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> true), + getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)), + getLogExpectation.apply(new LogExpectationData(false, "10ms", 9)), + getLogExpectation.apply(new LogExpectationData(true, "11ms", 10)) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); + + // Verify the final assignment mappings after converging. 
+ final var index = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID).index(TEST_INDEX).getIndex(); + final var expectedAssignmentsMap = Map.of(new ShardId(index, 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0)); + assertDesiredAssignments(desiredBalance.get(), expectedAssignmentsMap); } private static ShardId findShardId(ClusterState clusterState, String name) { @@ -1289,12 +1436,18 @@ private static ShardId findShardId(ClusterState clusterState, String name) { } static ClusterState createInitialClusterState(int dataNodesCount) { + return createInitialClusterState(dataNodesCount, 2, 1); + } + + static ClusterState createInitialClusterState(int dataNodesCount, int numShards, int numReplicas) { var discoveryNodes = DiscoveryNodes.builder().add(newNode("master", Set.of(DiscoveryNodeRole.MASTER_ROLE))); for (int i = 0; i < dataNodesCount; i++) { discoveryNodes.add(newNode("node-" + i, Set.of(DiscoveryNodeRole.DATA_ROLE))); } - var indexMetadata = IndexMetadata.builder(TEST_INDEX).settings(indexSettings(IndexVersion.current(), 2, 1)).build(); + var indexMetadata = IndexMetadata.builder(TEST_INDEX) + .settings(indexSettings(IndexVersion.current(), numShards, numReplicas)) + .build(); return ClusterState.builder(ClusterName.DEFAULT) .nodes(discoveryNodes.masterNodeId("master").localNodeId("master"))