From 4c81fb05b5574dcbcd8f4a21911aa55c2272e263 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Mon, 31 Mar 2025 20:43:26 -0400 Subject: [PATCH 1/3] Accumulate compute() calls and iterations between convergences. Add tracking of the number of compute() calls and total iterations between convergences in the DesiredBalanceComputer, along with the time since the last convergence. These are included in the log message when the computer doesn't converge. Closes #100850. --- .../allocator/DesiredBalanceComputer.java | 62 ++++++-- .../DesiredBalanceComputerTests.java | 141 +++++++++++++++++- 2 files changed, 187 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java index 03630c284fa30..0851fd466dbe1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.time.TimeProvider; import org.elasticsearch.common.util.Maps; -import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; @@ -47,8 +46,9 @@ */ public class DesiredBalanceComputer { - private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class); + private static final Logger STATIC_LOGGER = LogManager.getLogger(DesiredBalanceComputer.class); + private final Logger logger; private final ShardsAllocator delegateAllocator; private final TimeProvider timeProvider; @@ -72,10 +72,24 @@ public class DesiredBalanceComputer { private TimeValue progressLogInterval; private long maxBalanceComputationTimeDuringIndexCreationMillis; + private long 
numComputeCallsSinceLastConverged; + private long numIterationsSinceLastConverged; + private long lastConvergedTimeMillis; + private long lastNotConvergedLogMessageTimeMillis; public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) { + this(clusterSettings, timeProvider, delegateAllocator, STATIC_LOGGER); + } + + // Package access for testing. + DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator, Logger logger) { + this.logger = logger; this.delegateAllocator = delegateAllocator; this.timeProvider = timeProvider; + this.numComputeCallsSinceLastConverged = 0; + this.numIterationsSinceLastConverged = 0; + this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis(); + this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis; clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value); clusterSettings.initializeAndWatch( MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING, @@ -83,12 +97,22 @@ public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider time ); } + public record LastConvergenceInfo(long numComputeCallsSince, long numIterationsSince, long timestampMillis) {} + + /** + * Returns the last convergence timestamp along with the total number of computation calls and iterations since the last convergence. 
+ */ + public LastConvergenceInfo getLastConvergence() { + return new LastConvergenceInfo(numComputeCallsSinceLastConverged, numIterationsSinceLastConverged, lastConvergedTimeMillis); + } + public DesiredBalance compute( DesiredBalance previousDesiredBalance, DesiredBalanceInput desiredBalanceInput, Queue> pendingDesiredBalanceMoves, Predicate isFresh ) { + ++numComputeCallsSinceLastConverged; if (logger.isTraceEnabled()) { logger.trace( "Recomputing desired balance for [{}]: {}, {}, {}, {}", @@ -276,7 +300,7 @@ public DesiredBalance compute( final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation); final long timeWarningInterval = progressLogInterval.millis(); final long computationStartedTime = timeProvider.relativeTimeInMillis(); - long nextReportTime = computationStartedTime + timeWarningInterval; + long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval; int i = 0; boolean hasChanges = false; @@ -322,21 +346,29 @@ public DesiredBalance compute( } i++; + ++numIterationsSinceLastConverged; final int iterations = i; final long currentTime = timeProvider.relativeTimeInMillis(); final boolean reportByTime = nextReportTime <= currentTime; - final boolean reportByIterationCount = i % iterationCountReportInterval == 0; + final boolean reportByIterationCount = numIterationsSinceLastConverged % iterationCountReportInterval == 0; if (reportByTime || reportByIterationCount) { nextReportTime = currentTime + timeWarningInterval; } if (hasChanges == false && hasEnoughIterations(i)) { logger.debug( - "Desired balance computation for [{}] converged after [{}] and [{}] iterations", + "Desired balance computation for [{}] converged after [{}] and [{}] iterations, " + + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", desiredBalanceInput.index(), TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), - i + i, + 
numComputeCallsSinceLastConverged, + numIterationsSinceLastConverged, + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() ); + numComputeCallsSinceLastConverged = 0; + numIterationsSinceLastConverged = 0; + lastConvergedTimeMillis = currentTime; break; } if (isFresh.test(desiredBalanceInput) == false) { @@ -370,13 +402,19 @@ public DesiredBalance compute( logger.log( reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE, - () -> Strings.format( - "Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations", - desiredBalanceInput.index(), - TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), - iterations - ) + "Desired balance computation for [{}] is still not converged after [{}] and [{}] iterations, " + + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), + iterations, + numComputeCallsSinceLastConverged, + numIterationsSinceLastConverged, + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() ); + + if (reportByIterationCount || reportByTime) { + lastNotConvergedLogMessageTimeMillis = currentTime; + } } iterations.inc(i); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 80e2a1e237e02..2738ca4c54a84 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; 
import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfo.NodeAndPath; import org.elasticsearch.cluster.ClusterInfo.NodeAndShard; @@ -43,11 +44,13 @@ import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.TimeProvider; import org.elasticsearch.common.time.TimeProviderUtils; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; @@ -69,6 +72,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; import static java.util.stream.Collectors.toMap; import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting; @@ -88,6 +93,9 @@ import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class DesiredBalanceComputerTests extends ESAllocationTestCase { @@ -1209,7 +1217,8 @@ public void testShouldLogComputationIteration() { "Should not report long computation too early", DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [*] is still not converged after [*] and [*] iterations" + "Desired balance computation for [*] is still not converged after [*] and [*] iterations, " + + "[1] compute() calls with [*] total iterations 
since last convergence [*] ago" ) ); @@ -1220,7 +1229,8 @@ public void testShouldLogComputationIteration() { "Should report long computation based on iteration count", DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [*] is still not converged after [10s] and [1000] iterations" + "Desired balance computation for [*] is still not converged after [10s] and [1000] iterations, " + + "[1] compute() calls with [1000] total iterations since last convergence [10s] ago" ) ); @@ -1231,7 +1241,8 @@ public void testShouldLogComputationIteration() { "Should report long computation based on time", DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [*] is still not converged after [1m] and [60] iterations" + "Desired balance computation for [*] is still not converged after [59s] and [59] iterations, " + + "[1] compute() calls with [59] total iterations since last convergence [1m] ago" ) ); } @@ -1284,17 +1295,139 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing }, DesiredBalanceComputer.class, expectation); } + public void testLoggingOfComputeCallsAndIterationsSinceConvergence() { + final var clusterSettings = new ClusterSettings( + Settings.builder().put(DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(5L)).build(), + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS + ); + final var timeInMillis = new AtomicLong(-1L); + final var computerLogger = mock(Logger.class); + final var iterationCounter = new AtomicInteger(0); + final var requiredIterations = new AtomicInteger(2); + final var computer = new DesiredBalanceComputer( + clusterSettings, + TimeProviderUtils.create(timeInMillis::incrementAndGet), + new BalancedShardsAllocator(Settings.EMPTY), + computerLogger + ) { + @Override + boolean hasEnoughIterations(int currentIteration) { + iterationCounter.incrementAndGet(); + return currentIteration >= 
requiredIterations.get(); + } + }; + + // No compute() calls yet, last convergence timestamp is the startup time. + final var startTimeMillis = timeInMillis.get(); + assertEquals(new DesiredBalanceComputer.LastConvergenceInfo(0L, 0L, startTimeMillis), computer.getLastConvergence()); + + var desiredBalance = DesiredBalance.BECOME_MASTER_INITIAL; + final AtomicLong indexSequence = new AtomicLong(0); + var clusterState = createInitialClusterState(1, 1, 0); + Supplier rebuildInput = () -> { + return new DesiredBalanceInput(indexSequence.incrementAndGet(), routingAllocationOf(clusterState), List.of()); + }; + + record ComputeLogMsgInput( + String currentDuration, + int currentIterations, + long totalComputeCalls, + long totalIterations, + String timeSinceConverged + ) {} + + Consumer verifyNoConvergence = data -> { + verify(computerLogger, times(1)).log( + Level.INFO, + "Desired balance computation for [{}] is still not converged after [{}] and [{}] iterations, " + + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", + indexSequence.get(), + data.currentDuration, + data.currentIterations, + data.totalComputeCalls, + data.totalIterations, + data.timeSinceConverged + ); + }; + Consumer verifyConvergence = data -> { + verify(computerLogger, times(1)).debug( + "Desired balance computation for [{}] converged after [{}] and [{}] iterations, " + + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", + indexSequence.get(), + data.currentDuration, + data.currentIterations, + data.totalComputeCalls, + data.totalIterations, + data.timeSinceConverged + ); + // Verify the number of compute() calls and total iterations have been reset after converging. + assertEquals(new DesiredBalanceComputer.LastConvergenceInfo(0L, 0L, timeInMillis.get()), computer.getLastConvergence()); + }; + + // Converges right away, verify the debug level convergence message. 
+ desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> true); + assertEquals(DesiredBalance.ComputationFinishReason.CONVERGED, desiredBalance.finishReason()); + verifyConvergence.accept(new ComputeLogMsgInput("2ms", 2, 1L, 2L, "3ms")); + var lastConvergenceTimestampMillis = computer.getLastConvergence().timestampMillis(); + + // Test a series of compute() calls that don't converge. + iterationCounter.set(0); + requiredIterations.set(10); + desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> iterationCounter.get() < 6); + assertEquals(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT, desiredBalance.finishReason()); + // This INFO is triggered from the interval since last convergence timestamp. + verifyNoConvergence.accept(new ComputeLogMsgInput("4ms", 4, 1L, 4L, "5ms")); + assertEquals(new DesiredBalanceComputer.LastConvergenceInfo(1L, 6L, lastConvergenceTimestampMillis), computer.getLastConvergence()); + + iterationCounter.set(0); + desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> iterationCounter.get() < 8); + assertEquals(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT, desiredBalance.finishReason()); + // The next INFO is triggered from the interval since last INFO message logged. + verifyNoConvergence.accept(new ComputeLogMsgInput("2ms", 2, 2L, 8L, "10ms")); + // Followed by another log message after the interval. 
+ verifyNoConvergence.accept(new ComputeLogMsgInput("7ms", 7, 2L, 13L, "15ms")); + assertEquals( + new DesiredBalanceComputer.LastConvergenceInfo(2L, 14L, lastConvergenceTimestampMillis), + computer.getLastConvergence() + ); + + desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> true); + verifyNoConvergence.accept(new ComputeLogMsgInput("3ms", 3, 3L, 17L, "20ms")); + verifyNoConvergence.accept(new ComputeLogMsgInput("8ms", 8, 3L, 22L, "25ms")); + verifyConvergence.accept(new ComputeLogMsgInput("10ms", 10, 3L, 24L, "27ms")); + assertEquals(DesiredBalance.ComputationFinishReason.CONVERGED, desiredBalance.finishReason()); + + desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> true); + // First INFO is triggered from interval since last converged. + verifyNoConvergence.accept(new ComputeLogMsgInput("4ms", 4, 1L, 4L, "5ms")); + // Second INFO is triggered from the interval since the last INFO log. + verifyNoConvergence.accept(new ComputeLogMsgInput("9ms", 9, 1L, 9L, "10ms")); + verifyConvergence.accept(new ComputeLogMsgInput("10ms", 10, 1L, 10L, "11ms")); + assertEquals(DesiredBalance.ComputationFinishReason.CONVERGED, desiredBalance.finishReason()); + + // Verify the final assignment mappings after converging. 
+ final var index = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID).index(TEST_INDEX).getIndex(); + final var expectedAssignmentsMap = Map.of(new ShardId(index, 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0)); + assertDesiredAssignments(desiredBalance, expectedAssignmentsMap); + } + private static ShardId findShardId(ClusterState clusterState, String name) { return clusterState.getRoutingTable().index(name).shard(0).shardId(); } static ClusterState createInitialClusterState(int dataNodesCount) { + return createInitialClusterState(dataNodesCount, 2, 1); + } + + static ClusterState createInitialClusterState(int dataNodesCount, int numShards, int numReplicas) { var discoveryNodes = DiscoveryNodes.builder().add(newNode("master", Set.of(DiscoveryNodeRole.MASTER_ROLE))); for (int i = 0; i < dataNodesCount; i++) { discoveryNodes.add(newNode("node-" + i, Set.of(DiscoveryNodeRole.DATA_ROLE))); } - var indexMetadata = IndexMetadata.builder(TEST_INDEX).settings(indexSettings(IndexVersion.current(), 2, 1)).build(); + var indexMetadata = IndexMetadata.builder(TEST_INDEX) + .settings(indexSettings(IndexVersion.current(), numShards, numReplicas)) + .build(); return ClusterState.builder(ClusterName.DEFAULT) .nodes(discoveryNodes.masterNodeId("master").localNodeId("master")) From 4078955134c182c9e47cbe5b545dc465d7ffa7e9 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Mon, 31 Mar 2025 22:48:34 -0400 Subject: [PATCH 2/3] Add changelog file --- docs/changelog/126008.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/126008.yaml diff --git a/docs/changelog/126008.yaml b/docs/changelog/126008.yaml new file mode 100644 index 0000000000000..e905bde32e433 --- /dev/null +++ b/docs/changelog/126008.yaml @@ -0,0 +1,6 @@ +pr: 126008 +summary: Accumulate compute() calls and iterations between convergences +area: Allocation +type: enhancement +issues: + - 100850 From ffdea849b3d5d4f96b3a247b841c4380a7be4f5f Mon Sep 17 00:00:00 2001 From: 
Jeremy Dahlgren Date: Wed, 2 Apr 2025 11:03:24 -0400 Subject: [PATCH 3/3] Address code review comments --- .../allocator/DesiredBalanceComputer.java | 118 +++++++---- .../DesiredBalanceComputerTests.java | 192 ++++++++++-------- 2 files changed, 183 insertions(+), 127 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java index 0851fd466dbe1..a0e103edf97c7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.time.TimeProvider; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; @@ -46,9 +47,8 @@ */ public class DesiredBalanceComputer { - private static final Logger STATIC_LOGGER = LogManager.getLogger(DesiredBalanceComputer.class); + private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class); - private final Logger logger; private final ShardsAllocator delegateAllocator; private final TimeProvider timeProvider; @@ -76,20 +76,16 @@ public class DesiredBalanceComputer { private long numIterationsSinceLastConverged; private long lastConvergedTimeMillis; private long lastNotConvergedLogMessageTimeMillis; + private Level convergenceLogMsgLevel; public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) { - this(clusterSettings, timeProvider, delegateAllocator, STATIC_LOGGER); - } - - // Package access for testing. 
- DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator, Logger logger) { - this.logger = logger; this.delegateAllocator = delegateAllocator; this.timeProvider = timeProvider; this.numComputeCallsSinceLastConverged = 0; this.numIterationsSinceLastConverged = 0; this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis(); this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis; + this.convergenceLogMsgLevel = Level.DEBUG; clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value); clusterSettings.initializeAndWatch( MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING, @@ -97,22 +93,13 @@ public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider time ); } - public record LastConvergenceInfo(long numComputeCallsSince, long numIterationsSince, long timestampMillis) {} - - /** - * Returns the last convergence timestamp along with the total number of computation calls and iterations since the last convergence. 
- */ - public LastConvergenceInfo getLastConvergence() { - return new LastConvergenceInfo(numComputeCallsSinceLastConverged, numIterationsSinceLastConverged, lastConvergedTimeMillis); - } - public DesiredBalance compute( DesiredBalance previousDesiredBalance, DesiredBalanceInput desiredBalanceInput, Queue> pendingDesiredBalanceMoves, Predicate isFresh ) { - ++numComputeCallsSinceLastConverged; + numComputeCallsSinceLastConverged += 1; if (logger.isTraceEnabled()) { logger.trace( "Recomputing desired balance for [{}]: {}, {}, {}, {}", @@ -345,8 +332,8 @@ public DesiredBalance compute( } } - i++; - ++numIterationsSinceLastConverged; + i += 1; + numIterationsSinceLastConverged += 1; final int iterations = i; final long currentTime = timeProvider.relativeTimeInMillis(); final boolean reportByTime = nextReportTime <= currentTime; @@ -356,16 +343,32 @@ public DesiredBalance compute( } if (hasChanges == false && hasEnoughIterations(i)) { - logger.debug( - "Desired balance computation for [{}] converged after [{}] and [{}] iterations, " - + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", - desiredBalanceInput.index(), - TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), - i, - numComputeCallsSinceLastConverged, - numIterationsSinceLastConverged, - TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() - ); + if (numComputeCallsSinceLastConverged > 1) { + logger.log( + convergenceLogMsgLevel, + () -> Strings.format( + """ + Desired balance computation for [%d] converged after [%s] and [%d] iterations, \ + resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged, + numComputeCallsSinceLastConverged, + iterations, + TimeValue.timeValueMillis(currentTime - computationStartedTime).toString() + ) + ); + } else { + 
logger.log( + convergenceLogMsgLevel, + () -> Strings.format( + "Desired balance computation for [%d] converged after [%s] and [%d] iterations", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged + ) + ); + } numComputeCallsSinceLastConverged = 0; numIterationsSinceLastConverged = 0; lastConvergedTimeMillis = currentTime; @@ -400,17 +403,33 @@ public DesiredBalance compute( break; } - logger.log( - reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE, - "Desired balance computation for [{}] is still not converged after [{}] and [{}] iterations, " - + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", - desiredBalanceInput.index(), - TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), - iterations, - numComputeCallsSinceLastConverged, - numIterationsSinceLastConverged, - TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() - ); + final var logLevel = reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? 
Level.DEBUG : Level.TRACE; + if (numComputeCallsSinceLastConverged > 1) { + logger.log( + logLevel, + () -> Strings.format( + """ + Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \ + resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged, + numComputeCallsSinceLastConverged, + iterations, + TimeValue.timeValueMillis(currentTime - computationStartedTime).toString() + ) + ); + } else { + logger.log( + logLevel, + () -> Strings.format( + "Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations", + desiredBalanceInput.index(), + TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), + numIterationsSinceLastConverged + ) + ); + } if (reportByIterationCount || reportByTime) { lastNotConvergedLogMessageTimeMillis = currentTime; @@ -509,4 +528,21 @@ private static int computeIterationCountReportInterval(RoutingAllocation allocat } return iterations; } + + // Package-level getters for testing. 
+ long getNumComputeCallsSinceLastConverged() { + return numComputeCallsSinceLastConverged; + } + + long getNumIterationsSinceLastConverged() { + return numIterationsSinceLastConverged; + } + + long getLastConvergedTimeMillis() { + return lastConvergedTimeMillis; + } + + void setConvergenceLogMsgLevel(Level level) { + convergenceLogMsgLevel = level; + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 2738ca4c54a84..6e496b85ede97 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfo.NodeAndPath; import org.elasticsearch.cluster.ClusterInfo.NodeAndShard; @@ -50,6 +49,7 @@ import org.elasticsearch.common.time.TimeProviderUtils; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; @@ -72,8 +72,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Supplier; +import java.util.function.Function; +import java.util.function.Predicate; import static java.util.stream.Collectors.toMap; import static 
org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting; @@ -93,9 +95,6 @@ import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; public class DesiredBalanceComputerTests extends ESAllocationTestCase { @@ -1217,8 +1216,7 @@ public void testShouldLogComputationIteration() { "Should not report long computation too early", DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [*] is still not converged after [*] and [*] iterations, " - + "[1] compute() calls with [*] total iterations since last convergence [*] ago" + "Desired balance computation for [*] is still not converged after [*] and [*] iterations" ) ); @@ -1229,8 +1227,7 @@ public void testShouldLogComputationIteration() { "Should report long computation based on iteration count", DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [*] is still not converged after [10s] and [1000] iterations, " - + "[1] compute() calls with [1000] total iterations since last convergence [10s] ago" + "Desired balance computation for [*] is still not converged after [10s] and [1000] iterations" ) ); @@ -1241,8 +1238,7 @@ public void testShouldLogComputationIteration() { "Should report long computation based on time", DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [*] is still not converged after [59s] and [59] iterations, " - + "[1] compute() calls with [59] total iterations since last convergence [1m] ago" + "Desired balance computation for [*] is still not converged after [1m] and [59] iterations" ) ); } @@ -1284,7 +1280,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } }); - assertThatLogger(() -> { + 
assertLoggerExpectationsFor(() -> { var iteration = new AtomicInteger(0); desiredBalanceComputer.compute( DesiredBalance.BECOME_MASTER_INITIAL, @@ -1292,7 +1288,11 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing queue(), input -> iteration.incrementAndGet() < iterations ); - }, DesiredBalanceComputer.class, expectation); + }, expectation); + } + + private void assertLoggerExpectationsFor(Runnable action, MockLog.LoggingExpectation... expectations) { + assertThatLogger(action, DesiredBalanceComputer.class, expectations); } public void testLoggingOfComputeCallsAndIterationsSinceConvergence() { @@ -1301,14 +1301,16 @@ public void testLoggingOfComputeCallsAndIterationsSinceConvergence() { ClusterSettings.BUILT_IN_CLUSTER_SETTINGS ); final var timeInMillis = new AtomicLong(-1L); - final var computerLogger = mock(Logger.class); final var iterationCounter = new AtomicInteger(0); final var requiredIterations = new AtomicInteger(2); + final var desiredBalance = new AtomicReference(DesiredBalance.BECOME_MASTER_INITIAL); + final var indexSequence = new AtomicLong(0); + final var clusterState = createInitialClusterState(1, 1, 0); + final var computer = new DesiredBalanceComputer( clusterSettings, TimeProviderUtils.create(timeInMillis::incrementAndGet), - new BalancedShardsAllocator(Settings.EMPTY), - computerLogger + new BalancedShardsAllocator(Settings.EMPTY) ) { @Override boolean hasEnoughIterations(int currentIteration) { @@ -1316,99 +1318,117 @@ boolean hasEnoughIterations(int currentIteration) { return currentIteration >= requiredIterations.get(); } }; + computer.setConvergenceLogMsgLevel(Level.INFO); - // No compute() calls yet, last convergence timestamp is the startup time. 
- final var startTimeMillis = timeInMillis.get(); - assertEquals(new DesiredBalanceComputer.LastConvergenceInfo(0L, 0L, startTimeMillis), computer.getLastConvergence()); - - var desiredBalance = DesiredBalance.BECOME_MASTER_INITIAL; - final AtomicLong indexSequence = new AtomicLong(0); - var clusterState = createInitialClusterState(1, 1, 0); - Supplier rebuildInput = () -> { - return new DesiredBalanceInput(indexSequence.incrementAndGet(), routingAllocationOf(clusterState), List.of()); + record ExpectedLastConvergenceInfo(int numComputeCalls, int numTotalIterations, long timestampMillis) {} + + Consumer assertLastConvergenceInfo = data -> { + assertEquals(data.numComputeCalls(), computer.getNumComputeCallsSinceLastConverged()); + assertEquals(data.numTotalIterations(), computer.getNumIterationsSinceLastConverged()); + assertEquals(data.timestampMillis(), computer.getLastConvergedTimeMillis()); + }; + + final Function, Runnable> getComputeRunnableForIsFreshPredicate = isFreshFunc -> { + final var input = new DesiredBalanceInput(indexSequence.incrementAndGet(), routingAllocationOf(clusterState), List.of()); + return () -> desiredBalance.set(computer.compute(desiredBalance.get(), input, queue(), isFreshFunc)); }; - record ComputeLogMsgInput( - String currentDuration, + record LogExpectationData( + boolean isConverged, + String timeSinceConverged, + int totalIterations, + int totalComputeCalls, int currentIterations, - long totalComputeCalls, - long totalIterations, - String timeSinceConverged - ) {} + String currentDuration + ) { + LogExpectationData(boolean isConverged, String timeSinceConverged, int totalIterations) { + this(isConverged, timeSinceConverged, totalIterations, 0, 0, ""); + } + } - Consumer verifyNoConvergence = data -> { - verify(computerLogger, times(1)).log( + Function getLogExpectation = data -> { + final var singleComputeCallMsg = "Desired balance computation for [%d] " + + (data.isConverged ? 
"" : "is still not ") + + "converged after [%s] and [%d] iterations"; + return new MockLog.SeenEventExpectation( + "expected a " + (data.isConverged ? "converged" : "not converged") + " log message", + DesiredBalanceComputer.class.getCanonicalName(), Level.INFO, - "Desired balance computation for [{}] is still not converged after [{}] and [{}] iterations, " - + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", - indexSequence.get(), - data.currentDuration, - data.currentIterations, - data.totalComputeCalls, - data.totalIterations, - data.timeSinceConverged + (data.totalComputeCalls > 1 + ? Strings.format( + singleComputeCallMsg + ", resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago", + indexSequence.get(), + data.timeSinceConverged, + data.totalIterations, + data.totalComputeCalls, + data.currentIterations, + data.currentDuration + ) + : Strings.format(singleComputeCallMsg, indexSequence.get(), data.timeSinceConverged, data.totalIterations)) ); }; - Consumer verifyConvergence = data -> { - verify(computerLogger, times(1)).debug( - "Desired balance computation for [{}] converged after [{}] and [{}] iterations, " - + "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago", - indexSequence.get(), - data.currentDuration, - data.currentIterations, - data.totalComputeCalls, - data.totalIterations, - data.timeSinceConverged - ); - // Verify the number of compute() calls and total iterations have been reset after converging. - assertEquals(new DesiredBalanceComputer.LastConvergenceInfo(0L, 0L, timeInMillis.get()), computer.getLastConvergence()); + + final Consumer assertFinishReason = reason -> { + assertEquals(reason, desiredBalance.get().finishReason()); + if (DesiredBalance.ComputationFinishReason.CONVERGED == reason) { + // Verify the number of compute() calls and total iterations have been reset after converging. 
+ assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get())); + } }; + // No compute() calls yet, last convergence timestamp is the startup time. + assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get())); + // Converges right away, verify the debug level convergence message. - desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> true); - assertEquals(DesiredBalance.ComputationFinishReason.CONVERGED, desiredBalance.finishReason()); - verifyConvergence.accept(new ComputeLogMsgInput("2ms", 2, 1L, 2L, "3ms")); - var lastConvergenceTimestampMillis = computer.getLastConvergence().timestampMillis(); + assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> true), + getLogExpectation.apply(new LogExpectationData(true, "3ms", 2)) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); + final var lastConvergenceTimestampMillis = computer.getLastConvergedTimeMillis(); // Test a series of compute() calls that don't converge. iterationCounter.set(0); requiredIterations.set(10); - desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> iterationCounter.get() < 6); - assertEquals(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT, desiredBalance.finishReason()); // This INFO is triggered from the interval since last convergence timestamp. 
- verifyNoConvergence.accept(new ComputeLogMsgInput("4ms", 4, 1L, 4L, "5ms")); - assertEquals(new DesiredBalanceComputer.LastConvergenceInfo(1L, 6L, lastConvergenceTimestampMillis), computer.getLastConvergence()); + assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 6), + getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT); + assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(1, 6, lastConvergenceTimestampMillis)); iterationCounter.set(0); - desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> iterationCounter.get() < 8); - assertEquals(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT, desiredBalance.finishReason()); - // The next INFO is triggered from the interval since last INFO message logged. - verifyNoConvergence.accept(new ComputeLogMsgInput("2ms", 2, 2L, 8L, "10ms")); - // Followed by another log message after the interval. - verifyNoConvergence.accept(new ComputeLogMsgInput("7ms", 7, 2L, 13L, "15ms")); - assertEquals( - new DesiredBalanceComputer.LastConvergenceInfo(2L, 14L, lastConvergenceTimestampMillis), - computer.getLastConvergence() + // The next INFO is triggered from the interval since last INFO message logged, and then another after the interval period. 
+ assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 8), + getLogExpectation.apply(new LogExpectationData(false, "10ms", 8, 2, 2, "2ms")), + getLogExpectation.apply(new LogExpectationData(false, "15ms", 13, 2, 7, "7ms")) ); - - desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> true); - verifyNoConvergence.accept(new ComputeLogMsgInput("3ms", 3, 3L, 17L, "20ms")); - verifyNoConvergence.accept(new ComputeLogMsgInput("8ms", 8, 3L, 22L, "25ms")); - verifyConvergence.accept(new ComputeLogMsgInput("10ms", 10, 3L, 24L, "27ms")); - assertEquals(DesiredBalance.ComputationFinishReason.CONVERGED, desiredBalance.finishReason()); - - desiredBalance = computer.compute(desiredBalance, rebuildInput.get(), queue(), ignored -> true); - // First INFO is triggered from interval since last converged. - verifyNoConvergence.accept(new ComputeLogMsgInput("4ms", 4, 1L, 4L, "5ms")); - // Second INFO is triggered from the interval since the last INFO log. 
- verifyNoConvergence.accept(new ComputeLogMsgInput("9ms", 9, 1L, 9L, "10ms")); - verifyConvergence.accept(new ComputeLogMsgInput("10ms", 10, 1L, 10L, "11ms")); - assertEquals(DesiredBalance.ComputationFinishReason.CONVERGED, desiredBalance.finishReason()); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT); + assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(2, 14, lastConvergenceTimestampMillis)); + + assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> true), + getLogExpectation.apply(new LogExpectationData(false, "20ms", 17, 3, 3, "3ms")), + getLogExpectation.apply(new LogExpectationData(false, "25ms", 22, 3, 8, "8ms")), + getLogExpectation.apply(new LogExpectationData(true, "27ms", 24, 3, 10, "10ms")) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); + + // First INFO is triggered from interval since last converged, second is triggered from the interval since the last INFO log. + assertLoggerExpectationsFor( + getComputeRunnableForIsFreshPredicate.apply(ignored -> true), + getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)), + getLogExpectation.apply(new LogExpectationData(false, "10ms", 9)), + getLogExpectation.apply(new LogExpectationData(true, "11ms", 10)) + ); + assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); // Verify the final assignment mappings after converging. final var index = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID).index(TEST_INDEX).getIndex(); final var expectedAssignmentsMap = Map.of(new ShardId(index, 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0)); - assertDesiredAssignments(desiredBalance, expectedAssignmentsMap); + assertDesiredAssignments(desiredBalance.get(), expectedAssignmentsMap); } private static ShardId findShardId(ClusterState clusterState, String name) {