Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
CommonStats stats = new CommonStats();
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
stats.store = new StoreStats();
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1));
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123));
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -636,7 +637,8 @@ public static final IndexShard newIndexShard(
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
System::nanoTime,
null,
MapperMetrics.NOOP
MapperMetrics.NOOP,
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_RESHARDING_METADATA = def(9_031_0_00);
public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA = def(9_032_0_00);
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import static java.lang.Math.exp;
import static java.lang.Math.expm1;
import static java.lang.Math.log;

/**
* Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to
Expand Down Expand Up @@ -41,7 +42,8 @@ public class ExponentiallyWeightedMovingRate {
private final double lambda;
private final long startTime;
private double rate;
long lastTime;
private long lastTime;
private boolean waitingForFirstIncrement;

/**
* Constructor.
Expand All @@ -57,14 +59,12 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) {
if (lambda < 0.0) {
throw new IllegalArgumentException("lambda must be non-negative but was " + lambda);
}
if (startTime <= 0.0) {
throw new IllegalArgumentException("startTime must be non-negative but was " + startTime);
}
synchronized (this) {
this.lambda = lambda;
this.rate = Double.NaN; // should never be used
this.startTime = startTime;
this.lastTime = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first
this.lastTime = 0; // should never be used
this.waitingForFirstIncrement = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are tweaking this class so that it allows times to be zero. This is technically the correct thing to do, since it is allowed for System.nanoTime() to return zero (though it is vanishingly unlikely!). More importantly, it avoids having to tweak the times used in some tests.

As a result, we can no longer use lastTime == 0 as a marker that we're waiting for the first increment, so we add a boolean to do that job explicitly.

}
}

Expand All @@ -80,7 +80,7 @@ public ExponentiallyWeightedMovingRate(double lambda, long startTime) {
*/
public double getRate(long time) {
synchronized (this) {
if (lastTime == 0) { // indicates that no increment has happened yet
if (waitingForFirstIncrement) {
return 0.0;
} else if (time <= lastTime) {
return rate;
Expand All @@ -104,6 +104,9 @@ public double getRate(long time) {
* instance. It is only non-static because it uses this instance's {@code lambda} and {@code startTime}.
*/
public double calculateRateSince(long currentTime, double currentRate, long oldTime, double oldRate) {
if (oldTime < startTime) {
oldTime = startTime;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this guard for a few edge cases.

if (currentTime <= oldTime) {
return 0.0;
}
Expand All @@ -127,12 +130,13 @@ public double calculateRateSince(long currentTime, double currentRate, long oldT
*/
public void addIncrement(double increment, long time) {
synchronized (this) {
if (lastTime == 0) { // indicates that this is the first increment
if (waitingForFirstIncrement) {
if (time <= startTime) {
time = startTime + 1;
}
// This is the formula for R(t_1) given in subsection 2.6 of the document referenced above:
rate = increment / expHelper(time - startTime);
waitingForFirstIncrement = false;
} else {
if (time < lastTime) {
time = lastTime;
Expand Down Expand Up @@ -165,4 +169,12 @@ private double expHelper(double time) {
return time * (1.0 - 0.5 * lambdaTime);
}
}

/**
* Returns the configured half-life of this instance. The units are the same as all other times in API calls, and the inverse of the
* units used for the {@code lambda} constructor parameter. If {@code lambda} is {@code 0.0}, returns {@link Double#POSITIVE_INFINITY}.
*/
public double getHalfLife() {
return log(2.0) / lambda;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.index.shard.IndexingStatsSettings;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -631,6 +632,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING
).filter(Objects::nonNull).collect(toSet());
}
9 changes: 7 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStatsSettings;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -177,6 +178,7 @@ public interface DirectoryWrapper {
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
private final MapperMetrics mapperMetrics;
private final IndexingStatsSettings indexingStatsSettings;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -197,7 +199,8 @@ public IndexModule(
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final SlowLogFieldProvider slowLogFieldProvider,
final MapperMetrics mapperMetrics,
final List<SearchOperationListener> searchOperationListeners
final List<SearchOperationListener> searchOperationListeners,
final IndexingStatsSettings indexingStatsSettings
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -212,6 +215,7 @@ public IndexModule(
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.mapperMetrics = mapperMetrics;
this.indexingStatsSettings = indexingStatsSettings;
}

/**
Expand Down Expand Up @@ -547,7 +551,8 @@ public IndexService newIndexService(
snapshotCommitSupplier,
indexCommitListener.get(),
mapperMetrics,
queryRewriteInterceptor
queryRewriteInterceptor,
indexingStatsSettings
);
success = true;
return indexService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStatsSettings;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
Expand Down Expand Up @@ -167,6 +168,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final ValuesSourceRegistry valuesSourceRegistry;
private final MapperMetrics mapperMetrics;
private final QueryRewriteInterceptor queryRewriteInterceptor;
private final IndexingStatsSettings indexingStatsSettings;

@SuppressWarnings("this-escape")
public IndexService(
Expand Down Expand Up @@ -203,7 +205,8 @@ public IndexService(
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
Engine.IndexCommitListener indexCommitListener,
MapperMetrics mapperMetrics,
QueryRewriteInterceptor queryRewriteInterceptor
QueryRewriteInterceptor queryRewriteInterceptor,
IndexingStatsSettings indexingStatsSettings
) {
super(indexSettings);
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
Expand Down Expand Up @@ -288,6 +291,7 @@ public IndexService(
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
}
this.indexingStatsSettings = indexingStatsSettings;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -572,7 +576,8 @@ public synchronized IndexShard createShard(
snapshotCommitSupplier,
System::nanoTime,
indexCommitListener,
mapperMetrics
mapperMetrics,
indexingStatsSettings
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final LongSupplier relativeTimeInNanosSupplier;
private volatile long startedRelativeTimeInNanos;
private volatile long indexingTimeBeforeShardStartedInNanos;
private volatile double recentIndexingLoadAtShardStarted;
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();

// the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead
Expand Down Expand Up @@ -330,7 +331,8 @@ public IndexShard(
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
final LongSupplier relativeTimeInNanosSupplier,
final Engine.IndexCommitListener indexCommitListener,
final MapperMetrics mapperMetrics
final MapperMetrics mapperMetrics,
final IndexingStatsSettings indexingStatsSettings
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand All @@ -349,7 +351,7 @@ public IndexShard(
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
this.mapperService = mapperService;
this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats();
this.internalIndexingStats = new InternalIndexingStats(relativeTimeInNanosSupplier, indexingStatsSettings);
var indexingFailuresDebugListener = new IndexingFailuresDebugListener(this);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(
CollectionUtils.appendToCopyNoNullElements(listeners, internalIndexingStats, indexingFailuresDebugListener),
Expand Down Expand Up @@ -557,6 +559,7 @@ public void updateShardState(
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
startedRelativeTimeInNanos = getRelativeTimeInNanos();
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
} else if (currentRouting.primary()
&& currentRouting.relocating()
&& replicationTracker.isRelocated()
Expand Down Expand Up @@ -1366,11 +1369,14 @@ public IndexingStats indexingStats() {
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
}

long currentTimeInNanos = getRelativeTimeInNanos();
return internalIndexingStats.stats(
throttled,
throttleTimeInMillis,
indexingTimeBeforeShardStartedInNanos,
getRelativeTimeInNanos() - startedRelativeTimeInNanos
currentTimeInNanos - startedRelativeTimeInNanos,
currentTimeInNanos,
recentIndexingLoadAtShardStarted
);
}

Expand Down
Loading
Loading