Skip to content

Commit 1c1c43d

Browse files
Calculate recent write load in indexing stats
This uses the recently-added `ExponentiallyWeightedMovingRate` class to calculate a write load which favours more recent load and include this alongside the existing unweighted all-time write load in `IndexingStats.Stats`. As of this change, the new load metric is not used anywhere, although it can be retrieved with the index stats or node stats APIs.
1 parent 0b6a3cd commit 1c1c43d

File tree

26 files changed

+460
-62
lines changed

26 files changed

+460
-62
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
527527
CommonStats stats = new CommonStats();
528528
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
529529
stats.store = new StoreStats();
530-
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1));
530+
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123));
531531
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
532532
}
533533

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.UUIDs;
3232
import org.elasticsearch.common.bytes.BytesArray;
3333
import org.elasticsearch.common.lucene.uid.Versions;
34+
import org.elasticsearch.common.settings.ClusterSettings;
3435
import org.elasticsearch.common.settings.Settings;
3536
import org.elasticsearch.common.unit.ByteSizeUnit;
3637
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -635,7 +636,8 @@ public static final IndexShard newIndexShard(
635636
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
636637
System::nanoTime,
637638
null,
638-
MapperMetrics.NOOP
639+
MapperMetrics.NOOP,
640+
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings())
639641
);
640642
}
641643

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ static TransportVersion def(int id) {
186186
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
187187
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0);
188188
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE = def(9_030_00_0);
189+
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_031_0_00);
189190

