Skip to content

Commit 5da90fb

Browse files
PeteGillinElastic authored and smalyshev committed
Calculate recent write load in indexing stats (elastic#124652)
This uses the recently-added `ExponentiallyWeightedMovingRate` class to calculate a write load which favours more recent load, and includes this alongside the existing unweighted all-time write load in `IndexingStats.Stats`. As of this change, the new load metric is not used anywhere, although it can be retrieved with the index stats or node stats APIs.
1 parent d42529b commit 5da90fb

File tree

26 files changed

+468
-62
lines changed

26 files changed

+468
-62
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
527527
CommonStats stats = new CommonStats();
528528
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
529529
stats.store = new StoreStats();
530-
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1));
530+
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123));
531531
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
532532
}
533533

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.UUIDs;
3232
import org.elasticsearch.common.bytes.BytesArray;
3333
import org.elasticsearch.common.lucene.uid.Versions;
34+
import org.elasticsearch.common.settings.ClusterSettings;
3435
import org.elasticsearch.common.settings.Settings;
3536
import org.elasticsearch.common.unit.ByteSizeUnit;
3637
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -636,7 +637,8 @@ public static final IndexShard newIndexShard(
636637
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
637638
System::nanoTime,
638639
null,
639-
MapperMetrics.NOOP
640+
MapperMetrics.NOOP,
641+
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings())
640642
);
641643
}
642644

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ static TransportVersion def(int id) {
191191
public static final TransportVersion INDEX_RESHARDING_METADATA = def(9_031_0_00);
192192
public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA = def(9_032_0_00);
193193
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
194+
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
194195

195196
/*
196197
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRate.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import static java.lang.Math.exp;
1313
import static java.lang.Math.expm1;
14+
import static java.lang.Math.log;
1415

1516
/**
1617
* Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to
@@ -41,7 +42,8 @@ public class ExponentiallyWeightedMovingRate {
4142
private final double lambda;
4243
private final long startTime;
4344
private double rate;
44-
long lastTime;
45+
private long lastTime;
46+
private boolean waitingForFirstIncrement;
4547

4648
/**
4749
* Constructor.
@@ -57,14 +59,12 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) {
5759
if (lambda < 0.0) {
5860
throw new IllegalArgumentException("lambda must be non-negative but was " + lambda);
5961
}
60-
if (startTime <= 0.0) {
61-
throw new IllegalArgumentException("startTime must be non-negative but was " + startTime);
62-
}
6362
synchronized (this) {
6463
this.lambda = lambda;
6564
this.rate = Double.NaN; // should never be used
6665
this.startTime = startTime;
67-
this.lastTime = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first
66+
this.lastTime = 0; // should never be used
67+
this.waitingForFirstIncrement = true;
6868
}
6969
}
7070

@@ -80,7 +80,7 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) {
8080
*/
8181
public double getRate(long time) {
8282
synchronized (this) {
83-
if (lastTime == 0) { // indicates that no increment has happened yet
83+
if (waitingForFirstIncrement) {
8484
return 0.0;
8585
} else if (time <= lastTime) {
8686
return rate;
@@ -104,6 +104,9 @@ public double getRate(long time) {
104104
* instance. It is only non-static because it uses this instance's {@code lambda} and {@code startTime}.
105105
*/
106106
public double calculateRateSince(long currentTime, double currentRate, long oldTime, double oldRate) {
107+
if (oldTime < startTime) {
108+
oldTime = startTime;
109+
}
107110
if (currentTime <= oldTime) {
108111
return 0.0;
109112
}
@@ -127,12 +130,13 @@ public double calculateRateSince(long currentTime, double currentRate, long oldT
127130
*/
128131
public void addIncrement(double increment, long time) {
129132
synchronized (this) {
130-
if (lastTime == 0) { // indicates that this is the first increment
133+
if (waitingForFirstIncrement) {
131134
if (time <= startTime) {
132135
time = startTime + 1;
133136
}
134137
// This is the formula for R(t_1) given in subsection 2.6 of the document referenced above:
135138
rate = increment / expHelper(time - startTime);
139+
waitingForFirstIncrement = false;
136140
} else {
137141
if (time < lastTime) {
138142
time = lastTime;
@@ -165,4 +169,12 @@ private double expHelper(double time) {
165169
return time * (1.0 - 0.5 * lambdaTime);
166170
}
167171
}
172+
173+
/**
174+
* Returns the configured half-life of this instance. The units are the same as all other times in API calls, and the inverse of the
175+
* units used for the {@code lambda} constructor parameter. If {@code lambda} is {@code 0.0}, returns {@link Double#POSITIVE_INFINITY}.
176+
*/
177+
public double getHalfLife() {
178+
return log(2.0) / lambda;
179+
}
168180
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.elasticsearch.index.IndexingPressure;
9090
import org.elasticsearch.index.MergePolicyConfig;
9191
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
92+
import org.elasticsearch.index.shard.IndexingStatsSettings;
9293
import org.elasticsearch.indices.IndexingMemoryController;
9394
import org.elasticsearch.indices.IndicesQueryCache;
9495
import org.elasticsearch.indices.IndicesRequestCache;
@@ -631,6 +632,7 @@ public void apply(Settings value, Settings current, Settings previous) {
631632
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
632633
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
633634
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
634-
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null
635+
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
636+
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING
635637
).filter(Objects::nonNull).collect(toSet());
636638
}

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.index.mapper.MapperService;
5151
import org.elasticsearch.index.shard.IndexEventListener;
5252
import org.elasticsearch.index.shard.IndexingOperationListener;
53+
import org.elasticsearch.index.shard.IndexingStatsSettings;
5354
import org.elasticsearch.index.shard.SearchOperationListener;
5455
import org.elasticsearch.index.shard.ShardPath;
5556
import org.elasticsearch.index.similarity.SimilarityService;
@@ -177,6 +178,7 @@ public interface DirectoryWrapper {
177178
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
178179
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
179180
private final MapperMetrics mapperMetrics;
181+
private final IndexingStatsSettings indexingStatsSettings;
180182

181183
/**
182184
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -197,7 +199,8 @@ public IndexModule(
197199
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
198200
final SlowLogFieldProvider slowLogFieldProvider,
199201
final MapperMetrics mapperMetrics,
200-
final List<SearchOperationListener> searchOperationListeners
202+
final List<SearchOperationListener> searchOperationListeners,
203+
final IndexingStatsSettings indexingStatsSettings
201204
) {
202205
this.indexSettings = indexSettings;
203206
this.analysisRegistry = analysisRegistry;
@@ -212,6 +215,7 @@ public IndexModule(
212215
this.expressionResolver = expressionResolver;
213216
this.recoveryStateFactories = recoveryStateFactories;
214217
this.mapperMetrics = mapperMetrics;
218+
this.indexingStatsSettings = indexingStatsSettings;
215219
}
216220

217221
/**
@@ -547,7 +551,8 @@ public IndexService newIndexService(
547551
snapshotCommitSupplier,
548552
indexCommitListener.get(),
549553
mapperMetrics,
550-
queryRewriteInterceptor
554+
queryRewriteInterceptor,
555+
indexingStatsSettings
551556
);
552557
success = true;
553558
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.elasticsearch.index.shard.IndexShard;
7474
import org.elasticsearch.index.shard.IndexShardClosedException;
7575
import org.elasticsearch.index.shard.IndexingOperationListener;
76+
import org.elasticsearch.index.shard.IndexingStatsSettings;
7677
import org.elasticsearch.index.shard.SearchOperationListener;
7778
import org.elasticsearch.index.shard.ShardId;
7879
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -167,6 +168,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
167168
private final ValuesSourceRegistry valuesSourceRegistry;
168169
private final MapperMetrics mapperMetrics;
169170
private final QueryRewriteInterceptor queryRewriteInterceptor;
171+
private final IndexingStatsSettings indexingStatsSettings;
170172

171173
@SuppressWarnings("this-escape")
172174
public IndexService(
@@ -203,7 +205,8 @@ public IndexService(
203205
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
204206
Engine.IndexCommitListener indexCommitListener,
205207
MapperMetrics mapperMetrics,
206-
QueryRewriteInterceptor queryRewriteInterceptor
208+
QueryRewriteInterceptor queryRewriteInterceptor,
209+
IndexingStatsSettings indexingStatsSettings
207210
) {
208211
super(indexSettings);
209212
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
@@ -288,6 +291,7 @@ public IndexService(
288291
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
289292
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
290293
}
294+
this.indexingStatsSettings = indexingStatsSettings;
291295
updateFsyncTaskIfNecessary();
292296
}
293297

@@ -572,7 +576,8 @@ public synchronized IndexShard createShard(
572576
snapshotCommitSupplier,
573577
System::nanoTime,
574578
indexCommitListener,
575-
mapperMetrics
579+
mapperMetrics,
580+
indexingStatsSettings
576581
);
577582
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
578583
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
300300
private final LongSupplier relativeTimeInNanosSupplier;
301301
private volatile long startedRelativeTimeInNanos;
302302
private volatile long indexingTimeBeforeShardStartedInNanos;
303+
private volatile double recentIndexingLoadAtShardStarted;
303304
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();
304305

305306
// the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead
@@ -330,7 +331,8 @@ public IndexShard(
330331
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
331332
final LongSupplier relativeTimeInNanosSupplier,
332333
final Engine.IndexCommitListener indexCommitListener,
333-
final MapperMetrics mapperMetrics
334+
final MapperMetrics mapperMetrics,
335+
final IndexingStatsSettings indexingStatsSettings
334336
) throws IOException {
335337
super(shardRouting.shardId(), indexSettings);
336338
assert shardRouting.initializing();
@@ -349,7 +351,7 @@ public IndexShard(
349351
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
350352
this.mapperService = mapperService;
351353
this.indexCache = indexCache;
352-
this.internalIndexingStats = new InternalIndexingStats();
354+
this.internalIndexingStats = new InternalIndexingStats(relativeTimeInNanosSupplier, indexingStatsSettings);
353355
var indexingFailuresDebugListener = new IndexingFailuresDebugListener(this);
354356
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(
355357
CollectionUtils.appendToCopyNoNullElements(listeners, internalIndexingStats, indexingFailuresDebugListener),
@@ -557,6 +559,7 @@ public void updateShardState(
557559
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
558560
startedRelativeTimeInNanos = getRelativeTimeInNanos();
559561
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
562+
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
560563
} else if (currentRouting.primary()
561564
&& currentRouting.relocating()
562565
&& replicationTracker.isRelocated()
@@ -1366,11 +1369,14 @@ public IndexingStats indexingStats() {
13661369
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
13671370
}
13681371

1372+
long currentTimeInNanos = getRelativeTimeInNanos();
13691373
return internalIndexingStats.stats(
13701374
throttled,
13711375
throttleTimeInMillis,
13721376
indexingTimeBeforeShardStartedInNanos,
1373-
getRelativeTimeInNanos() - startedRelativeTimeInNanos
1377+
currentTimeInNanos - startedRelativeTimeInNanos,
1378+
currentTimeInNanos,
1379+
recentIndexingLoadAtShardStarted
13741380
);
13751381
}
13761382

0 commit comments

Comments
 (0)