Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/126008.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 126008
summary: Accumulate compute() calls and iterations between convergences
area: Allocation
type: enhancement
issues:
- 100850
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,20 @@ public class DesiredBalanceComputer {

private TimeValue progressLogInterval;
private long maxBalanceComputationTimeDuringIndexCreationMillis;
private long numComputeCallsSinceLastConverged;
private long numIterationsSinceLastConverged;
private long lastConvergedTimeMillis;
private long lastNotConvergedLogMessageTimeMillis;
private Level convergenceLogMsgLevel;

public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) {
this.delegateAllocator = delegateAllocator;
this.timeProvider = timeProvider;
this.numComputeCallsSinceLastConverged = 0;
this.numIterationsSinceLastConverged = 0;
this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis();
this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis;
this.convergenceLogMsgLevel = Level.DEBUG;
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
clusterSettings.initializeAndWatch(
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
Expand All @@ -89,6 +99,7 @@ public DesiredBalance compute(
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
Predicate<DesiredBalanceInput> isFresh
) {
numComputeCallsSinceLastConverged += 1;
if (logger.isTraceEnabled()) {
logger.trace(
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
Expand Down Expand Up @@ -276,7 +287,7 @@ public DesiredBalance compute(
final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
final long timeWarningInterval = progressLogInterval.millis();
final long computationStartedTime = timeProvider.relativeTimeInMillis();
long nextReportTime = computationStartedTime + timeWarningInterval;
long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval;

int i = 0;
boolean hasChanges = false;
Expand Down Expand Up @@ -321,22 +332,46 @@ public DesiredBalance compute(
}
}

i++;
i += 1;
numIterationsSinceLastConverged += 1;
final int iterations = i;
final long currentTime = timeProvider.relativeTimeInMillis();
final boolean reportByTime = nextReportTime <= currentTime;
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
final boolean reportByIterationCount = numIterationsSinceLastConverged % iterationCountReportInterval == 0;
if (reportByTime || reportByIterationCount) {
nextReportTime = currentTime + timeWarningInterval;
}

if (hasChanges == false && hasEnoughIterations(i)) {
logger.debug(
"Desired balance computation for [{}] converged after [{}] and [{}] iterations",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i
);
if (numComputeCallsSinceLastConverged > 1) {
logger.log(
convergenceLogMsgLevel,
() -> Strings.format(
"""
Desired balance computation for [%d] converged after [%s] and [%d] iterations, \
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged,
numComputeCallsSinceLastConverged,
iterations,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
)
);
} else {
logger.log(
convergenceLogMsgLevel,
() -> Strings.format(
"Desired balance computation for [%d] converged after [%s] and [%d] iterations",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged
)
);
}
numComputeCallsSinceLastConverged = 0;
numIterationsSinceLastConverged = 0;
lastConvergedTimeMillis = currentTime;
break;
}
if (isFresh.test(desiredBalanceInput) == false) {
Expand Down Expand Up @@ -368,15 +403,37 @@ public DesiredBalance compute(
break;
}

logger.log(
reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE,
() -> Strings.format(
"Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
iterations
)
);
final var logLevel = reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE;
if (numComputeCallsSinceLastConverged > 1) {
logger.log(
logLevel,
() -> Strings.format(
"""
Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged,
numComputeCallsSinceLastConverged,
iterations,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
)
);
} else {
logger.log(
logLevel,
() -> Strings.format(
"Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged
)
);
}

if (reportByIterationCount || reportByTime) {
lastNotConvergedLogMessageTimeMillis = currentTime;
}
}
iterations.inc(i);

Expand Down Expand Up @@ -471,4 +528,21 @@ private static int computeIterationCountReportInterval(RoutingAllocation allocat
}
return iterations;
}

// Package-level getters for testing.
long getNumComputeCallsSinceLastConverged() {
return numComputeCallsSinceLastConverged;
}

long getNumIterationsSinceLastConverged() {
return numIterationsSinceLastConverged;
}

long getLastConvergedTimeMillis() {
return lastConvergedTimeMillis;
}

void setConvergenceLogMsgLevel(Level level) {
convergenceLogMsgLevel = level;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.time.TimeProviderUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
Expand All @@ -69,6 +72,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting;
Expand Down Expand Up @@ -1231,7 +1238,7 @@ public void testShouldLogComputationIteration() {
"Should report long computation based on time",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
"Desired balance computation for [*] is still not converged after [1m] and [60] iterations"
"Desired balance computation for [*] is still not converged after [1m] and [59] iterations"
)
);
}
Expand Down Expand Up @@ -1273,28 +1280,174 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}
});

