Skip to content

Commit 7a8a20c

Browse files
Respond to comments from Lee
1 parent 58586ee commit 7a8a20c

File tree

1 file changed

+66
-16
lines changed

1 file changed

+66
-16
lines changed

server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,20 @@ public enum WriteLoadMetric {
165165
private volatile WriteLoadMetric decreaseShardsMetric;
166166

167167
public DataStreamAutoShardingService(Settings settings, ClusterService clusterService, LongSupplier nowSupplier) {
168-
this(settings, clusterService, nowSupplier, null);
168+
this(settings, clusterService, nowSupplier, createPeriodLoggingDecisionConsumer(nowSupplier));
169+
}
170+
171+
private static Consumer<Decision> createPeriodLoggingDecisionConsumer(LongSupplier nowSupplier) {
172+
PeriodicDecisionLogger periodicDecisionLogger = new PeriodicDecisionLogger(nowSupplier);
173+
return periodicDecisionLogger::maybeLogDecision;
169174
}
170175

171176
// Exists to allow a fake decision logger to be injected in tests
172177
DataStreamAutoShardingService(
173178
Settings settings,
174179
ClusterService clusterService,
175180
LongSupplier nowSupplier,
176-
@Nullable Consumer<Decision> decisionLogger
181+
Consumer<Decision> decisionLogger
177182
) {
178183
this.clusterService = clusterService;
179184
this.isAutoShardingEnabled = settings.getAsBoolean(DATA_STREAMS_AUTO_SHARDING_ENABLED, false);
@@ -185,12 +190,7 @@ public DataStreamAutoShardingService(Settings settings, ClusterService clusterSe
185190
this.increaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC.get(settings);
186191
this.decreaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC.get(settings);
187192
this.nowSupplier = nowSupplier;
188-
if (decisionLogger == null) {
189-
PeriodicDecisionLogger periodicDecisionLogger = new PeriodicDecisionLogger(nowSupplier);
190-
this.decisionLogger = periodicDecisionLogger::maybeLogDecision;
191-
} else {
192-
this.decisionLogger = decisionLogger;
193-
}
193+
this.decisionLogger = decisionLogger;
194194
}
195195

196196
public void init() {
@@ -209,13 +209,48 @@ public void init() {
209209
}
210210

211211
// package-private for testing
212+
213+
/**
214+
* Contains all the information relating to a decision made by this service: the inputs, the calculations made, and the resulting
215+
* recommendation.
216+
*
217+
* @param inputs The inputs into the decision
218+
* @param increaseCalculation The results of the calculation to determine whether to increase the number of shards
219+
* @param decreaseCalculation The results of the calculation to determine whether to decrease the number of shards, or null of the
220+
* service already decided to increase the number (the decrease calculation is skipped in that case)
221+
* @param result The resulting recommendation to be returned from the service
222+
*/
212223
record Decision(
213224
Inputs inputs,
214225
IncreaseCalculation increaseCalculation,
215226
@Nullable DecreaseCalculation decreaseCalculation,
216227
AutoShardingResult result
217228
) {
218229

230+
/**
231+
* Contains the inputs to a decision.
232+
*
233+
* @param increaseShardsCooldown The time since the last auto-sharding to wait before increasing the shards
234+
* (see the {@link #DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN} cluster setting)
235+
* @param decreaseShardsCooldown The time since the last auto-sharding to wait before decreasing the shards - or, if there was no
236+
* previous auto-sharding, the time since the first backing index was created
237+
* (see the {@link #DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN} cluster setting)
238+
* @param minWriteThreads The minimum number of write threads per shard (see the {@link #CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS}
239+
* cluster setting)
240+
* @param maxWriteThreads The maximum number of write threads per shard (see the {@link #CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS}
241+
* cluster setting)
242+
* @param increaseShardsMetric Which load metric to use for the increase shards calculation
243+
* (see the {@link #DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC} cluster setting)
244+
* @param decreaseShardsMetric Which load metric to use for the decrease shards calculation
245+
* (see the {@link #DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC} cluster setting)
246+
* @param dataStream The name of the data stream
247+
* @param writeIndex The name of the current write index
248+
* @param writeIndexAllTimeLoad The all-time load metric for the write index (see {@link IndexingStats.Stats#getWriteLoad()})
249+
* @param writeIndexRecentLoad The recent load metric for the current write index
250+
* (see {@link IndexingStats.Stats#getRecentWriteLoad()})
251+
* @param writeIndexPeakLoad The peak load metric for the write index (see {@link IndexingStats.Stats#getPeakWriteLoad()})
252+
* @param currentNumberOfWriteIndexShards The number of shards for the current write index
253+
*/
219254
record Inputs(
220255
TimeValue increaseShardsCooldown,
221256
TimeValue decreaseShardsCooldown,
@@ -231,12 +266,31 @@ record Inputs(
231266
int currentNumberOfWriteIndexShards
232267
) {}
233268

269+
/**
270+
* Contains details from the increase shards calculation.
271+
*
272+
* @param writeIndexLoadForIncrease The load considered for the increase shards calculation (i.e. one of write index load metrics
273+
* from the {@link Decision.Inputs}, as determined by {@link Decision.Inputs#increaseShardsMetric})
274+
* @param optimalShardCountForIncrease The optimal shard count determined based on the load
275+
* @param increaseResult If the optimal shard count is greater than the current shard count, a recommendation to increase the number
276+
* of shards (possibly after a cooldown period); otherwise, null
277+
*/
234278
record IncreaseCalculation(
235279
double writeIndexLoadForIncrease,
236280
int optimalShardCountForIncrease,
237281
@Nullable AutoShardingResult increaseResult
238282
) {}
239283

284+
/**
285+
* Contains details from the increase shards calculation.
286+
*
287+
* @param maxLoadWithinCooldown The load considered for the decrease shards calculation (i.e. a load metric determined by
288+
* {@link Decision.Inputs#increaseShardsMetric}, either for the write index or an older one within the cooldown period) and a
289+
* record of which index that corresponded to
290+
* @param optimalShardCountForDecrease The optimal shard count determined based on the load
291+
* @param decreaseResult If the optimal shard count is less than the current shard count, a recommendation to decrease the number
292+
* of shards (possibly after a cooldown period); otherwise, null
293+
*/
240294
record DecreaseCalculation(
241295
MaxLoadWithinCooldown maxLoadWithinCooldown,
242296
int optimalShardCountForDecrease,
@@ -636,7 +690,7 @@ private static Double optionalDoubleToNullable(OptionalDouble optional) {
636690
private static class DecisionBuffer {
637691

638692
private final Comparator<Decision> comparator;
639-
private final PriorityQueue<Decision> queue;
693+
private final PriorityQueue<Decision> queue; // This is a Lucene PriorityQueue, which is bounded, unlike the JDK one.
640694

641695
DecisionBuffer(int maxSize, Comparator<Decision> comparator) {
642696
this.comparator = comparator;
@@ -684,20 +738,16 @@ static class PeriodicDecisionLogger {
684738
private final DecisionBuffer highestLoadNonIncreaseDecisions;
685739

686740
PeriodicDecisionLogger(LongSupplier nowSupplier) {
687-
this(nowSupplier, null);
741+
this(nowSupplier, PeriodicDecisionLogger::logFlushedDecision);
688742
}
689743

690744
// Exists to allow a fake logger to be injected in tests
691-
PeriodicDecisionLogger(LongSupplier nowSupplier, @Nullable Consumer<FlushedDecisions> logConsumer) {
745+
PeriodicDecisionLogger(LongSupplier nowSupplier, Consumer<FlushedDecisions> logConsumer) {
692746
this.nowSupplier = nowSupplier;
693747
this.highestLoadIncreaseDecisions = new DecisionBuffer(BUFFER_SIZE, HIGHEST_LOAD_COMPARATOR);
694748
this.highestLoadNonIncreaseDecisions = new DecisionBuffer(BUFFER_SIZE, HIGHEST_LOAD_COMPARATOR);
695749
this.lastFlushMillis = new AtomicLong(nowSupplier.getAsLong());
696-
if (logConsumer == null) {
697-
this.logConsumer = PeriodicDecisionLogger::logFlushedDecision;
698-
} else {
699-
this.logConsumer = logConsumer;
700-
}
750+
this.logConsumer = logConsumer;
701751
}
702752

703753
record FlushedDecisions(List<Decision> highestLoadIncreaseDecisions, List<Decision> highestLoadNonIncreaseDecisions) {}

0 commit comments

Comments
 (0)