diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/AutoShardingResult.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/AutoShardingResult.java index e0d48b139acff..d5bee47a27109 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/AutoShardingResult.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/AutoShardingResult.java @@ -9,7 +9,7 @@ package org.elasticsearch.action.datastreams.autosharding; -import org.elasticsearch.core.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; import java.util.Arrays; @@ -25,13 +25,7 @@ * {@link DataStreamAutoShardingService#DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING}) the target number of shards will be -1 and cool down * remaining {@link TimeValue#MAX_VALUE}. */ -public record AutoShardingResult( - AutoShardingType type, - int currentNumberOfShards, - int targetNumberOfShards, - TimeValue coolDownRemaining, - @Nullable Double writeLoad -) { +public record AutoShardingResult(AutoShardingType type, int currentNumberOfShards, int targetNumberOfShards, TimeValue coolDownRemaining) { static final String COOLDOWN_PREVENTING_TYPES = Arrays.toString( new AutoShardingType[] { COOLDOWN_PREVENTED_DECREASE, COOLDOWN_PREVENTED_INCREASE } @@ -53,8 +47,36 @@ public record AutoShardingResult( AutoShardingType.NOT_APPLICABLE, -1, -1, - TimeValue.MAX_VALUE, - null + TimeValue.MAX_VALUE ); + @Override + public String toString() { + return switch (type) { + case INCREASE_SHARDS -> Strings.format( + "Recommendation to increase shards from %d to %d", + currentNumberOfShards, + targetNumberOfShards + ); + case DECREASE_SHARDS -> Strings.format( + "Recommendation to decrease shards from %d to %d", + currentNumberOfShards, + targetNumberOfShards + ); + case COOLDOWN_PREVENTED_INCREASE -> Strings.format( + "Deferred recommendation to increase shards from %d to %d after cooldown period %s", + currentNumberOfShards, + targetNumberOfShards, + coolDownRemaining + ); + case COOLDOWN_PREVENTED_DECREASE -> Strings.format( + "Deferred recommendation to decrease shards from %d to %d after cooldown period %s", + currentNumberOfShards, + targetNumberOfShards, + coolDownRemaining + ); + case NO_CHANGE_REQUIRED -> Strings.format("Recommendation to leave shards unchanged at %d", currentNumberOfShards); + case NOT_APPLICABLE -> "No recommendation as auto-sharding not enabled"; + }; + } } diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java index e4c5bff660f15..d4b2a46b73e6d 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java @@ -11,14 +11,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.IndexMetadataStats; import org.elasticsearch.cluster.metadata.IndexWriteLoad; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -28,12 +29,20 @@ import org.elasticsearch.index.shard.IndexingStats; import java.util.Arrays; +import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.OptionalDouble; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static java.util.stream.Collectors.toMap; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingResult.NOT_APPLICABLE_RESULT; /** @@ -146,8 +155,9 @@ public enum WriteLoadMetric { private final ClusterService clusterService; private final boolean isAutoShardingEnabled; private final LongSupplier nowSupplier; + private final Consumer decisionLogger; private volatile TimeValue increaseShardsCooldown; - private volatile TimeValue reduceShardsCooldown; + private volatile TimeValue decreaseShardsCooldown; private volatile int minWriteThreads; private volatile int maxWriteThreads; private volatile List dataStreamExcludePatterns; @@ -155,16 +165,32 @@ public enum WriteLoadMetric { private volatile WriteLoadMetric decreaseShardsMetric; public DataStreamAutoShardingService(Settings settings, ClusterService clusterService, LongSupplier nowSupplier) { + this(settings, clusterService, nowSupplier, createPeriodicLoggingDecisionConsumer(nowSupplier)); + } + + private static Consumer createPeriodicLoggingDecisionConsumer(LongSupplier nowSupplier) { + PeriodicDecisionLogger periodicDecisionLogger = new PeriodicDecisionLogger(nowSupplier); + return periodicDecisionLogger::maybeLogDecision; + } + + // Exists to allow a fake decision logger to be injected in tests + DataStreamAutoShardingService( + Settings settings, + ClusterService clusterService, + LongSupplier nowSupplier, + Consumer decisionLogger + ) { this.clusterService = clusterService; this.isAutoShardingEnabled = settings.getAsBoolean(DATA_STREAMS_AUTO_SHARDING_ENABLED, false); this.increaseShardsCooldown = DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN.get(settings); - this.reduceShardsCooldown = DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN.get(settings); + this.decreaseShardsCooldown = DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN.get(settings); this.minWriteThreads = CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS.get(settings); this.maxWriteThreads = CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS.get(settings); this.dataStreamExcludePatterns = DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.get(settings); this.increaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC.get(settings); this.decreaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC.get(settings); this.nowSupplier = nowSupplier; + this.decisionLogger = decisionLogger; } public void init() { @@ -182,6 +208,141 @@ public void init() { .addSettingsUpdateConsumer(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, this::updateDecreaseShardsMetric); } + // package-private for testing + + /** + * Contains all the information relating to a decision made by this service: the inputs, the calculations made, and the resulting + * recommendation. + * + * @param inputs The inputs into the decision + * @param increaseCalculation The results of the calculation to determine whether to increase the number of shards + * @param decreaseCalculation The results of the calculation to determine whether to decrease the number of shards, or null of the + * service already decided to increase the number (the decrease calculation is skipped in that case) + * @param result The resulting recommendation to be returned from the service + */ + record Decision( + Inputs inputs, + IncreaseCalculation increaseCalculation, + @Nullable DecreaseCalculation decreaseCalculation, + AutoShardingResult result + ) { + + /** + * Contains the inputs to a decision. + * + * @param increaseShardsCooldown The time since the last auto-sharding to wait before increasing the shards + * (see the {@link #DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN} cluster setting) + * @param decreaseShardsCooldown The time since the last auto-sharding to wait before decreasing the shards - or, if there was no + * previous auto-sharding, the time since the first backing index was created + * (see the {@link #DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN} cluster setting) + * @param minWriteThreads The minimum number of write threads per shard (see the {@link #CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS} + * cluster setting) + * @param maxWriteThreads The maximum number of write threads per shard (see the {@link #CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS} + * cluster setting) + * @param increaseShardsMetric Which load metric to use for the increase shards calculation + * (see the {@link #DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC} cluster setting) + * @param decreaseShardsMetric Which load metric to use for the decrease shards calculation + * (see the {@link #DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC} cluster setting) + * @param dataStream The name of the data stream + * @param writeIndex The name of the current write index + * @param writeIndexAllTimeLoad The all-time load metric for the write index (see {@link IndexingStats.Stats#getWriteLoad()}) + * @param writeIndexRecentLoad The recent load metric for the current write index + * (see {@link IndexingStats.Stats#getRecentWriteLoad()}) + * @param writeIndexPeakLoad The peak load metric for the write index (see {@link IndexingStats.Stats#getPeakWriteLoad()}) + * @param currentNumberOfWriteIndexShards The number of shards for the current write index + */ + record Inputs( + TimeValue increaseShardsCooldown, + TimeValue decreaseShardsCooldown, + int minWriteThreads, + int maxWriteThreads, + DataStreamAutoShardingService.WriteLoadMetric increaseShardsMetric, + DataStreamAutoShardingService.WriteLoadMetric decreaseShardsMetric, + String dataStream, + String writeIndex, + double writeIndexAllTimeLoad, + double writeIndexRecentLoad, + double writeIndexPeakLoad, + int currentNumberOfWriteIndexShards + ) {} + + /** + * Contains details from the increase shards calculation. + * + * @param writeIndexLoadForIncrease The load considered for the increase shards calculation (i.e. one of write index load metrics + * from the {@link Decision.Inputs}, as determined by {@link Decision.Inputs#increaseShardsMetric}) + * @param optimalShardCountForIncrease The optimal shard count determined based on the load + * @param increaseResult If the optimal shard count is greater than the current shard count, a recommendation to increase the number + * of shards (possibly after a cooldown period); otherwise, null + */ + record IncreaseCalculation( + double writeIndexLoadForIncrease, + int optimalShardCountForIncrease, + @Nullable AutoShardingResult increaseResult + ) {} + + /** + * Contains details from the increase shards calculation. + * + * @param maxLoadWithinCooldown The load considered for the decrease shards calculation (i.e. a load metric determined by + * {@link Decision.Inputs#increaseShardsMetric}, either for the write index or an older one within the cooldown period) and a + * record of which index that corresponded to + * @param optimalShardCountForDecrease The optimal shard count determined based on the load + * @param decreaseResult If the optimal shard count is less than the current shard count, a recommendation to decrease the number + * of shards (possibly after a cooldown period); otherwise, null + */ + record DecreaseCalculation( + MaxLoadWithinCooldown maxLoadWithinCooldown, + int optimalShardCountForDecrease, + @Nullable AutoShardingResult decreaseResult + ) { + + /** + * Contains information about the backing index with the highest load, out of the current write index and all the older + * indices created within the cooldown period. + * + * @param load The highest load + * @param previousIndexWithMaxLoad If this load came from one of the previous backing indices, the name of that index; null if + * it came from the current write index + */ + record MaxLoadWithinCooldown(double load, @Nullable String previousIndexWithMaxLoad) {} + } + + @Override + public String toString() { + return Strings.format( + "For data stream %s: %s based on [inc/dec cooldowns %s/%s, %d-%d threads, " + + "write index %s has all-time/recent/peak loads %g/%g/%g, current shards %d, " + + "using %s value %g for increase gives %d shards%s]", + inputs.dataStream, + result, + inputs.increaseShardsCooldown, + inputs.decreaseShardsCooldown, + inputs.minWriteThreads, + inputs.maxWriteThreads, + inputs.writeIndex, + inputs.writeIndexAllTimeLoad, + inputs.writeIndexRecentLoad, + inputs.writeIndexPeakLoad, + inputs.currentNumberOfWriteIndexShards, + inputs.increaseShardsMetric, + increaseCalculation.writeIndexLoadForIncrease, + increaseCalculation.optimalShardCountForIncrease, + decreaseCalculation == null + ? "" + : Strings.format( + ", and using %s value %g for dec based on %s gives %d shards", + inputs.decreaseShardsMetric, + decreaseCalculation.maxLoadWithinCooldown.load, + decreaseCalculation.maxLoadWithinCooldown.previousIndexWithMaxLoad != null + ? decreaseCalculation.maxLoadWithinCooldown.previousIndexWithMaxLoad + : "write index", + decreaseCalculation.optimalShardCountForDecrease + ) + ); + } + } + /** * Computes the optimal number of shards for the provided data stream according to the write index's indexing load (to check if we must * increase the number of shards, whilst the heuristics for decreasing the number of shards _might_ use the provided write indexing @@ -233,23 +394,28 @@ public AutoShardingResult calculate(ProjectState state, DataStream dataStream, @ return NOT_APPLICABLE_RESULT; } - double writeIndexLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getWriteLoad); + double writeIndexAllTimeLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getWriteLoad); double writeIndexRecentLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getRecentWriteLoad); double writeIndexPeakLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getPeakWriteLoad); - double writeIndexLoadForIncrease = pickMetric(increaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); - double writeIndexLoadForDecrease = pickMetric(decreaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); - - logger.trace( - "Data stream auto-sharding service calculating recommendation with all-time load {}, recent load {}, peak load {}, " - + "using {} for increase and {} for decrease", - writeIndexLoad, + IndexMetadata writeIndex = state.metadata().index(dataStream.getWriteIndex()); + assert writeIndex != null : "the data stream write index must exist in the provided cluster metadata"; + Decision.Inputs inputs = new Decision.Inputs( + increaseShardsCooldown, + decreaseShardsCooldown, + minWriteThreads, + maxWriteThreads, + increaseShardsMetric, + decreaseShardsMetric, + dataStream.getName(), + writeIndex.getIndex().getName(), + writeIndexAllTimeLoad, writeIndexRecentLoad, writeIndexPeakLoad, - increaseShardsMetric, - decreaseShardsMetric + writeIndex.getNumberOfShards() ); - - return innerCalculate(state.metadata(), dataStream, writeIndexLoadForIncrease, writeIndexLoadForDecrease, nowSupplier); + Decision decision = innerCalculate(state.metadata(), dataStream, inputs); + decisionLogger.accept(decision); + return decision.result(); } private static double sumLoadMetrics(IndexStats stats, Function loadMetric) { @@ -262,75 +428,65 @@ private static double sumLoadMetrics(IndexStats stats, Function getDecreaseShardsResult( - project, - dataStream, - writeIndexLoadForDecrease, - nowSupplier, - writeIndex, - getRemainingDecreaseShardsCooldown(project, dataStream) + private Decision innerCalculate(ProjectMetadata project, DataStream dataStream, Decision.Inputs inputs) { + // See whether we recommend increasing the number of shards. + Decision.IncreaseCalculation increaseCalculation = calculateIncreaseShardsDecision(dataStream, inputs); + if (increaseCalculation.increaseResult() != null) { + return new Decision(inputs, increaseCalculation, null, increaseCalculation.increaseResult()); + } + // If not, see whether we recommend decreasing the number of shards. + Decision.DecreaseCalculation decreaseCalculation = calculateDecreaseShardsDecision(project, dataStream, inputs); + if (decreaseCalculation.decreaseResult() != null) { + return new Decision(inputs, increaseCalculation, decreaseCalculation, decreaseCalculation.decreaseResult()); + } + // If we don't recommend increasing or decreasing then we recommend no change. + return new Decision( + inputs, + increaseCalculation, + decreaseCalculation, + new AutoShardingResult( + AutoShardingType.NO_CHANGE_REQUIRED, + inputs.currentNumberOfWriteIndexShards(), + inputs.currentNumberOfWriteIndexShards(), + TimeValue.ZERO ) ); - } - @Nullable - private AutoShardingResult getIncreaseShardsResult( - DataStream dataStream, - double writeIndexLoadForIncrease, - LongSupplier nowSupplier, - IndexMetadata writeIndex - ) { + private Decision.IncreaseCalculation calculateIncreaseShardsDecision(DataStream dataStream, Decision.Inputs inputs) { // increasing the number of shards is calculated solely based on the index load of the write index - long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, writeIndexLoadForIncrease); - logger.trace( - "Calculated the optimal number of shards for a potential increase in number of shards for data stream [{}] as [{}]" - + " with the {} indexing load [{}] for the write index assuming [{}-{}] threads per shard", - dataStream.getName(), - optimalShardCount, - increaseShardsMetric, - writeIndexLoadForIncrease, - minWriteThreads, - maxWriteThreads + double writeIndexLoadForIncrease = pickMetric( + inputs.increaseShardsMetric(), + inputs.writeIndexAllTimeLoad(), + inputs.writeIndexRecentLoad(), + inputs.writeIndexPeakLoad() ); - if (optimalShardCount > writeIndex.getNumberOfShards()) { + int optimalShardCountForIncrease = computeOptimalNumberOfShards( + inputs.minWriteThreads(), + inputs.maxWriteThreads(), + writeIndexLoadForIncrease + ); + if (optimalShardCountForIncrease > inputs.currentNumberOfWriteIndexShards()) { TimeValue timeSinceLastAutoShardingEvent = dataStream.getAutoShardingEvent() != null ? dataStream.getAutoShardingEvent().getTimeSinceLastAutoShardingEvent(nowSupplier) : TimeValue.MAX_VALUE; - TimeValue coolDownRemaining = TimeValue.timeValueMillis( - Math.max(0L, increaseShardsCooldown.millis() - timeSinceLastAutoShardingEvent.millis()) - ); - logger.debug( - "Data stream auto-sharding service recommends increasing the number of shards from [{}] to [{}] after [{}] cooldown for " - + "data stream [{}]", - writeIndex.getNumberOfShards(), - optimalShardCount, - coolDownRemaining, - dataStream.getName() + Math.max(0L, inputs.increaseShardsCooldown().millis() - timeSinceLastAutoShardingEvent.millis()) ); - return new AutoShardingResult( - coolDownRemaining.equals(TimeValue.ZERO) ? AutoShardingType.INCREASE_SHARDS : AutoShardingType.COOLDOWN_PREVENTED_INCREASE, - writeIndex.getNumberOfShards(), - Math.toIntExact(optimalShardCount), - coolDownRemaining, - writeIndexLoadForIncrease + return new Decision.IncreaseCalculation( + writeIndexLoadForIncrease, + optimalShardCountForIncrease, + new AutoShardingResult( + coolDownRemaining.equals(TimeValue.ZERO) + ? AutoShardingType.INCREASE_SHARDS + : AutoShardingType.COOLDOWN_PREVENTED_INCREASE, + inputs.currentNumberOfWriteIndexShards(), + optimalShardCountForIncrease, + coolDownRemaining + ) ); } - return null; + return new Decision.IncreaseCalculation(writeIndexLoadForIncrease, optimalShardCountForIncrease, null); } /** @@ -338,7 +494,7 @@ private AutoShardingResult getIncreaseShardsResult( * This reference for the remaining time math is either the time since the last auto sharding event (if available) or otherwise the * oldest index in the data stream. */ - private TimeValue getRemainingDecreaseShardsCooldown(ProjectMetadata project, DataStream dataStream) { + private TimeValue getRemainingDecreaseShardsCooldown(ProjectMetadata project, DataStream dataStream, TimeValue decreaseShardsCooldown) { Index oldestBackingIndex = dataStream.getIndices().get(0); IndexMetadata oldestIndexMeta = project.getIndexSafe(oldestBackingIndex); @@ -346,85 +502,54 @@ private TimeValue getRemainingDecreaseShardsCooldown(ProjectMetadata project, Da // without a pre-existing auto sharding event we wait until the oldest index has been created longer than the decrease_shards // cool down period "ago" so we don't immediately reduce the number of shards after a data stream is created ? TimeValue.timeValueMillis( - Math.max(0L, oldestIndexMeta.getCreationDate() + reduceShardsCooldown.millis() - nowSupplier.getAsLong()) + Math.max(0L, oldestIndexMeta.getCreationDate() + decreaseShardsCooldown.millis() - nowSupplier.getAsLong()) ) : TimeValue.timeValueMillis( Math.max( 0L, - reduceShardsCooldown.millis() - dataStream.getAutoShardingEvent() + decreaseShardsCooldown.millis() - dataStream.getAutoShardingEvent() .getTimeSinceLastAutoShardingEvent(nowSupplier) .millis() ) ); } - private AutoShardingResult getDecreaseShardsResult( + private Decision.DecreaseCalculation calculateDecreaseShardsDecision( ProjectMetadata project, DataStream dataStream, - double writeIndexLoadForDecrease, - LongSupplier nowSupplier, - IndexMetadata writeIndex, - TimeValue remainingReduceShardsCooldown + Decision.Inputs inputs ) { - double maxIndexLoadWithinCoolingPeriod = getMaxIndexLoadWithinCoolingPeriod( + TimeValue remainingCooldownForDecrease = getRemainingDecreaseShardsCooldown(project, dataStream, inputs.decreaseShardsCooldown()); + Decision.DecreaseCalculation.MaxLoadWithinCooldown maxLoadWithinCooldownForDecrease = getMaxIndexLoadWithinCoolingPeriod( project, dataStream, - writeIndexLoadForDecrease, - reduceShardsCooldown, - nowSupplier, - decreaseShardsMetric + inputs, + nowSupplier ); - - long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, maxIndexLoadWithinCoolingPeriod); - logger.trace( - "Calculated the optimal number of shards for a potential decrease in number of shards for data stream [{}] as [{}]" - + " shards, using a max {} indexing load [{}] over the cool down period [{}] assuming [{}-{}] threads per shard", - dataStream.getName(), - optimalShardCount, - decreaseShardsMetric, - maxIndexLoadWithinCoolingPeriod, - reduceShardsCooldown, - minWriteThreads, - maxWriteThreads + int optimalShardCountForDecrease = computeOptimalNumberOfShards( + inputs.minWriteThreads(), + inputs.maxWriteThreads(), + maxLoadWithinCooldownForDecrease.load() ); - if (optimalShardCount < writeIndex.getNumberOfShards()) { - logger.debug( - "data stream auto-sharding service recommends decreasing the number of shards from [{}] to [{}] after [{}] cooldown for " - + "data stream [{}]", - writeIndex.getNumberOfShards(), - optimalShardCount, - remainingReduceShardsCooldown, - dataStream.getName() - ); - - // we should reduce the number of shards - return new AutoShardingResult( - remainingReduceShardsCooldown.equals(TimeValue.ZERO) - ? AutoShardingType.DECREASE_SHARDS - : AutoShardingType.COOLDOWN_PREVENTED_DECREASE, - writeIndex.getNumberOfShards(), - Math.toIntExact(optimalShardCount), - remainingReduceShardsCooldown, - maxIndexLoadWithinCoolingPeriod + if (optimalShardCountForDecrease < inputs.currentNumberOfWriteIndexShards()) { + return new Decision.DecreaseCalculation( + maxLoadWithinCooldownForDecrease, + optimalShardCountForDecrease, + new AutoShardingResult( + remainingCooldownForDecrease.equals(TimeValue.ZERO) + ? AutoShardingType.DECREASE_SHARDS + : AutoShardingType.COOLDOWN_PREVENTED_DECREASE, + inputs.currentNumberOfWriteIndexShards, + optimalShardCountForDecrease, + remainingCooldownForDecrease + ) ); } - - logger.trace( - "data stream auto-sharding service recommends maintaining the number of shards [{}] for data stream [{}]", - writeIndex.getNumberOfShards(), - dataStream.getName() - ); - return new AutoShardingResult( - AutoShardingType.NO_CHANGE_REQUIRED, - writeIndex.getNumberOfShards(), - writeIndex.getNumberOfShards(), - TimeValue.ZERO, - maxIndexLoadWithinCoolingPeriod - ); + return new Decision.DecreaseCalculation(maxLoadWithinCooldownForDecrease, optimalShardCountForDecrease, null); } // Visible for testing - static long computeOptimalNumberOfShards(int minNumberWriteThreads, int maxNumberWriteThreads, double indexingLoad) { + static int computeOptimalNumberOfShards(int minNumberWriteThreads, int maxNumberWriteThreads, double indexingLoad) { /* * Define: * - shardsByMaxThreads = number of shards required to ensure no more than 50% utilization with max number of threads per shard @@ -436,12 +561,14 @@ static long computeOptimalNumberOfShards(int minNumberWriteThreads, int maxNumbe * - shardsByMinThreads if 0 < shardsByMinThreads <= 3 * - 1 if shardsByMinThreads == 0 */ - return Math.max( + return Math.toIntExact( Math.max( - Math.min(roundUp(indexingLoad / (minNumberWriteThreads / 2.0)), 3), - roundUp(indexingLoad / (maxNumberWriteThreads / 2.0)) - ), - 1 // we don't want to go lower than 1 shard + Math.max( + Math.min(roundUp(indexingLoad / (minNumberWriteThreads / 2.0)), 3), + roundUp(indexingLoad / (maxNumberWriteThreads / 2.0)) + ), + 1 // we don't want to go lower than 1 shard + ) ); } @@ -455,38 +582,46 @@ private static long roundUp(double value) { * during the provide {@param coolingPeriod} (note: to cover the entire cooling period, the backing index created before the cooling * period is also considered). */ - static double getMaxIndexLoadWithinCoolingPeriod( + static Decision.DecreaseCalculation.MaxLoadWithinCooldown getMaxIndexLoadWithinCoolingPeriod( ProjectMetadata project, DataStream dataStream, - double writeIndexLoadForDecrease, - TimeValue coolingPeriod, - LongSupplier nowSupplier, - WriteLoadMetric decreaseShardsMetric + Decision.Inputs inputs, + LongSupplier nowSupplier ) { // for reducing the number of shards we look at more than just the write index - List writeLoadsWithinCoolingPeriod = DataStream.getIndicesWithinMaxAgeRange( + Map writeLoadsWithinCoolingPeriod = DataStream.getIndicesWithinMaxAgeRange( dataStream, project::getIndexSafe, - coolingPeriod, + inputs.decreaseShardsCooldown(), nowSupplier ) .stream() .filter(index -> index.equals(dataStream.getWriteIndex()) == false) .map(project::index) .filter(Objects::nonNull) - .map(IndexMetadata::getStats) - .filter(Objects::nonNull) - .map(IndexMetadataStats::writeLoad) - .filter(Objects::nonNull) - .toList(); + .filter(metadata -> metadata.getStats() != null) + .filter(metadata -> metadata.getStats().indexWriteLoad() != null) + .collect( + toMap(metadata -> metadata.getIndex().getName(), metadata -> metadata.getStats().indexWriteLoad(), (unused1, unused2) -> { + throw new IllegalStateException("Multiple indices with same name"); + }, LinkedHashMap::new) + ); // assume the current write index load is the highest observed and look back to find the actual maximum - double maxIndexLoadWithinCoolingPeriod = writeIndexLoadForDecrease; - for (IndexWriteLoad writeLoad : writeLoadsWithinCoolingPeriod) { + double maxLoadWithinCooldown = pickMetric( + inputs.decreaseShardsMetric(), + inputs.writeIndexAllTimeLoad(), + inputs.writeIndexRecentLoad(), + inputs.writeIndexPeakLoad() + ); + String previousIndexInCooldownWithMaxLoad = null; + for (Map.Entry entry : writeLoadsWithinCoolingPeriod.entrySet()) { + String indexName = entry.getKey(); + IndexWriteLoad writeLoad = entry.getValue(); double totalIndexLoad = 0; for (int shardId = 0; shardId < writeLoad.numberOfShards(); shardId++) { Double writeLoadForShard = pickMetric( - decreaseShardsMetric, + inputs.decreaseShardsMetric(), optionalDoubleToNullable(writeLoad.getWriteLoadForShard(shardId)), optionalDoubleToNullable(writeLoad.getRecentWriteLoadForShard(shardId)), optionalDoubleToNullable(writeLoad.getPeakWriteLoadForShard(shardId)) @@ -495,11 +630,12 @@ static double getMaxIndexLoadWithinCoolingPeriod( totalIndexLoad += writeLoadForShard; } } - if (totalIndexLoad > maxIndexLoadWithinCoolingPeriod) { - maxIndexLoadWithinCoolingPeriod = totalIndexLoad; + if (totalIndexLoad > maxLoadWithinCooldown) { + maxLoadWithinCooldown = totalIndexLoad; + previousIndexInCooldownWithMaxLoad = indexName; } } - return maxIndexLoadWithinCoolingPeriod; + return new Decision.DecreaseCalculation.MaxLoadWithinCooldown(maxLoadWithinCooldown, previousIndexInCooldownWithMaxLoad); } void updateIncreaseShardsCooldown(TimeValue scaleUpCooldown) { @@ -507,7 +643,7 @@ void updateIncreaseShardsCooldown(TimeValue scaleUpCooldown) { } void updateReduceShardsCooldown(TimeValue scaleDownCooldown) { - this.reduceShardsCooldown = scaleDownCooldown; + this.decreaseShardsCooldown = scaleDownCooldown; } void updateMinWriteThreads(int minNumberWriteThreads) { @@ -546,4 +682,116 @@ private static Double pickMetric( private static Double optionalDoubleToNullable(OptionalDouble optional) { return optional.isPresent() ? optional.getAsDouble() : null; } + + /** + * A buffer which can be used to store a number of {@link Decision} instances, up to some fixed size limit, keeping the decisions with + * the highest value according to some comparator. + */ + private static class DecisionBuffer { + + private final Comparator comparator; + private final PriorityQueue queue; // This is a Lucene PriorityQueue, which is bounded, unlike the JDK one. + + DecisionBuffer(int maxSize, Comparator comparator) { + this.comparator = comparator; + this.queue = new PriorityQueue<>(maxSize) { + @Override + protected boolean lessThan(Decision decision1, Decision decision2) { + return comparator.compare(decision1, decision2) < 0; + } + }; + } + + /** + * Inserts the decision into the buffer, unless the buffer is full and all the contents are of higher value according to the + * comparator, in which case the decision is discarded. + */ + synchronized void insert(Decision Decision) { + queue.insertWithOverflow(Decision); + } + + /** + * Clears the buffer, returning a copy of the previous contents of the buffer. The returned values will be sorted from high to low + * according to the comparator. + */ + synchronized List flush() { + List previousDecisions = StreamSupport.stream(queue.spliterator(), false).sorted(comparator.reversed()).toList(); + queue.clear(); + return previousDecisions; + } + } + + // Package-private for testing + static class PeriodicDecisionLogger { + + static final int BUFFER_SIZE = 10; + static final long FLUSH_INTERVAL_MILLIS = TimeValue.timeValueMinutes(5).millis(); + + private static final Comparator HIGHEST_LOAD_COMPARATOR = Comparator.comparing( + d -> d.increaseCalculation().writeIndexLoadForIncrease() + ); + + private final LongSupplier nowSupplier; + private final Consumer logConsumer; + private final AtomicLong lastFlushMillis; + private final DecisionBuffer highestLoadIncreaseDecisions; + private final DecisionBuffer highestLoadNonIncreaseDecisions; + + PeriodicDecisionLogger(LongSupplier nowSupplier) { + this(nowSupplier, PeriodicDecisionLogger::logFlushedDecision); + } + + // Exists to allow a fake logger to be injected in tests + PeriodicDecisionLogger(LongSupplier nowSupplier, Consumer logConsumer) { + this.nowSupplier = nowSupplier; + this.highestLoadIncreaseDecisions = new DecisionBuffer(BUFFER_SIZE, HIGHEST_LOAD_COMPARATOR); + this.highestLoadNonIncreaseDecisions = new DecisionBuffer(BUFFER_SIZE, HIGHEST_LOAD_COMPARATOR); + this.lastFlushMillis = new AtomicLong(nowSupplier.getAsLong()); + this.logConsumer = logConsumer; + } + + record FlushedDecisions(List highestLoadIncreaseDecisions, List highestLoadNonIncreaseDecisions) {} + + void maybeLogDecision(Decision Decision) { + assert Decision.result != null : "Attempting to log a decision with no result"; + logger.debug("Data stream auto-sharding result: {}", Decision); + if (Decision.result.type() == AutoShardingType.INCREASE_SHARDS) { + highestLoadIncreaseDecisions.insert(Decision); + } else { + highestLoadNonIncreaseDecisions.insert(Decision); + } + if (shouldFlush()) { + FlushedDecisions flushedDecisions = new FlushedDecisions( + highestLoadIncreaseDecisions.flush(), + highestLoadNonIncreaseDecisions.flush() + ); + logConsumer.accept(flushedDecisions); + } + } + + private boolean shouldFlush() { + long now = nowSupplier.getAsLong(); + // Get the time of the last flush. If it is time for the next flush, update that with the current time; else leave it alone. + long previous = lastFlushMillis.getAndUpdate(last -> ((now - last) >= FLUSH_INTERVAL_MILLIS) ? now : last); + // Return whether it is time for the next flush. + return (now - previous) >= FLUSH_INTERVAL_MILLIS; + } + + private static void logFlushedDecision(FlushedDecisions decisions) { + if (decisions.highestLoadIncreaseDecisions.isEmpty() == false) { + logger.info( + "Data stream auto-sharding decisions in the last {} with highest load with an increase shards recommendation: \n{}", + TimeValue.timeValueMillis(FLUSH_INTERVAL_MILLIS), + decisions.highestLoadIncreaseDecisions.stream().map(d -> " - " + d).collect(Collectors.joining("\n")) + ); + } + if (decisions.highestLoadNonIncreaseDecisions.isEmpty() == false) { + logger.info( + "Data stream auto-sharding decisions in the last {} with highest load without an increase shards recommendation: \n{}", + TimeValue.timeValueMillis(FLUSH_INTERVAL_MILLIS), + decisions.highestLoadNonIncreaseDecisions.stream().map(d -> " - " + d).collect(Collectors.joining("\n")) + ); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java index 5e009ce902873..312996de5892d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java @@ -117,7 +117,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception randomBoolean(), false, null, - new AutoShardingResult(INCREASE_SHARDS, 3, 5, TimeValue.ZERO, 64.33), + new AutoShardingResult(INCREASE_SHARDS, 3, 5, TimeValue.ZERO), false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 5); @@ -146,7 +146,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception randomBoolean(), false, null, - new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33), + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO), false ); assertRolloverResult( @@ -182,7 +182,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception randomBoolean(), false, null, - new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33), + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO), false ); assertRolloverResult( @@ -200,8 +200,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception COOLDOWN_PREVENTED_INCREASE, 3, 5, - TimeValue.timeValueMinutes(10), - 64.33 + TimeValue.timeValueMinutes(10) ); MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( clusterState.projectState(projectId), @@ -213,7 +212,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception randomBoolean(), false, null, - new AutoShardingResult(COOLDOWN_PREVENTED_INCREASE, 3, 5, TimeValue.timeValueMinutes(10), 64.33), + new AutoShardingResult(COOLDOWN_PREVENTED_INCREASE, 3, 5, TimeValue.timeValueMinutes(10)), false ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown @@ -239,7 +238,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception randomBoolean(), false, null, - new AutoShardingResult(COOLDOWN_PREVENTED_DECREASE, 3, 1, TimeValue.timeValueMinutes(10), 64.33), + new AutoShardingResult(COOLDOWN_PREVENTED_DECREASE, 3, 1, TimeValue.timeValueMinutes(10)), false ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown @@ -266,7 +265,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception randomBoolean(), false, null, - new AutoShardingResult(NO_CHANGE_REQUIRED, 3, 3, TimeValue.ZERO, 2.33), + new AutoShardingResult(NO_CHANGE_REQUIRED, 3, 3, TimeValue.ZERO), false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); @@ -293,7 +292,7 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception randomBoolean(), false, null, - new AutoShardingResult(NOT_APPLICABLE, 1, 1, TimeValue.MAX_VALUE, null), + new AutoShardingResult(NOT_APPLICABLE, 1, 1, TimeValue.MAX_VALUE), false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); @@ -379,7 +378,7 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception randomBoolean(), false, null, - new AutoShardingResult(INCREASE_SHARDS, 3, 5, TimeValue.ZERO, 64.33), + new AutoShardingResult(INCREASE_SHARDS, 3, 5, TimeValue.ZERO), false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 5); @@ -408,7 +407,7 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception randomBoolean(), false, null, - new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33), + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO), false ); assertRolloverResult( @@ -444,7 +443,7 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception randomBoolean(), false, null, - new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33), + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO), false ); assertRolloverResult( @@ -468,7 +467,7 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception randomBoolean(), false, null, - new AutoShardingResult(COOLDOWN_PREVENTED_INCREASE, 3, 5, TimeValue.timeValueMinutes(10), 64.33), + new AutoShardingResult(COOLDOWN_PREVENTED_INCREASE, 3, 5, TimeValue.timeValueMinutes(10)), false ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown @@ -494,7 +493,7 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception randomBoolean(), false, null, - new AutoShardingResult(COOLDOWN_PREVENTED_DECREASE, 3, 1, TimeValue.timeValueMinutes(10), 64.33), + new AutoShardingResult(COOLDOWN_PREVENTED_DECREASE, 3, 1, TimeValue.timeValueMinutes(10)), false ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown @@ -521,7 +520,7 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception randomBoolean(), false, null, - new AutoShardingResult(NO_CHANGE_REQUIRED, 3, 3, TimeValue.ZERO, 2.33), + new AutoShardingResult(NO_CHANGE_REQUIRED, 3, 3, TimeValue.ZERO), false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); @@ -548,7 +547,7 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception randomBoolean(), false, null, - new AutoShardingResult(NOT_APPLICABLE, 1, 1, TimeValue.MAX_VALUE, null), + new AutoShardingResult(NOT_APPLICABLE, 1, 1, TimeValue.MAX_VALUE), false ); // if the auto sharding is not applicable we just use whatever's in the index template (1 shard in this case) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java index d85e10f13e3b7..10d3029b76c63 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java @@ -161,8 +161,8 @@ public void testConditionsAreMet() { rolloverConditions = RolloverConditions.newBuilder() .addOptimalShardCountCondition( randomBoolean() - ? new AutoShardingResult(AutoShardingType.INCREASE_SHARDS, 1, 3, TimeValue.ZERO, 3.0) - : new AutoShardingResult(AutoShardingType.DECREASE_SHARDS, 7, 3, TimeValue.ZERO, 0.8) + ? new AutoShardingResult(AutoShardingType.INCREASE_SHARDS, 1, 3, TimeValue.ZERO) + : new AutoShardingResult(AutoShardingType.DECREASE_SHARDS, 7, 3, TimeValue.ZERO) ) .build(); assertThat(rolloverConditions.areConditionsMet(Map.of(optimalShardCountCondition.toString(), true)), is(true)); @@ -180,8 +180,7 @@ public void testConditionsAreMet() { ), 1, 3, - TimeValue.ZERO, - 3.0 + TimeValue.ZERO ) ) .build(); diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index 1011c20bb95cc..a9aa023b9434c 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.Decision; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.WriteLoadMetric; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -51,6 +52,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingResult.NOT_APPLICABLE_RESULT; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.COOLDOWN_PREVENTED_DECREASE; @@ -61,7 +65,12 @@ import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC; import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; public class DataStreamAutoShardingServiceTests extends ESTestCase { @@ -72,6 +81,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase { private ClusterSettings clusterSettings; private ClusterService clusterService; private ThreadPool threadPool; + private List decisionsLogged; private DataStreamAutoShardingService service; private long now; private String dataStreamName; @@ -91,10 +101,12 @@ public void setupService() { clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings); clusterService = createClusterService(threadPool, clusterSettings); now = System.currentTimeMillis(); + decisionsLogged = new ArrayList<>(); service = new DataStreamAutoShardingService( Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build(), clusterService, - () -> now + () -> now, + decisionsLogged::add ); service.init(); dataStreamName = randomAlphaOfLengthBetween(10, 100); @@ -584,41 +596,121 @@ public void testCalculateDecreaseShardingRecommendations_usingPeakWriteLoad() { }); } + public void testCalculateDecreaseShardingRecommendations_correctDecisionData() { + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 3, + now, + List.of( + now - TimeValue.timeValueDays(21).getMillis(), // outside cooling period + now - TimeValue.timeValueDays(4).getMillis(), // within cooling period + now - TimeValue.timeValueDays(2).getMillis(), // within cooling period + now - TimeValue.timeValueDays(1).getMillis(), // within cooling period + now - 1000 + ), + List.of( + getWriteLoad(3, 9999.0, 0.444 / 3, 9999.0), + getWriteLoad(3, 9999.0, 0.222 / 3, 9999.0), + getWriteLoad(3, 9999.0, 0.333 / 3, 9999.0), + getWriteLoad(3, 9999.0, 0.111 / 3, 9999.0) + ), + null + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); + + doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.RECENT, () -> { + AutoShardingResult autoShardingResult = service.calculate( + state.projectState(projectId), + dataStream, + createIndexStats(3, 1.9 / 3, 0.3 / 3, 0.9 / 3) + ); + assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); + assertThat(autoShardingResult.currentNumberOfShards(), is(3)); + assertThat(autoShardingResult.targetNumberOfShards(), is(1)); + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); + assertThat(decisionsLogged, hasSize(1)); + Decision decision = decisionsLogged.get(0); + assertThat( + decision.inputs().increaseShardsCooldown(), + equalTo(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN.getDefault(Settings.EMPTY)) + ); + assertThat( + decision.inputs().decreaseShardsCooldown(), + equalTo(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN.getDefault(Settings.EMPTY)) + ); + assertThat( + decision.inputs().minWriteThreads(), + equalTo(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS.getDefault(Settings.EMPTY)) + ); + assertThat( + decision.inputs().maxWriteThreads(), + equalTo(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS.getDefault(Settings.EMPTY)) + ); + assertThat(decision.inputs().increaseShardsMetric(), equalTo(WriteLoadMetric.ALL_TIME)); + assertThat(decision.inputs().decreaseShardsMetric(), equalTo(WriteLoadMetric.RECENT)); + assertThat(decision.inputs().dataStream(), equalTo(dataStreamName)); + assertThat(decision.inputs().writeIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 5, now - 1000))); + assertThat(decision.inputs().writeIndexAllTimeLoad(), closeTo(1.9, 1.0e-8)); + assertThat(decision.inputs().writeIndexRecentLoad(), closeTo(0.3, 1.0e-8)); + assertThat(decision.inputs().writeIndexPeakLoad(), closeTo(0.9, 1.0e-8)); + assertThat(decision.inputs().currentNumberOfWriteIndexShards(), equalTo(3)); + assertThat(decision.increaseCalculation().writeIndexLoadForIncrease(), closeTo(1.9, 1.0e-8)); // all-time + // Increase shard count based on all-time load of 1.9 for write index: + assertThat(decision.increaseCalculation().optimalShardCountForIncrease(), equalTo(2)); + // The highest load for decrease (i.e. recent load) within the cooling period is the 0.333 from generation 3 + assertThat(decision.decreaseCalculation().maxLoadWithinCooldown().load(), closeTo(0.333, 1.0e-8)); + assertThat( + decision.decreaseCalculation().maxLoadWithinCooldown().previousIndexWithMaxLoad(), + equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 3, now - TimeValue.timeValueDays(2).getMillis())) + ); + // Decrease shard count based on recent load of 0.333 for gen 3 index + assertThat(decision.decreaseCalculation().optimalShardCountForDecrease(), equalTo(1)); + assertThat(decision.result(), equalTo(autoShardingResult)); + }); + } + public void testComputeOptimalNumberOfShards_zeroLoad() { - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 0.0), is(1L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 0.0), is(1)); } public void testComputeOptimalNumberOfShards_smallLoad() { // the small values will be very common so let's randomise to make sure we never go below 1L double indexingLoad = randomDoubleBetween(0.0001, 1.0, true); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, indexingLoad), is(1L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, indexingLoad), is(1)); } public void testComputeOptimalNumberOfShards_load2() { - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 2.0), is(2L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 2.0), is(2)); } public void testComputeOptimalNumberOfShards_loadUpTo32() { // there's a broad range of popular values (a write index starting to be very busy, using between 3 and all of the 32 write // threads, so let's randomise this too to make sure we stay at 3 recommended shards) double indexingLoad = randomDoubleBetween(3.0002, 32.0, true); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, indexingLoad), is(3L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, indexingLoad), is(3)); } public void testComputeOptimalNumberOfShards_load49() { - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 49.0), is(4L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 49.0), is(4)); } public void testComputeOptimalNumberOfShards_load70() { - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 70.0), is(5L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 70.0), is(5)); } public void testComputeOptimalNumberOfShards_load100() { - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 100.0), is(7L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 100.0), is(7)); } public void testComputeOptimalNumberOfShards_load180() { - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 180.0), is(12L)); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 180.0), is(12)); } public void testGetMaxIndexLoadWithinCoolingPeriod_withLongHistory() { @@ -690,16 +782,15 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_withLongHistory() { metadataBuilder.put(dataStream); - double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod( - metadataBuilder.build().getProject(), - dataStream, - 3.0, - coolingPeriod, - () -> now, - WriteLoadMetric.ALL_TIME - ); + Decision.DecreaseCalculation.MaxLoadWithinCooldown maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService + .getMaxIndexLoadWithinCoolingPeriod( + metadataBuilder.build().getProject(), + dataStream, + createDecisionInputsForMaxLoadInCooldownTests(3.0, 9999.0, 9999.0, WriteLoadMetric.RECENT, WriteLoadMetric.ALL_TIME), + () -> now + ); // to cover the entire cooldown period, the last index before the cooling period is taken into account - assertThat(maxIndexLoadWithinCoolingPeriod, is(lastIndexBeforeCoolingPeriodHasLowWriteLoad ? 15.0 : 999.0)); + assertThat(maxIndexLoadWithinCoolingPeriod.load(), is(lastIndexBeforeCoolingPeriodHasLowWriteLoad ? 15.0 : 999.0)); } public void testGetMaxIndexLoadWithinCoolingPeriod_sumsShardLoads() { @@ -747,15 +838,14 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_sumsShardLoads() { metadataBuilder.put(dataStream); - double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod( - metadataBuilder.build().getProject(), - dataStream, - 0.1, - coolingPeriod, - () -> now, - WriteLoadMetric.ALL_TIME - ); - assertThat(maxIndexLoadWithinCoolingPeriod, is(expectedIsSumOfShardLoads)); + Decision.DecreaseCalculation.MaxLoadWithinCooldown maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService + .getMaxIndexLoadWithinCoolingPeriod( + metadataBuilder.build().getProject(), + dataStream, + createDecisionInputsForMaxLoadInCooldownTests(0.1, 9999.0, 9999.0, WriteLoadMetric.RECENT, WriteLoadMetric.ALL_TIME), + () -> now + ); + assertThat(maxIndexLoadWithinCoolingPeriod.load(), is(expectedIsSumOfShardLoads)); } public void testGetMaxIndexLoadWithinCoolingPeriod_usingAllTimeWriteLoad() { @@ -789,15 +879,14 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_usingAllTimeWriteLoad() { metadataBuilder.put(dataStream); - double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod( - metadataBuilder.build().getProject(), - dataStream, - 3.0, - coolingPeriod, - () -> now, - WriteLoadMetric.ALL_TIME - ); - assertThat(maxIndexLoadWithinCoolingPeriod, equalTo(3 * 5.0)); + Decision.DecreaseCalculation.MaxLoadWithinCooldown maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService + .getMaxIndexLoadWithinCoolingPeriod( + metadataBuilder.build().getProject(), + dataStream, + createDecisionInputsForMaxLoadInCooldownTests(3.0, 9999.0, 9999.0, WriteLoadMetric.RECENT, WriteLoadMetric.ALL_TIME), + () -> now + ); + assertThat(maxIndexLoadWithinCoolingPeriod.load(), equalTo(3 * 5.0)); } public void testGetMaxIndexLoadWithinCoolingPeriod_usingRecentWriteLoad() { @@ -831,15 +920,14 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_usingRecentWriteLoad() { metadataBuilder.put(dataStream); - double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod( - metadataBuilder.build().getProject(), - dataStream, - 3.0, - coolingPeriod, - () -> now, - WriteLoadMetric.RECENT - ); - assertThat(maxIndexLoadWithinCoolingPeriod, equalTo(3 * 5.0)); + Decision.DecreaseCalculation.MaxLoadWithinCooldown maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService + .getMaxIndexLoadWithinCoolingPeriod( + metadataBuilder.build().getProject(), + dataStream, + createDecisionInputsForMaxLoadInCooldownTests(9999.0, 3.0, 9999.0, WriteLoadMetric.ALL_TIME, WriteLoadMetric.RECENT), + () -> now + ); + assertThat(maxIndexLoadWithinCoolingPeriod.load(), equalTo(3 * 5.0)); } public void testGetMaxIndexLoadWithinCoolingPeriod_usingPeakWriteLoad() { @@ -873,29 +961,45 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_usingPeakWriteLoad() { metadataBuilder.put(dataStream); - double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod( - metadataBuilder.build().getProject(), - dataStream, - 3.0, - coolingPeriod, - () -> now, - WriteLoadMetric.PEAK + Decision.DecreaseCalculation.MaxLoadWithinCooldown maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService + .getMaxIndexLoadWithinCoolingPeriod( + metadataBuilder.build().getProject(), + dataStream, + createDecisionInputsForMaxLoadInCooldownTests(9999.0, 9999.0, 3.0, WriteLoadMetric.RECENT, WriteLoadMetric.PEAK), + () -> now + ); + assertThat(maxIndexLoadWithinCoolingPeriod.load(), equalTo(3 * 5.0)); + } + + private Decision.Inputs createDecisionInputsForMaxLoadInCooldownTests( + double writeIndexAllTimeLoad, + double writeIndexRecentLoad, + double writeIndexPeakLoad, + WriteLoadMetric increaseShardsMetric, + WriteLoadMetric decreaseShardsMetric + ) { + return new Decision.Inputs( + TimeValue.timeValueSeconds(270), + TimeValue.timeValueDays(3), + 2, + 32, + increaseShardsMetric, + decreaseShardsMetric, + dataStreamName, + "the-write-index", + writeIndexAllTimeLoad, + writeIndexRecentLoad, + writeIndexPeakLoad, + 3 ); - assertThat(maxIndexLoadWithinCoolingPeriod, equalTo(3 * 5.0)); } public void testAutoShardingResultValidation_increaseShardsShouldNotReportCooldown() { - expectThrows( - IllegalArgumentException.class, - () -> new AutoShardingResult(INCREASE_SHARDS, 1, 3, TimeValue.timeValueSeconds(3), 3.0) - ); + expectThrows(IllegalArgumentException.class, () -> new AutoShardingResult(INCREASE_SHARDS, 1, 3, TimeValue.timeValueSeconds(3))); } public void testAutoShardingResultValidation_decreaseShardsShouldNotReportCooldown() { - expectThrows( - IllegalArgumentException.class, - () -> new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.timeValueSeconds(3), 1.0) - ); + expectThrows(IllegalArgumentException.class, () -> new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.timeValueSeconds(3))); } public void testAutoShardingResultValidation_validCooldownPreventedIncrease() { @@ -903,8 +1007,7 @@ public void testAutoShardingResultValidation_validCooldownPreventedIncrease() { COOLDOWN_PREVENTED_INCREASE, 1, 3, - TimeValue.timeValueSeconds(3), - 3.0 + TimeValue.timeValueSeconds(3) ); assertThat(cooldownPreventedIncrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(3))); } @@ -914,8 +1017,7 @@ public void testAutoShardingResultValidation_validCooldownPreventedDecrease() { COOLDOWN_PREVENTED_DECREASE, 3, 1, - TimeValue.timeValueSeconds(7), - 1.0 + TimeValue.timeValueSeconds(7) ); assertThat(cooldownPreventedDecrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(7))); } @@ -980,17 +1082,39 @@ private DataStream createDataStream( IndexWriteLoad backingIndicesWriteLoad, @Nullable DataStreamAutoShardingEvent autoShardingEvent ) { + return createDataStream( + builder, + dataStreamName, + numberOfShards, + now, + indicesCreationDate, + Stream.generate(() -> backingIndicesWriteLoad).limit(indicesCreationDate.size() - 1).toList(), + autoShardingEvent + ); + } + + private DataStream createDataStream( + ProjectMetadata.Builder builder, + String dataStreamName, + int numberOfShards, + Long now, + List indicesCreationDate, + List backingIndicesWriteLoads, + @Nullable DataStreamAutoShardingEvent autoShardingEvent + ) { + assert backingIndicesWriteLoads.size() == indicesCreationDate.size() - 1 : "Expected index load for all except write index"; final List backingIndices = new ArrayList<>(); int backingIndicesCount = indicesCreationDate.size(); for (int k = 0; k < indicesCreationDate.size(); k++) { long createdAt = indicesCreationDate.get(k); IndexMetadata.Builder indexMetaBuilder; if (k < backingIndicesCount - 1) { + IndexWriteLoad backingIndexWriteLoad = backingIndicesWriteLoads.get(k); indexMetaBuilder = IndexMetadata.builder( createIndexMetadata( - DataStream.getDefaultBackingIndexName(dataStreamName, k + 1), + DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, createdAt), numberOfShards, - backingIndicesWriteLoad, + backingIndexWriteLoad, createdAt ) ); @@ -1049,4 +1173,173 @@ private void doWithMetricSelection(Setting setting, WriteLoadMe } } + // Tests for PeriodicDecisionLogger + + private static class FlushedDecisionsRecorder { + + List highestLoadIncreaseDecisions = new ArrayList<>(); + List highestLoadNonIncreaseDecisions = new ArrayList<>(); + + public void record(DataStreamAutoShardingService.PeriodicDecisionLogger.FlushedDecisions flushedDecisions) { + highestLoadIncreaseDecisions.addAll(flushedDecisions.highestLoadIncreaseDecisions()); + highestLoadNonIncreaseDecisions.addAll(flushedDecisions.highestLoadNonIncreaseDecisions()); + } + + public void clear() { + highestLoadIncreaseDecisions.clear(); + highestLoadNonIncreaseDecisions.clear(); + } + } + + public void testPeriodResultLogger_logsPeriodically() { + long start = System.currentTimeMillis(); + AtomicLong clock = new AtomicLong(start); + FlushedDecisionsRecorder recorder = new FlushedDecisionsRecorder(); + DataStreamAutoShardingService.PeriodicDecisionLogger periodicDecisionLogger = + new DataStreamAutoShardingService.PeriodicDecisionLogger(clock::get, recorder::record); + + // Should not flush when logging at the start time: + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(1)); + assertThat(recorder.highestLoadNonIncreaseDecisions, empty()); + + // Should not flush when logging in the past: + clock.set(start - 1); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(2)); + assertThat(recorder.highestLoadNonIncreaseDecisions, empty()); + + // Should not flush when logging just before the interval since the start has elapsed: + clock.set(start + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS - 1); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(3)); + assertThat(recorder.highestLoadNonIncreaseDecisions, empty()); + + // Should flush when logging at exactly the interval since the start has elapsed: + long firstFlushTime = start + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS; + clock.set(firstFlushTime); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(4)); + assertThat( + recorder.highestLoadNonIncreaseDecisions.stream().map(d -> d.inputs().dataStream()).toList(), + containsInAnyOrder("data-stream-1", "data-stream-2", "data-stream-3", "data-stream-4") + ); + recorder.clear(); + + // Should not flush a second time when logging again at the same time: + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(5)); + assertThat(recorder.highestLoadNonIncreaseDecisions, empty()); + + // Should not flush a second time when logging just before the interval since the first flush has elapsed: + clock.set(firstFlushTime + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS - 1); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(6)); + assertThat(recorder.highestLoadNonIncreaseDecisions, empty()); + + // Should flush a second time when logging some extra time after the interval since the first flush has elapsed: + long secondFlushTime = firstFlushTime + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS + 123456L; + clock.set(secondFlushTime); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(7)); + assertThat( + recorder.highestLoadNonIncreaseDecisions.stream().map(d -> d.inputs().dataStream()).toList(), + containsInAnyOrder("data-stream-5", "data-stream-6", "data-stream-7") + ); + recorder.clear(); + + // Should not flush a third time when logging just before the interval since the second flush has elapsed: + // (N.B. This time is more than two intervals since the start, but we count time from the last flush.) + clock.set(secondFlushTime + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS - 1); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(8)); + assertThat(recorder.highestLoadNonIncreaseDecisions, empty()); + + // Should flush a third time when logging at exactly the interval since the second flush has elapsed: + clock.set(secondFlushTime + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(9)); + assertThat( + recorder.highestLoadNonIncreaseDecisions.stream().map(d -> d.inputs().dataStream()).toList(), + containsInAnyOrder("data-stream-8", "data-stream-9") + ); + } + + public void testPeriodResultLogger_logsHighestLoadNonIncrementDecisions() { + long start = System.nanoTime(); + AtomicLong clock = new AtomicLong(start); + FlushedDecisionsRecorder recorder = new FlushedDecisionsRecorder(); + DataStreamAutoShardingService.PeriodicDecisionLogger periodicDecisionLogger = + new DataStreamAutoShardingService.PeriodicDecisionLogger(clock::get, recorder::record); + + // Pass in 13 decisions, in the order 8, 7, 6, 5; 13, 12, 11, 10; 4, 3, 2, 1; 9. Updating the clock before the last decision. + for (int i = 8; i >= 5; i--) { + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(i)); + } + for (int i = 13; i >= 10; i--) { + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(i)); + } + for (int i = 4; i >= 1; i--) { + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(i)); + } + clock.set(start + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(9)); + + // Should have logged the 10 decisions with the highest load, in decreasing order, i.e. number 13 down to number 4: + assertThat( + recorder.highestLoadNonIncreaseDecisions.stream().map(d -> d.inputs().dataStream()).toList(), + contains(IntStream.rangeClosed(4, 13).mapToObj(i -> "data-stream-" + i).toList().reversed().toArray()) + ); + } + + public void testPeriodResultLogger_separatesIncreaseAndNonIncreaseDecisions() { + long start = System.nanoTime(); + AtomicLong clock = new AtomicLong(start); + FlushedDecisionsRecorder recorder = new FlushedDecisionsRecorder(); + DataStreamAutoShardingService.PeriodicDecisionLogger periodicDecisionLogger = + new DataStreamAutoShardingService.PeriodicDecisionLogger(clock::get, recorder::record); + + // Pass in 3 decisions. Update the clock before the last decision. The highest load decision has an increment result. + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(1)); + periodicDecisionLogger.maybeLogDecision(createIncreaseDecision(3)); + clock.set(start + DataStreamAutoShardingService.PeriodicDecisionLogger.FLUSH_INTERVAL_MILLIS); + periodicDecisionLogger.maybeLogDecision(createNoChangeDecision(2)); + + // Assert that we correctly separated the increase and the non-increase decisions. + assertThat( + recorder.highestLoadIncreaseDecisions.stream().map(d -> d.inputs().dataStream()).toList(), + containsInAnyOrder("data-stream-3") + ); + assertThat( + recorder.highestLoadNonIncreaseDecisions.stream().map(d -> d.inputs().dataStream()).toList(), + containsInAnyOrder("data-stream-1", "data-stream-2") + ); + } + + private DataStreamAutoShardingService.Decision createNoChangeDecision(int writeIndexLoad) { + return new Decision( + createDecisionInputsForPeriodLoggerTests(writeIndexLoad), + new Decision.IncreaseCalculation(1.0 * writeIndexLoad, 2, null), + null, + new AutoShardingResult(AutoShardingType.NO_CHANGE_REQUIRED, 3, 3, TimeValue.ZERO) + ); + } + + private DataStreamAutoShardingService.Decision createIncreaseDecision(int writeIndexLoad) { + AutoShardingResult result = new AutoShardingResult(INCREASE_SHARDS, 3, 4, TimeValue.ZERO); + return new Decision( + createDecisionInputsForPeriodLoggerTests(writeIndexLoad), + new Decision.IncreaseCalculation(1.0 * writeIndexLoad, 4, result), + null, + result + ); + } + + private static Decision.Inputs createDecisionInputsForPeriodLoggerTests(int writeIndexLoadForIncreaseAndDataStreamName) { + return new Decision.Inputs( + TimeValue.timeValueSeconds(270), + TimeValue.timeValueDays(3), + 2, + 32, + WriteLoadMetric.PEAK, + WriteLoadMetric.ALL_TIME, + "data-stream-" + writeIndexLoadForIncreaseAndDataStreamName, + "the-write-index", + 0.1, + 0.2, + 1.0 * writeIndexLoadForIncreaseAndDataStreamName, + 3 + ); + } }