190191
/*
191192
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRate.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import static java.lang.Math.exp;
1313
import static java.lang.Math.expm1;
14+
import static java.lang.Math.log;
1415

1516
/**
1617
* Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to
@@ -41,7 +42,8 @@ public class ExponentiallyWeightedMovingRate {
4142
private final double lambda;
4243
private final long startTime;
4344
private double rate;
44-
long lastTime;
45+
private long lastTime;
46+
private boolean waitingForFirstIncrement;
4547

4648
/**
4749
* Constructor.
@@ -57,14 +59,12 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) {
5759
if (lambda < 0.0) {
5860
throw new IllegalArgumentException("lambda must be non-negative but was " + lambda);
5961
}
60-
if (startTime <= 0.0) {
61-
throw new IllegalArgumentException("startTime must be non-negative but was " + startTime);
62-
}
6362
synchronized (this) {
6463
this.lambda = lambda;
6564
this.rate = Double.NaN; // should never be used
6665
this.startTime = startTime;
67-
this.lastTime = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first
66+
this.lastTime = 0; // should never be used
67+
this.waitingForFirstIncrement = true;
6868
}
6969
}
7070

@@ -80,7 +80,7 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) {
8080
*/
8181
public double getRate(long time) {
8282
synchronized (this) {
83-
if (lastTime == 0) { // indicates that no increment has happened yet
83+
if (waitingForFirstIncrement) {
8484
return 0.0;
8585
} else if (time <= lastTime) {
8686
return rate;
@@ -104,6 +104,9 @@ public double getRate(long time) {
104104
* instance. It is only non-static because it uses this instance's {@code lambda} and {@code startTime}.
105105
*/
106106
public double calculateRateSince(long currentTime, double currentRate, long oldTime, double oldRate) {
107+
if (oldTime < startTime) {
108+
oldTime = startTime;
109+
}
107110
if (currentTime <= oldTime) {
108111
return 0.0;
109112
}
@@ -127,12 +130,13 @@ public double calculateRateSince(long currentTime, double currentRate, long oldT
127130
*/
128131
public void addIncrement(double increment, long time) {
129132
synchronized (this) {
130-
if (lastTime == 0) { // indicates that this is the first increment
133+
if (waitingForFirstIncrement) {
131134
if (time <= startTime) {
132135
time = startTime + 1;
133136
}
134137
// This is the formula for R(t_1) given in subsection 2.6 of the document referenced above:
135138
rate = increment / expHelper(time - startTime);
139+
waitingForFirstIncrement = false;
136140
} else {
137141
if (time < lastTime) {
138142
time = lastTime;
@@ -165,4 +169,12 @@ private double expHelper(double time) {
165169
return time * (1.0 - 0.5 * lambdaTime);
166170
}
167171
}
172+
173+
/**
174+
* Returns the configured half-life of this instance. The units are the same as all other times in API calls, and the inverse of the
175+
* units used for the {@code lambda} constructor parameter. If {@code lambda} is {@code 0.0}, returns {@link Double#POSITIVE_INFINITY}.
176+
*/
177+
public double getHalfLife() {
178+
return log(2.0) / lambda;
179+
}
168180
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.elasticsearch.index.IndexSettings;
8989
import org.elasticsearch.index.IndexingPressure;
9090
import org.elasticsearch.index.MergePolicyConfig;
91+
import org.elasticsearch.index.shard.IndexingStatsSettings;
9192
import org.elasticsearch.indices.IndexingMemoryController;
9293
import org.elasticsearch.indices.IndicesQueryCache;
9394
import org.elasticsearch.indices.IndicesRequestCache;
@@ -629,6 +630,7 @@ public void apply(Settings value, Settings current, Settings previous) {
629630
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
630631
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
631632
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
632-
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null
633+
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
634+
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING
633635
).filter(Objects::nonNull).collect(toSet());
634636
}

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.index.mapper.MapperService;
5050
import org.elasticsearch.index.shard.IndexEventListener;
5151
import org.elasticsearch.index.shard.IndexingOperationListener;
52+
import org.elasticsearch.index.shard.IndexingStatsSettings;
5253
import org.elasticsearch.index.shard.SearchOperationListener;
5354
import org.elasticsearch.index.shard.ShardPath;
5455
import org.elasticsearch.index.similarity.SimilarityService;
@@ -176,6 +177,7 @@ public interface DirectoryWrapper {
176177
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
177178
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
178179
private final MapperMetrics mapperMetrics;
180+
private final IndexingStatsSettings indexingStatsSettings;
179181

180182
/**
181183
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -196,7 +198,8 @@ public IndexModule(
196198
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
197199
final SlowLogFieldProvider slowLogFieldProvider,
198200
final MapperMetrics mapperMetrics,
199-
final List<SearchOperationListener> searchOperationListeners
201+
final List<SearchOperationListener> searchOperationListeners,
202+
final IndexingStatsSettings indexingStatsSettings
200203
) {
201204
this.indexSettings = indexSettings;
202205
this.analysisRegistry = analysisRegistry;
@@ -211,6 +214,7 @@ public IndexModule(
211214
this.expressionResolver = expressionResolver;
212215
this.recoveryStateFactories = recoveryStateFactories;
213216
this.mapperMetrics = mapperMetrics;
217+
this.indexingStatsSettings = indexingStatsSettings;
214218
}
215219

216220
/**
@@ -544,7 +548,8 @@ public IndexService newIndexService(
544548
snapshotCommitSupplier,
545549
indexCommitListener.get(),
546550
mapperMetrics,
547-
queryRewriteInterceptor
551+
queryRewriteInterceptor,
552+
indexingStatsSettings
548553
);
549554
success = true;
550555
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.index.shard.IndexShard;
7373
import org.elasticsearch.index.shard.IndexShardClosedException;
7474
import org.elasticsearch.index.shard.IndexingOperationListener;
75+
import org.elasticsearch.index.shard.IndexingStatsSettings;
7576
import org.elasticsearch.index.shard.SearchOperationListener;
7677
import org.elasticsearch.index.shard.ShardId;
7778
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -164,6 +165,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
164165
private final ValuesSourceRegistry valuesSourceRegistry;
165166
private final MapperMetrics mapperMetrics;
166167
private final QueryRewriteInterceptor queryRewriteInterceptor;
168+
private final IndexingStatsSettings indexingStatsSettings;
167169

168170
@SuppressWarnings("this-escape")
169171
public IndexService(
@@ -199,7 +201,8 @@ public IndexService(
199201
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
200202
Engine.IndexCommitListener indexCommitListener,
201203
MapperMetrics mapperMetrics,
202-
QueryRewriteInterceptor queryRewriteInterceptor
204+
QueryRewriteInterceptor queryRewriteInterceptor,
205+
IndexingStatsSettings indexingStatsSettings
203206
) {
204207
super(indexSettings);
205208
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
@@ -283,6 +286,7 @@ public IndexService(
283286
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
284287
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
285288
}
289+
this.indexingStatsSettings = indexingStatsSettings;
286290
updateFsyncTaskIfNecessary();
287291
}
288292

@@ -566,7 +570,8 @@ public synchronized IndexShard createShard(
566570
snapshotCommitSupplier,
567571
System::nanoTime,
568572
indexCommitListener,
569-
mapperMetrics
573+
mapperMetrics,
574+
indexingStatsSettings
570575
);
571576
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
572577
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
297297
private final LongSupplier relativeTimeInNanosSupplier;
298298
private volatile long startedRelativeTimeInNanos;
299299
private volatile long indexingTimeBeforeShardStartedInNanos;
300+
private volatile double recentIndexingLoadAtShardStarted;
300301
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();
301302

302303
// the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead
@@ -326,7 +327,8 @@ public IndexShard(
326327
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
327328
final LongSupplier relativeTimeInNanosSupplier,
328329
final Engine.IndexCommitListener indexCommitListener,
329-
final MapperMetrics mapperMetrics
330+
final MapperMetrics mapperMetrics,
331+
final IndexingStatsSettings indexingStatsSettings
330332
) throws IOException {
331333
super(shardRouting.shardId(), indexSettings);
332334
assert shardRouting.initializing();
@@ -344,7 +346,7 @@ public IndexShard(
344346
this.threadPool = threadPool;
345347
this.mapperService = mapperService;
346348
this.indexCache = indexCache;
347-
this.internalIndexingStats = new InternalIndexingStats();
349+
this.internalIndexingStats = new InternalIndexingStats(relativeTimeInNanosSupplier, indexingStatsSettings);
348350
var indexingFailuresDebugListener = new IndexingFailuresDebugListener(this);
349351
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(
350352
CollectionUtils.appendToCopyNoNullElements(listeners, internalIndexingStats, indexingFailuresDebugListener),
@@ -552,6 +554,7 @@ public void updateShardState(
552554
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
553555
startedRelativeTimeInNanos = getRelativeTimeInNanos();
554556
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
557+
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
555558
} else if (currentRouting.primary()
556559
&& currentRouting.relocating()
557560
&& replicationTracker.isRelocated()
@@ -1361,11 +1364,14 @@ public IndexingStats indexingStats() {
13611364
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
13621365
}
13631366

1367+
long currentTimeInNanos = getRelativeTimeInNanos();
13641368
return internalIndexingStats.stats(
13651369
throttled,
13661370
throttleTimeInMillis,
13671371
indexingTimeBeforeShardStartedInNanos,
1368-
getRelativeTimeInNanos() - startedRelativeTimeInNanos
1372+
currentTimeInNanos - startedRelativeTimeInNanos,
1373+
currentTimeInNanos,
1374+
recentIndexingLoadAtShardStarted
13691375
);
13701376
}
13711377

0 commit comments

Comments
 (0)