assertThatLogger(() -> {
assertLoggerExpectationsFor(() -> {
var iteration = new AtomicInteger(0);
desiredBalanceComputer.compute(
DesiredBalance.BECOME_MASTER_INITIAL,
createInput(createInitialClusterState(3)),
queue(),
input -> iteration.incrementAndGet() < iterations
);
}, DesiredBalanceComputer.class, expectation);
}, expectation);
}

private void assertLoggerExpectationsFor(Runnable action, MockLog.LoggingExpectation... expectations) {
assertThatLogger(action, DesiredBalanceComputer.class, expectations);
}

public void testLoggingOfComputeCallsAndIterationsSinceConvergence() {
final var clusterSettings = new ClusterSettings(
Settings.builder().put(DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(5L)).build(),
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
);
final var timeInMillis = new AtomicLong(-1L);
final var iterationCounter = new AtomicInteger(0);
final var requiredIterations = new AtomicInteger(2);
final var desiredBalance = new AtomicReference<DesiredBalance>(DesiredBalance.BECOME_MASTER_INITIAL);
final var indexSequence = new AtomicLong(0);
final var clusterState = createInitialClusterState(1, 1, 0);

final var computer = new DesiredBalanceComputer(
clusterSettings,
TimeProviderUtils.create(timeInMillis::incrementAndGet),
new BalancedShardsAllocator(Settings.EMPTY)
) {
@Override
boolean hasEnoughIterations(int currentIteration) {
iterationCounter.incrementAndGet();
return currentIteration >= requiredIterations.get();
}
};
computer.setConvergenceLogMsgLevel(Level.INFO);

record ExpectedLastConvergenceInfo(int numComputeCalls, int numTotalIterations, long timestampMillis) {}

Consumer<ExpectedLastConvergenceInfo> assertLastConvergenceInfo = data -> {
assertEquals(data.numComputeCalls(), computer.getNumComputeCallsSinceLastConverged());
assertEquals(data.numTotalIterations(), computer.getNumIterationsSinceLastConverged());
assertEquals(data.timestampMillis(), computer.getLastConvergedTimeMillis());
};

final Function<Predicate<DesiredBalanceInput>, Runnable> getComputeRunnableForIsFreshPredicate = isFreshFunc -> {
final var input = new DesiredBalanceInput(indexSequence.incrementAndGet(), routingAllocationOf(clusterState), List.of());
return () -> desiredBalance.set(computer.compute(desiredBalance.get(), input, queue(), isFreshFunc));
};

record LogExpectationData(
boolean isConverged,
String timeSinceConverged,
int totalIterations,
int totalComputeCalls,
int currentIterations,
String currentDuration
) {
LogExpectationData(boolean isConverged, String timeSinceConverged, int totalIterations) {
this(isConverged, timeSinceConverged, totalIterations, 0, 0, "");
}
}

Function<LogExpectationData, MockLog.SeenEventExpectation> getLogExpectation = data -> {
final var singleComputeCallMsg = "Desired balance computation for [%d] "
+ (data.isConverged ? "" : "is still not ")
+ "converged after [%s] and [%d] iterations";
return new MockLog.SeenEventExpectation(
"expected a " + (data.isConverged ? "converged" : "not converged") + " log message",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
(data.totalComputeCalls > 1
? Strings.format(
singleComputeCallMsg + ", resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago",
indexSequence.get(),
data.timeSinceConverged,
data.totalIterations,
data.totalComputeCalls,
data.currentIterations,
data.currentDuration
)
: Strings.format(singleComputeCallMsg, indexSequence.get(), data.timeSinceConverged, data.totalIterations))
);
};

final Consumer<DesiredBalance.ComputationFinishReason> assertFinishReason = reason -> {
assertEquals(reason, desiredBalance.get().finishReason());
if (DesiredBalance.ComputationFinishReason.CONVERGED == reason) {
// Verify the number of compute() calls and total iterations have been reset after converging.
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get()));
}
};

