diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java index adb8267c6b341..e337d351d3483 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java @@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex, CommonStats stats = new CommonStats(); stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes()); stats.store = new StoreStats(); - stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1)); + stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123)); return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index dd32e6b30fb7b..41307b92a61a4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -636,7 +637,8 @@ public static final IndexShard newIndexShard( IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, System::nanoTime, null, - MapperMetrics.NOOP + MapperMetrics.NOOP, + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index aec57af92f4ad..ff6785362b5c7 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -191,6 +191,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_RESHARDING_METADATA = def(9_031_0_00); public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA = def(9_032_0_00); public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00); + public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRate.java b/server/src/main/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRate.java index 0b7c915dd5347..4858b40e7a486 100644 --- a/server/src/main/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRate.java +++ b/server/src/main/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRate.java @@ -11,6 +11,7 @@ import static java.lang.Math.exp; import static java.lang.Math.expm1; +import static java.lang.Math.log; /** * Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to @@ -41,7 +42,8 @@ public class ExponentiallyWeightedMovingRate { private final double lambda; private final long startTime; private double rate; - long lastTime; + private long lastTime; + private boolean waitingForFirstIncrement; /** * Constructor. @@ -57,14 +59,12 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) { if (lambda < 0.0) { throw new IllegalArgumentException("lambda must be non-negative but was " + lambda); } - if (startTime <= 0.0) { - throw new IllegalArgumentException("startTime must be non-negative but was " + startTime); - } synchronized (this) { this.lambda = lambda; this.rate = Double.NaN; // should never be used this.startTime = startTime; - this.lastTime = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first + this.lastTime = 0; // should never be used + this.waitingForFirstIncrement = true; } } @@ -80,7 +80,7 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) { */ public double getRate(long time) { synchronized (this) { - if (lastTime == 0) { // indicates that no increment has happened yet + if (waitingForFirstIncrement) { return 0.0; } else if (time <= lastTime) { return rate; @@ -104,6 +104,9 @@ public double getRate(long time) { * instance. It is only non-static because it uses this instance's {@code lambda} and {@code startTime}. */ public double calculateRateSince(long currentTime, double currentRate, long oldTime, double oldRate) { + if (oldTime < startTime) { + oldTime = startTime; + } if (currentTime <= oldTime) { return 0.0; } @@ -127,12 +130,13 @@ public double calculateRateSince(long currentTime, double currentRate, long oldT */ public void addIncrement(double increment, long time) { synchronized (this) { - if (lastTime == 0) { // indicates that this is the first increment + if (waitingForFirstIncrement) { if (time <= startTime) { time = startTime + 1; } // This is the formula for R(t_1) given in subsection 2.6 of the document referenced above: rate = increment / expHelper(time - startTime); + waitingForFirstIncrement = false; } else { if (time < lastTime) { time = lastTime; @@ -165,4 +169,12 @@ private double expHelper(double time) { return time * (1.0 - 0.5 * lambdaTime); } } + + /** + * Returns the configured half-life of this instance. The units are the same as all other times in API calls, and the inverse of the + * units used for the {@code lambda} constructor parameter. If {@code lambda} is {@code 0.0}, returns {@link Double#POSITIVE_INFINITY}. + */ + public double getHalfLife() { + return log(2.0) / lambda; + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 025626fbed9d3..e392e829b06ee 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -89,6 +89,7 @@ import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; +import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -631,6 +632,7 @@ public void apply(Settings value, Settings current, Settings previous) { DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING, DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING, ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME, - DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null + DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null, + IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING ).filter(Objects::nonNull).collect(toSet()); } diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 6cd63b3c0047d..3418d8a9b7b2e 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexingOperationListener; +import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.similarity.SimilarityService; @@ -177,6 +178,7 @@ public interface DirectoryWrapper { private final Map recoveryStateFactories; private final SetOnce indexCommitListener = new SetOnce<>(); private final MapperMetrics mapperMetrics; + private final IndexingStatsSettings indexingStatsSettings; /** * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins @@ -197,7 +199,8 @@ public IndexModule( final Map recoveryStateFactories, final SlowLogFieldProvider slowLogFieldProvider, final MapperMetrics mapperMetrics, - final List searchOperationListeners + final List searchOperationListeners, + final IndexingStatsSettings indexingStatsSettings ) { this.indexSettings = indexSettings; this.analysisRegistry = analysisRegistry; @@ -212,6 +215,7 @@ public IndexModule( this.expressionResolver = expressionResolver; this.recoveryStateFactories = recoveryStateFactories; this.mapperMetrics = mapperMetrics; + this.indexingStatsSettings = indexingStatsSettings; } /** @@ -547,7 +551,8 @@ public IndexService newIndexService( snapshotCommitSupplier, indexCommitListener.get(), mapperMetrics, - queryRewriteInterceptor + queryRewriteInterceptor, + indexingStatsSettings ); success = true; return indexService; diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index ee7a3038bb8b8..cdf1ad177f1e4 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -73,6 +73,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexingOperationListener; +import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -167,6 +168,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final ValuesSourceRegistry valuesSourceRegistry; private final MapperMetrics mapperMetrics; private final QueryRewriteInterceptor queryRewriteInterceptor; + private final IndexingStatsSettings indexingStatsSettings; @SuppressWarnings("this-escape") public IndexService( @@ -203,7 +205,8 @@ public IndexService( IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, Engine.IndexCommitListener indexCommitListener, MapperMetrics mapperMetrics, - QueryRewriteInterceptor queryRewriteInterceptor + QueryRewriteInterceptor queryRewriteInterceptor, + IndexingStatsSettings indexingStatsSettings ) { super(indexSettings); assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS @@ -288,6 +291,7 @@ public IndexService( this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); } + this.indexingStatsSettings = indexingStatsSettings; updateFsyncTaskIfNecessary(); } @@ -572,7 +576,8 @@ public synchronized IndexShard createShard( snapshotCommitSupplier, System::nanoTime, indexCommitListener, - mapperMetrics + mapperMetrics, + indexingStatsSettings ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c337929eea69e..657a3976c46f7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -300,6 +300,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final LongSupplier relativeTimeInNanosSupplier; private volatile long startedRelativeTimeInNanos; private volatile long indexingTimeBeforeShardStartedInNanos; + private volatile double recentIndexingLoadAtShardStarted; private final SubscribableListener waitForEngineOrClosedShardListeners = new SubscribableListener<>(); // the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead @@ -330,7 +331,8 @@ public IndexShard( final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, final LongSupplier relativeTimeInNanosSupplier, final Engine.IndexCommitListener indexCommitListener, - final MapperMetrics mapperMetrics + final MapperMetrics mapperMetrics, + final IndexingStatsSettings indexingStatsSettings ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -349,7 +351,7 @@ public IndexShard( this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; this.mapperService = mapperService; this.indexCache = indexCache; - this.internalIndexingStats = new InternalIndexingStats(); + this.internalIndexingStats = new InternalIndexingStats(relativeTimeInNanosSupplier, indexingStatsSettings); var indexingFailuresDebugListener = new IndexingFailuresDebugListener(this); this.indexingOperationListeners = new IndexingOperationListener.CompositeListener( CollectionUtils.appendToCopyNoNullElements(listeners, internalIndexingStats, indexingFailuresDebugListener), @@ -557,6 +559,7 @@ public void updateShardState( changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); startedRelativeTimeInNanos = getRelativeTimeInNanos(); indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos(); + recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos); } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isRelocated() @@ -1366,11 +1369,14 @@ public IndexingStats indexingStats() { throttleTimeInMillis = engine.getIndexThrottleTimeInMillis(); } + long currentTimeInNanos = getRelativeTimeInNanos(); return internalIndexingStats.stats( throttled, throttleTimeInMillis, indexingTimeBeforeShardStartedInNanos, - getRelativeTimeInNanos() - startedRelativeTimeInNanos + currentTimeInNanos - startedRelativeTimeInNanos, + currentTimeInNanos, + recentIndexingLoadAtShardStarted ); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java index 62e456d95f467..86e894d91440f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java @@ -25,6 +25,8 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD; + public class IndexingStats implements Writeable, ToXContentFragment { public static class Stats implements Writeable, ToXContentFragment { @@ -43,6 +45,7 @@ public static class Stats implements Writeable, ToXContentFragment { private boolean isThrottled; private long totalIndexingTimeSinceShardStartedInNanos; private long totalActiveTimeInNanos; + private double recentIndexingLoad; Stats() {} @@ -64,6 +67,15 @@ public Stats(StreamInput in) throws IOException { totalIndexingTimeSinceShardStartedInNanos = in.readLong(); totalActiveTimeInNanos = in.readLong(); } + if (in.getTransportVersion().onOrAfter(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) { + recentIndexingLoad = in.readDouble(); + } else { + // When getting stats from an older version which doesn't have the recent indexing load, better to fall back to the + // unweighted write load, rather that assuming zero load: + recentIndexingLoad = totalActiveTimeInNanos > 0 + ? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos + : 0; + } } public Stats( @@ -79,7 +91,8 @@ public Stats( boolean isThrottled, long throttleTimeInMillis, long totalIndexingTimeSinceShardStartedInNanos, - long totalActiveTimeInNanos + long totalActiveTimeInNanos, + double recentIndexingLoad ) { this.indexCount = indexCount; this.indexTimeInMillis = indexTimeInMillis; @@ -92,9 +105,11 @@ public Stats( this.noopUpdateCount = noopUpdateCount; this.isThrottled = isThrottled; this.throttleTimeInMillis = throttleTimeInMillis; - // We store the raw write-load values in order to avoid losing precision when we combine the shard stats + // We store the raw unweighted write load values in order to avoid losing precision when we combine the shard stats this.totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeSinceShardStartedInNanos; this.totalActiveTimeInNanos = totalActiveTimeInNanos; + // We store the weighted write load as a double because the calculation is inherently floating point + this.recentIndexingLoad = recentIndexingLoad; } public void add(Stats stats) { @@ -113,8 +128,15 @@ public void add(Stats stats) { if (isThrottled != stats.isThrottled) { isThrottled = true; // When combining if one is throttled set result to throttled. } + // N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time: totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos; totalActiveTimeInNanos += stats.totalActiveTimeInNanos; + // We want getRecentWriteLoad() for the aggregated stats to also be the average weighted by active time, so we use the updating + // formula for a weighted mean: + if (totalActiveTimeInNanos > 0) { + recentIndexingLoad += (stats.recentIndexingLoad - recentIndexingLoad) * stats.totalActiveTimeInNanos + / totalActiveTimeInNanos; + } } /** @@ -191,10 +213,32 @@ public long getNoopUpdateCount() { return noopUpdateCount; } + /** + * Returns a measurement of the write load. + * + *

If this {@link Stats} instance represents a single shard, this is ratio of the sum of the time taken by every index operations + * since the shard started to the elapsed time since the shard started. + * + *

If this {@link Stats} instance represents multiple shards, this is the average of that ratio for each shard, weighted by + * the elapsed time for each shard. + */ public double getWriteLoad() { return totalActiveTimeInNanos > 0 ? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0; } + /** + * Returns a measurement of the write load which favours more recent load. + * + *

If this {@link Stats} instance represents a single shard, this is an Exponentially Weighted Moving Rate based on the time + * taken by indexing operations in this shard since the shard started. + * + *

If this {@link Stats} instance represents multiple shards, this is the average of that ratio for each shard, weighted by + * the elapsed time for each shard. + */ + public double getRecentWriteLoad() { + return recentIndexingLoad; + } + public long getTotalActiveTimeInMillis() { return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos); } @@ -218,6 +262,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalIndexingTimeSinceShardStartedInNanos); out.writeLong(totalActiveTimeInNanos); } + if (out.getTransportVersion().onOrAfter(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) { + out.writeDouble(recentIndexingLoad); + } } @Override @@ -238,6 +285,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime()); builder.field(Fields.WRITE_LOAD, getWriteLoad()); + builder.field(Fields.RECENT_WRITE_LOAD, getRecentWriteLoad()); return builder; } @@ -258,7 +306,8 @@ public boolean equals(Object o) { && isThrottled == that.isThrottled && throttleTimeInMillis == that.throttleTimeInMillis && totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos - && totalActiveTimeInNanos == that.totalActiveTimeInNanos; + && totalActiveTimeInNanos == that.totalActiveTimeInNanos + && recentIndexingLoad == that.recentIndexingLoad; } @Override @@ -358,6 +407,7 @@ static final class Fields { static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis"; static final String THROTTLED_TIME = "throttle_time"; static final String WRITE_LOAD = "write_load"; + static final String RECENT_WRITE_LOAD = "recent_write_load"; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java b/server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java new file mode 100644 index 0000000000000..c04fba13a14d3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Container for cluster settings related to {@link IndexingStats}. + */ +public class IndexingStatsSettings { + + // TODO: Change this default to something sensible: + static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = new TimeValue(10000, TimeUnit.DAYS); + + /** + * A cluster setting giving the half-life, in seconds, to use for the Exponentially Weighted Moving Rate calculation used for the + * recency-weighted write load returned by {@link IndexingStats.Stats#getRecentWriteLoad()}. + * + *

This is dynamic, but changes only apply to newly-opened shards. + */ + public static final Setting RECENT_WRITE_LOAD_HALF_LIFE_SETTING = Setting.timeSetting( + "indices.stats.recent_write_load.half_life", + RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT, + TimeValue.ZERO, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final AtomicReference recentWriteLoadHalfLifeForNewShards = new AtomicReference<>( + RECENT_WRITE_LOAD_HALF_LIFE_SETTING.getDefault(Settings.EMPTY) + ); + + public IndexingStatsSettings(ClusterSettings clusterSettings) { + clusterSettings.initializeAndWatch(RECENT_WRITE_LOAD_HALF_LIFE_SETTING, recentWriteLoadHalfLifeForNewShards::set); + } + + TimeValue getRecentWriteLoadHalfLifeForNewShards() { + return recentWriteLoadHalfLifeForNewShards.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index 13d270ba36786..f7c5f3de2d497 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -9,13 +9,21 @@ package org.elasticsearch.index.shard; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.ExponentiallyWeightedMovingRate; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +import static org.elasticsearch.core.TimeValue.timeValueNanos; /** * Internal class that maintains relevant indexing statistics / metrics. @@ -23,7 +31,15 @@ */ final class InternalIndexingStats implements IndexingOperationListener { - private final StatsHolder totalStats = new StatsHolder(); + private static final Logger logger = LogManager.getLogger(InternalIndexingStats.class); + + private final LongSupplier relativeTimeInNanosSupplier; + private final StatsHolder totalStats; + + InternalIndexingStats(LongSupplier relativeTimeInNanosSupplier, IndexingStatsSettings settings) { + this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier; + this.totalStats = new StatsHolder(relativeTimeInNanosSupplier.getAsLong(), settings.getRecentWriteLoadHalfLifeForNewShards()); + } /** * Returns the stats, including type specific stats. If the types are null/0 length, then nothing @@ -34,13 +50,17 @@ IndexingStats stats( boolean isThrottled, long currentThrottleInMillis, long indexingTimeBeforeShardStartedInNanos, - long timeSinceShardStartedInNanos + long timeSinceShardStartedInNanos, + long currentTimeInNanos, + double recentIndexingLoadAtShardStarted ) { IndexingStats.Stats total = totalStats.stats( isThrottled, currentThrottleInMillis, indexingTimeBeforeShardStartedInNanos, - timeSinceShardStartedInNanos + timeSinceShardStartedInNanos, + currentTimeInNanos, + recentIndexingLoadAtShardStarted ); return new IndexingStats(total); } @@ -49,6 +69,13 @@ long totalIndexingTimeInNanos() { return totalStats.indexMetric.sum(); } + /** + * Returns an exponentially-weighted moving rate which measures the indexing load, favoring more recent load. + */ + double recentIndexingLoad(long timeInNanos) { + return totalStats.recentIndexMetric.getRate(timeInNanos); + } + @Override public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { if (operation.origin().isRecovery() == false) { @@ -64,6 +91,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re if (index.origin().isRecovery() == false) { long took = result.getTook(); totalStats.indexMetric.inc(took); + totalStats.recentIndexMetric.addIncrement(took, relativeTimeInNanosSupplier.getAsLong()); totalStats.indexCurrent.dec(); } break; @@ -125,7 +153,8 @@ void noopUpdate() { } static class StatsHolder { - private final MeanMetric indexMetric = new MeanMetric(); + private final MeanMetric indexMetric = new MeanMetric(); // Used for the count and total 'took' time (in ns) of index operations + private final ExponentiallyWeightedMovingRate recentIndexMetric; // An EWMR of the total 'took' time of index operations (in ns) private final MeanMetric deleteMetric = new MeanMetric(); private final CounterMetric indexCurrent = new CounterMetric(); private final CounterMetric indexFailed = new CounterMetric(); @@ -133,14 +162,44 @@ static class StatsHolder { private final CounterMetric deleteCurrent = new CounterMetric(); private final CounterMetric noopUpdates = new CounterMetric(); + StatsHolder(long startTimeInNanos, TimeValue recentWriteLoadHalfLife) { + double lambdaInInverseNanos = Math.log(2.0) / recentWriteLoadHalfLife.nanos(); + logger.debug( + "Initialized stats for new shard calculating recent indexing load with half-life {} (decay parameter {} ns^-1)", + recentWriteLoadHalfLife, + lambdaInInverseNanos + ); + this.recentIndexMetric = new ExponentiallyWeightedMovingRate(lambdaInInverseNanos, startTimeInNanos); + } + IndexingStats.Stats stats( boolean isThrottled, long currentThrottleMillis, long indexingTimeBeforeShardStartedInNanos, - long timeSinceShardStartedInNanos + long timeSinceShardStartedInNanos, + long currentTimeInNanos, + double recentIndexingLoadAtShardStarted ) { final long totalIndexingTimeInNanos = indexMetric.sum(); final long totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeInNanos - indexingTimeBeforeShardStartedInNanos; + final double recentIndexingLoadSinceShardStarted = recentIndexMetric.calculateRateSince( + currentTimeInNanos, + recentIndexMetric.getRate(currentTimeInNanos), + // The recentIndexingLoadAtShardStarted passed in should have been calculated at this time: + currentTimeInNanos - timeSinceShardStartedInNanos, + recentIndexingLoadAtShardStarted + ); + logger.debug( + () -> Strings.format( + "Generating stats for an index shard with indexing time %s and active time %s giving unweighted write load %g, " + + "while the recency-weighted write load is %g using a half-life of %s", + timeValueNanos(totalIndexingTimeSinceShardStartedInNanos), + timeValueNanos(timeSinceShardStartedInNanos), + 1.0 * totalIndexingTimeSinceShardStartedInNanos / timeSinceShardStartedInNanos, + recentIndexingLoadSinceShardStarted, + timeValueNanos((long) recentIndexMetric.getHalfLife()) + ) + ); return new IndexingStats.Stats( indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(totalIndexingTimeInNanos), @@ -154,7 +213,8 @@ IndexingStats.Stats stats( isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis), totalIndexingTimeSinceShardStartedInNanos, - timeSinceShardStartedInNanos + timeSinceShardStartedInNanos, + recentIndexingLoadSinceShardStarted ); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 6e0d53a176cea..c3adcf1160e66 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -127,6 +127,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.IndexingStats; +import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.TranslogStats; @@ -277,6 +278,7 @@ public class IndicesService extends AbstractLifecycleComponent private final List searchOperationListeners; private final QueryRewriteInterceptor queryRewriteInterceptor; final SlowLogFieldProvider slowLogFieldProvider; // pkg-private for testingå + private final IndexingStatsSettings indexStatsSettings; @Override protected void doStart() { @@ -401,6 +403,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.postRecoveryMerger = new PostRecoveryMerger(settings, threadPool.executor(ThreadPool.Names.FORCE_MERGE), this::getShardOrNull); this.searchOperationListeners = builder.searchOperationListener; this.slowLogFieldProvider = builder.slowLogFieldProvider; + this.indexStatsSettings = new IndexingStatsSettings(clusterService.getClusterSettings()); } private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask"; @@ -774,7 +777,8 @@ private synchronized IndexService createIndexService( recoveryStateFactories, slowLogFieldProvider, mapperMetrics, - searchOperationListeners + searchOperationListeners, + indexStatsSettings ); for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); @@ -871,7 +875,8 @@ public synchronized MapperService createIndexMapperServiceForValidation(IndexMet recoveryStateFactories, slowLogFieldProvider, mapperMetrics, - searchOperationListeners + searchOperationListeners, + indexStatsSettings ); pluginsService.forEach(p -> p.onIndexModule(indexModule)); return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 10915d385d5b3..99c0583a351eb 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -591,6 +591,7 @@ private static CommonStats createShardLevelCommonStats() { false, ++iota, ++iota, + ++iota, ++iota ); indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java index 1709e2d4d372f..55b620580be3f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java @@ -126,7 +126,8 @@ private ShardStats createShardStats( false, 0, totalIndexingTimeSinceShardStartedInNanos, - totalActiveTimeInNanos + totalActiveTimeInNanos, + 0.0 ) ); return new ShardStats(shardRouting, commonStats, null, null, null, null, null, false, false, 0); diff --git a/server/src/test/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRateTests.java b/server/src/test/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRateTests.java index 0255c80e9d680..15ee27e5b3e73 100644 --- a/server/src/test/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRateTests.java +++ b/server/src/test/java/org/elasticsearch/common/metrics/ExponentiallyWeightedMovingRateTests.java @@ -214,14 +214,6 @@ public void testEwmr_negativeLambdaThrowsOnConstruction() { assertThrows(IllegalArgumentException.class, () -> new ExponentiallyWeightedMovingRate(-1.0e-6, START_TIME_IN_MILLIS)); } - public void testEwmr_zeroStartTimeInMillis() { - assertThrows(IllegalArgumentException.class, () -> new ExponentiallyWeightedMovingRate(LAMBDA, 0)); - } - - public void testEwmr_negativeStartTimeInMillis() { - assertThrows(IllegalArgumentException.class, () -> new ExponentiallyWeightedMovingRate(LAMBDA, -1)); - } - // N.B. This test is not guaranteed to fail even if the implementation is not thread-safe. The operations are fast enough that there is // a chance each thread will complete before the next one has started. We use a high thread count to try to get a decent change of // hitting a race condition if there is one. This should be run with e.g. -Dtests.iters=20 to test thoroughly. @@ -351,4 +343,14 @@ public void testCalculateRateSince_currentTimeNotAfterOldTime() { assertThat(ewmr.calculateRateSince(START_TIME_IN_MILLIS + 500, 123.0, START_TIME_IN_MILLIS + 1000, 123.0), equalTo(0.0)); assertThat(ewmr.calculateRateSince(START_TIME_IN_MILLIS + 500, 123.0, START_TIME_IN_MILLIS + 1000, 456), equalTo(0.0)); } + + public void testGetHalfLife() { + ExponentiallyWeightedMovingRate ewmr = new ExponentiallyWeightedMovingRate(LAMBDA, START_TIME_IN_MILLIS); + assertThat(ewmr.getHalfLife(), closeTo(HALF_LIFE_MILLIS, 1.0e-9)); + } + + public void testGetHalfLife_lambdaZero() { + ExponentiallyWeightedMovingRate ewmr = new ExponentiallyWeightedMovingRate(0.0, START_TIME_IN_MILLIS); + assertThat(ewmr.getHalfLife(), equalTo(Double.POSITIVE_INFINITY)); + } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index cf1b05bc29630..565037eba8369 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -71,6 +72,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.IndexingOperationListener; +import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -247,7 +249,8 @@ public void testWrapperIsBound() throws IOException { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); module.setReaderWrapper(s -> new Wrapper()); @@ -275,7 +278,8 @@ public void testRegisterIndexStore() throws IOException { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); final IndexService indexService = newIndexService(module); @@ -301,7 +305,8 @@ public void testDirectoryWrapper() throws IOException { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); module.setDirectoryWrapper(new TestDirectoryWrapper()); @@ -655,7 +660,8 @@ public void testRegisterCustomRecoveryStateFactory() throws IOException { recoveryStateFactories, mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); final IndexService indexService = newIndexService(module); @@ -678,7 +684,8 @@ public void testIndexCommitListenerIsBound() throws IOException, ExecutionExcept Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong(); @@ -781,7 +788,8 @@ private static IndexModule createIndexModule( Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index faf782e02c6cf..c0a9f5d84fe3b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5122,7 +5122,15 @@ public void testShardExposesWriteLoadStats() throws Exception { NOOP_GCP_SYNCER, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER, - fakeClock + fakeClock, + // Use a listener to advance the fake clock once per indexing operation: + new IndexingOperationListener() { + @Override + public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { + fakeClock.advance(); + return IndexingOperationListener.super.preIndex(shardId, operation); + } + } ); // Now simulate that each operation takes 1 minute to complete. @@ -5230,24 +5238,19 @@ public void indexTranslogOperations( static class FakeClock implements LongSupplier { private final AtomicLong currentRelativeTime = new AtomicLong(); - private final AtomicInteger tick = new AtomicInteger(); - private volatile TimeValue elapsedTimePerPairOfQueries = TimeValue.ZERO; + private volatile TimeValue simulatedElapsedRelativeTime = TimeValue.ZERO; @Override public long getAsLong() { - // Since the clock is checked at the beginning and at the end of - // the indexing op, just increase the current relative time at the - // end. - if (tick.getAndIncrement() % 2 == 0) { - return currentRelativeTime.get(); - } else { - return currentRelativeTime.addAndGet(elapsedTimePerPairOfQueries.nanos()); - } + return currentRelativeTime.get(); + } + + void setSimulatedElapsedRelativeTime(TimeValue simulatedElapsedRelativeTime) { + this.simulatedElapsedRelativeTime = simulatedElapsedRelativeTime; } - void setSimulatedElapsedRelativeTime(TimeValue elapsedTimePerPairOfQueries) { - tick.set(0); - this.elapsedTimePerPairOfQueries = elapsedTimePerPairOfQueries; + public void advance() { + currentRelativeTime.addAndGet(simulatedElapsedRelativeTime.nanos()); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsSettingsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsSettingsTests.java new file mode 100644 index 0000000000000..416357d34678e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsSettingsTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class IndexingStatsSettingsTests extends ESTestCase { + + public void testRecentWriteLoadHalfLife_defaultValue() { + IndexingStatsSettings settings = new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()); + assertThat(settings.getRecentWriteLoadHalfLifeForNewShards(), equalTo(IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT)); + } + + public void testRecentWriteLoadHalfLife_initialValue() { + IndexingStatsSettings settings = new IndexingStatsSettings( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder().put(IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING.getKey(), "2h").build() + ) + ); + assertThat(settings.getRecentWriteLoadHalfLifeForNewShards(), equalTo(TimeValue.timeValueHours(2))); + } + + public void testRecentWriteLoadHalfLife_updateValue() { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(); + IndexingStatsSettings settings = new IndexingStatsSettings(clusterSettings); + clusterSettings.applySettings( + Settings.builder().put(IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING.getKey(), "90m").build() + ); + assertThat(settings.getRecentWriteLoadHalfLifeForNewShards(), equalTo(TimeValue.timeValueMinutes(90))); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java new file mode 100644 index 0000000000000..aa267ba6a16b9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class IndexingStatsTests extends ESTestCase { + + private static final double DOUBLE_TOLERANCE = 1.0e-10; + + public void testStatsGetWriteLoad() { + IndexingStats.Stats stats = new IndexingStats.Stats( + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + false, + 10, + 1_800_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1.8sec + 3_000_000_000L, // totalActiveTimeInNanos - 3sec + 0.1357 + ); + double expectedWriteLoad = 0.6; // 1.8sec / 3sec + assertThat(stats.getWriteLoad(), closeTo(expectedWriteLoad, DOUBLE_TOLERANCE)); + } + + public void testStatsAdd_indexCount() { + IndexingStats.Stats stats1 = new IndexingStats.Stats( + 1001L, // indexCount + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + false, + 10, + 11, + 12, + 0.1357 + ); + IndexingStats.Stats stats2 = new IndexingStats.Stats( + 2001L, // indexCount + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + false, + 10, + 11, + 12, + 0.1357 + ); + IndexingStats.Stats statsAgg = sumOfStats(stats1, stats2); + assertThat(statsAgg.getIndexCount(), equalTo(1001L + 2001L)); + } + + public void testStatsAdd_throttled() { + IndexingStats.Stats statsFalse = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, false, 10, 11, 12, 0.1357); + IndexingStats.Stats statsTrue = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, true, 10, 11, 12, 0.1357); + assertThat(sumOfStats(statsFalse, statsFalse).isThrottled(), is(false)); + assertThat(sumOfStats(statsFalse, statsTrue).isThrottled(), is(true)); + assertThat(sumOfStats(statsTrue, statsFalse).isThrottled(), is(true)); + assertThat(sumOfStats(statsTrue, statsTrue).isThrottled(), is(true)); + } + + public void testStatsAdd_writeLoads() { + IndexingStats.Stats stats1 = new IndexingStats.Stats( + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + false, + 10, + 1_000_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1sec + 2_000_000_000L, // totalActiveTimeInNanos - 2sec + 0.1357 // recentWriteLoad + ); + IndexingStats.Stats stats2 = new IndexingStats.Stats( + 2, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + false, + 10, + 2_100_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 2.1sec + 3_000_000_000L, // totalActiveTimeInNanos - 3sec + 0.2468 // recentWriteLoad + ); + IndexingStats.Stats statsAgg = sumOfStats(stats1, stats2); + // The unweighted write loads for the two shards are 0.5 (1sec / 2sec) and 0.7 (2.1sec / 3sec) respectively. + // The aggregated value should be the average weighted by the times, i.e. by 2sec and 3sec, giving weights of 0.4 and 0.6. + double expectedWriteLoad = 0.4 * 0.5 + 0.6 * 0.7; + // The aggregated value for the recent write load should be the average with the same weights. + double expectedRecentWriteLoad = 0.4 * 0.1357 + 0.6 * 0.2468; + assertThat(statsAgg.getWriteLoad(), closeTo(expectedWriteLoad, DOUBLE_TOLERANCE)); + assertThat(statsAgg.getRecentWriteLoad(), closeTo(expectedRecentWriteLoad, DOUBLE_TOLERANCE)); + } + + private static IndexingStats.Stats sumOfStats(IndexingStats.Stats stats1, IndexingStats.Stats stats2) { + IndexingStats.Stats statsAgg = new IndexingStats.Stats(); + statsAgg.add(stats1); + statsAgg.add(stats2); + return statsAgg; + } +} diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java index b1da067e2f7e6..730197b0450cc 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java @@ -152,7 +152,8 @@ private void mockShardStats(boolean includeCommonStats) { randomBoolean(), randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong() + randomNonNegativeLong(), + randomDoubleBetween(0.0, 1.0, true) ) ) ); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index e8286835e9cfa..89ce1f4eb06cd 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -553,7 +553,8 @@ protected IndexShard newShard( IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, relativeTimeSupplier, null, - MapperMetrics.NOOP + MapperMetrics.NOOP, + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index 4811d65e6ed85..415af89d9835a 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -402,6 +402,7 @@ private static CommonStats mockCommonStats() { false, ++iota, no, + no, no ); commonStats.getIndexing().add(new IndexingStats(indexingStats)); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index ca7651ce84497..dc75f6dd9d554 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -183,7 +183,7 @@ private CommonStats mockCommonStats() { commonStats.getDocs().add(new DocsStats(1L, 0L, randomNonNegativeLong() >> 8)); // >> 8 to avoid overflow - we add these things up commonStats.getStore().add(new StoreStats(2L, 0L, 0L)); - final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0, 0); + final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0, 0, 0.0); commonStats.getIndexing().add(new IndexingStats(indexingStats)); final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index be87b92479c21..982680ec65283 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -348,6 +348,7 @@ private static NodeStats mockNodeStats() { false, ++iota, no, + no, no ); indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats)); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 402674b35b3f4..d2873654275da 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.mapper.MapperMetrics; +import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.license.ClusterStateLicenseService; import org.elasticsearch.license.License; @@ -375,7 +376,8 @@ public void testOnIndexModuleIsNoOpWithSecurityDisabled() throws Exception { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - List.of() + List.of(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); security.onIndexModule(indexModule); // indexReaderWrapper is a SetOnce so if Security#onIndexModule had already set an ReaderWrapper we would get an exception here diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java index e8d6a2868a496..bd8d15ea809fe 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.watcher; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.index.IndexModule; @@ -14,6 +15,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.mapper.MapperMetrics; +import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.plugins.Plugin; @@ -71,7 +73,8 @@ public void testWatcherDisabledTests() throws Exception { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - List.of() + List.of(), + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) ); // this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it watcher.onIndexModule(indexModule);