Skip to content

Commit ffdea84

Browse files
Address code review comments
1 parent 4078955 commit ffdea84

File tree

2 files changed

+183
-127
lines changed

2 files changed

+183
-127
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 77 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.settings.Setting;
2525
import org.elasticsearch.common.time.TimeProvider;
2626
import org.elasticsearch.common.util.Maps;
27+
import org.elasticsearch.core.Strings;
2728
import org.elasticsearch.core.TimeValue;
2829
import org.elasticsearch.index.shard.ShardId;
2930

@@ -46,9 +47,8 @@
4647
*/
4748
public class DesiredBalanceComputer {
4849

49-
private static final Logger STATIC_LOGGER = LogManager.getLogger(DesiredBalanceComputer.class);
50+
private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);
5051

51-
private final Logger logger;
5252
private final ShardsAllocator delegateAllocator;
5353
private final TimeProvider timeProvider;
5454

@@ -76,43 +76,30 @@ public class DesiredBalanceComputer {
7676
private long numIterationsSinceLastConverged;
7777
private long lastConvergedTimeMillis;
7878
private long lastNotConvergedLogMessageTimeMillis;
79+
private Level convergenceLogMsgLevel;
7980

8081
public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) {
81-
this(clusterSettings, timeProvider, delegateAllocator, STATIC_LOGGER);
82-
}
83-
84-
// Package access for testing.
85-
DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator, Logger logger) {
86-
this.logger = logger;
8782
this.delegateAllocator = delegateAllocator;
8883
this.timeProvider = timeProvider;
8984
this.numComputeCallsSinceLastConverged = 0;
9085
this.numIterationsSinceLastConverged = 0;
9186
this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis();
9287
this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis;
88+
this.convergenceLogMsgLevel = Level.DEBUG;
9389
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
9490
clusterSettings.initializeAndWatch(
9591
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
9692
value -> this.maxBalanceComputationTimeDuringIndexCreationMillis = value.millis()
9793
);
9894
}
9995

100-
public record LastConvergenceInfo(long numComputeCallsSince, long numIterationsSince, long timestampMillis) {}
101-
102-
/**
103-
* Returns the last convergence timestamp along with the total number of computation calls and iterations since the last convergence.
104-
*/
105-
public LastConvergenceInfo getLastConvergence() {
106-
return new LastConvergenceInfo(numComputeCallsSinceLastConverged, numIterationsSinceLastConverged, lastConvergedTimeMillis);
107-
}
108-
10996
public DesiredBalance compute(
11097
DesiredBalance previousDesiredBalance,
11198
DesiredBalanceInput desiredBalanceInput,
11299
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
113100
Predicate<DesiredBalanceInput> isFresh
114101
) {
115-
++numComputeCallsSinceLastConverged;
102+
numComputeCallsSinceLastConverged += 1;
116103
if (logger.isTraceEnabled()) {
117104
logger.trace(
118105
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
@@ -345,8 +332,8 @@ public DesiredBalance compute(
345332
}
346333
}
347334

348-
i++;
349-
++numIterationsSinceLastConverged;
335+
i += 1;
336+
numIterationsSinceLastConverged += 1;
350337
final int iterations = i;
351338
final long currentTime = timeProvider.relativeTimeInMillis();
352339
final boolean reportByTime = nextReportTime <= currentTime;
@@ -356,16 +343,32 @@ public DesiredBalance compute(
356343
}
357344

358345
if (hasChanges == false && hasEnoughIterations(i)) {
359-
logger.debug(
360-
"Desired balance computation for [{}] converged after [{}] and [{}] iterations, "
361-
+ "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago",
362-
desiredBalanceInput.index(),
363-
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
364-
i,
365-
numComputeCallsSinceLastConverged,
366-
numIterationsSinceLastConverged,
367-
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString()
368-
);
346+
if (numComputeCallsSinceLastConverged > 1) {
347+
logger.log(
348+
convergenceLogMsgLevel,
349+
() -> Strings.format(
350+
"""
351+
Desired balance computation for [%d] converged after [%s] and [%d] iterations, \
352+
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
353+
desiredBalanceInput.index(),
354+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
355+
numIterationsSinceLastConverged,
356+
numComputeCallsSinceLastConverged,
357+
iterations,
358+
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
359+
)
360+
);
361+
} else {
362+
logger.log(
363+
convergenceLogMsgLevel,
364+
() -> Strings.format(
365+
"Desired balance computation for [%d] converged after [%s] and [%d] iterations",
366+
desiredBalanceInput.index(),
367+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
368+
numIterationsSinceLastConverged
369+
)
370+
);
371+
}
369372
numComputeCallsSinceLastConverged = 0;
370373
numIterationsSinceLastConverged = 0;
371374
lastConvergedTimeMillis = currentTime;
@@ -400,17 +403,33 @@ public DesiredBalance compute(
400403
break;
401404
}
402405

403-
logger.log(
404-
reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE,
405-
"Desired balance computation for [{}] is still not converged after [{}] and [{}] iterations, "
406-
+ "[{}] compute() calls with [{}] total iterations since last convergence [{}] ago",
407-
desiredBalanceInput.index(),
408-
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
409-
iterations,
410-
numComputeCallsSinceLastConverged,
411-
numIterationsSinceLastConverged,
412-
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString()
413-
);
406+
final var logLevel = reportByIterationCount || reportByTime ? Level.INFO : i % 100 == 0 ? Level.DEBUG : Level.TRACE;
407+
if (numComputeCallsSinceLastConverged > 1) {
408+
logger.log(
409+
logLevel,
410+
() -> Strings.format(
411+
"""
412+
Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \
413+
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
414+
desiredBalanceInput.index(),
415+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
416+
numIterationsSinceLastConverged,
417+
numComputeCallsSinceLastConverged,
418+
iterations,
419+
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
420+
)
421+
);
422+
} else {
423+
logger.log(
424+
logLevel,
425+
() -> Strings.format(
426+
"Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
427+
desiredBalanceInput.index(),
428+
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
429+
numIterationsSinceLastConverged
430+
)
431+
);
432+
}
414433

415434
if (reportByIterationCount || reportByTime) {
416435
lastNotConvergedLogMessageTimeMillis = currentTime;
@@ -509,4 +528,21 @@ private static int computeIterationCountReportInterval(RoutingAllocation allocat
509528
}
510529
return iterations;
511530
}
531+
532+
// Package-private accessors for testing.
533+
long getNumComputeCallsSinceLastConverged() {
534+
return numComputeCallsSinceLastConverged;
535+
}
536+
537+
long getNumIterationsSinceLastConverged() {
538+
return numIterationsSinceLastConverged;
539+
}
540+
541+
long getLastConvergedTimeMillis() {
542+
return lastConvergedTimeMillis;
543+
}
544+
545+
void setConvergenceLogMsgLevel(Level level) {
546+
convergenceLogMsgLevel = level;
547+
}
512548
}

0 commit comments

Comments
 (0)