// No compute() calls yet, last convergence timestamp is the startup time.
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(0, 0, timeInMillis.get()));

// Converges right away, verify the debug level convergence message.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
getLogExpectation.apply(new LogExpectationData(true, "3ms", 2))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);
final var lastConvergenceTimestampMillis = computer.getLastConvergedTimeMillis();

// Test a series of compute() calls that don't converge.
iterationCounter.set(0);
requiredIterations.set(10);
// This INFO is triggered from the interval since last convergence timestamp.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 6),
getLogExpectation.apply(new LogExpectationData(false, "5ms", 4))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT);
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(1, 6, lastConvergenceTimestampMillis));

iterationCounter.set(0);
// The next INFO is triggered from the interval since last INFO message logged, and then another after the interval period.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 8),
getLogExpectation.apply(new LogExpectationData(false, "10ms", 8, 2, 2, "2ms")),
getLogExpectation.apply(new LogExpectationData(false, "15ms", 13, 2, 7, "7ms"))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT);
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(2, 14, lastConvergenceTimestampMillis));

assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
getLogExpectation.apply(new LogExpectationData(false, "20ms", 17, 3, 3, "3ms")),
getLogExpectation.apply(new LogExpectationData(false, "25ms", 22, 3, 8, "8ms")),
getLogExpectation.apply(new LogExpectationData(true, "27ms", 24, 3, 10, "10ms"))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);

// First INFO is triggered from interval since last converged, second is triggered from the inverval since the last INFO log.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)),
getLogExpectation.apply(new LogExpectationData(false, "10ms", 9)),
getLogExpectation.apply(new LogExpectationData(true, "11ms", 10))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);

// Verify the final assignment mappings after converging.
final var index = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID).index(TEST_INDEX).getIndex();
final var expectedAssignmentsMap = Map.of(new ShardId(index, 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0));
assertDesiredAssignments(desiredBalance.get(), expectedAssignmentsMap);
}

private static ShardId findShardId(ClusterState clusterState, String name) {
return clusterState.getRoutingTable().index(name).shard(0).shardId();
}

static ClusterState createInitialClusterState(int dataNodesCount) {
return createInitialClusterState(dataNodesCount, 2, 1);
}

static ClusterState createInitialClusterState(int dataNodesCount, int numShards, int numReplicas) {
var discoveryNodes = DiscoveryNodes.builder().add(newNode("master", Set.of(DiscoveryNodeRole.MASTER_ROLE)));
for (int i = 0; i < dataNodesCount; i++) {
discoveryNodes.add(newNode("node-" + i, Set.of(DiscoveryNodeRole.DATA_ROLE)));
}

var indexMetadata = IndexMetadata.builder(TEST_INDEX).settings(indexSettings(IndexVersion.current(), 2, 1)).build();
var indexMetadata = IndexMetadata.builder(TEST_INDEX)
.settings(indexSettings(IndexVersion.current(), numShards, numReplicas))
.build();

return ClusterState.builder(ClusterName.DEFAULT)
.nodes(discoveryNodes.masterNodeId("master").localNodeId("master"))
Expand Down
Loading