diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index a37c20aeac461..f2b52349fa210 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -195,6 +195,7 @@ static TransportVersion def(int id) { public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00); public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00); public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00); + public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java index beea9ef07c86a..42f7d7888a050 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java @@ -322,6 +322,17 @@ private AutoShardingResult getDecreaseShardsResult( // Visible for testing static long computeOptimalNumberOfShards(int minNumberWriteThreads, int maxNumberWriteThreads, double indexingLoad) { + /* + * Define: + * - shardsByMaxThreads = number of shards required to ensure no more than 50% utilization with max number of threads per shard + * - shardsByMinThreads = number of shards required to ensure no more than 50% utilization with min number of threads per shard + * Note that shardsByMaxThreads <= shardsByMinThreads. 
+ * This returns: + * - shardsByMaxThreads if shardsByMaxThreads > 3 + * - 3 if shardsByMaxThreads <= 3 and shardsByMinThreads > 3 + * - shardsByMinThreads if 0 < shardsByMinThreads <= 3 + * - 1 if shardsByMinThreads == 0 + */ return Math.max( Math.max( Math.min(roundUp(indexingLoad / (minNumberWriteThreads / 2.0)), 3), diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java index 201f4aab6eb85..f2064397ed777 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java @@ -108,6 +108,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata, indexWriteLoadBuilder.withShardWriteLoad( shardStats.getShardRouting().id(), indexingShardStats.getWriteLoad(), + indexingShardStats.getRecentWriteLoad(), indexingShardStats.getTotalActiveTimeInMillis() ); totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java index 459f8e0618f33..b03a1c02891f7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java @@ -9,9 +9,11 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentFragment; @@ -27,6 +29,7 @@ public class IndexWriteLoad implements Writeable, ToXContentFragment { public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads"); public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes"); + public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads"); private static final Double UNKNOWN_LOAD = -1.0; private static final long UNKNOWN_UPTIME = -1; @@ -34,17 +37,23 @@ public class IndexWriteLoad implements Writeable, ToXContentFragment { private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "index_write_load_parser", false, - (args, unused) -> IndexWriteLoad.create((List) args[0], (List) args[1]) + (args, unused) -> IndexWriteLoad.create((List) args[0], (List) args[1], (List) args[2]) ); static { PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD); PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS); + // The recent write load field is optional so that we can parse XContent built by older versions which did not include it: + PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD); } - public static IndexWriteLoad create(List shardsWriteLoad, List shardsUptimeInMillis) { + private static IndexWriteLoad create( + List shardsWriteLoad, + List shardsUptimeInMillis, + @Nullable List shardsRecentWriteLoad + ) { if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) { - assert false; + assert false : "IndexWriteLoad.create() was called with 
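To make the shard-count logic documented above concrete, here is a small self-contained sketch of the same arithmetic. It is illustrative only: the `roundUp` helper and the tail of the `Math.max` expression (which the hunk above elides) are assumptions based on the documented cases, not a copy of the Elasticsearch method.

```java
// Illustrative sketch of the documented computeOptimalNumberOfShards behaviour; not the Elasticsearch implementation.
public final class OptimalShardsSketch {

    // Assumed helper: round a fractional shard count up to the next whole shard.
    private static long roundUp(double value) {
        return (long) Math.ceil(value);
    }

    // shardsByMinThreads = shards needed to stay at <= 50% utilization with the min write threads per shard
    // shardsByMaxThreads = shards needed to stay at <= 50% utilization with the max write threads per shard
    static long computeOptimalNumberOfShards(int minWriteThreads, int maxWriteThreads, double indexingLoad) {
        long shardsByMinThreads = roundUp(indexingLoad / (minWriteThreads / 2.0));
        long shardsByMaxThreads = roundUp(indexingLoad / (maxWriteThreads / 2.0));
        return Math.max(Math.max(Math.min(shardsByMinThreads, 3), shardsByMaxThreads), 1);
    }

    public static void main(String[] args) {
        // With 2 min and 32 max write threads per shard:
        System.out.println(computeOptimalNumberOfShards(2, 32, 0.0));  // 1: shardsByMinThreads == 0
        System.out.println(computeOptimalNumberOfShards(2, 32, 2.0));  // 2: 0 < shardsByMinThreads <= 3
        System.out.println(computeOptimalNumberOfShards(2, 32, 5.0));  // 3: shardsByMinThreads > 3, shardsByMaxThreads <= 3
        System.out.println(computeOptimalNumberOfShards(2, 32, 64.0)); // 4: shardsByMaxThreads > 3
    }
}
```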
non-matched lengths for shardsWriteLoad and shardsUptimeInMillis"; throw new IllegalArgumentException( "The same number of shard write loads and shard uptimes should be provided, but " + shardsWriteLoad + " " + shardsUptimeInMillis + " were provided" ); @@ -55,39 +64,70 @@ public static IndexWriteLoad create(List shardsWriteLoad, List sha } if (shardsWriteLoad.isEmpty()) { - assert false; + assert false : "IndexWriteLoad.create() was called with empty shardsWriteLoad"; throw new IllegalArgumentException("At least one shard write load and uptime should be provided, but none was provided"); } + if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) { + assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardsUptimeInMillis"; + throw new IllegalArgumentException( + "The same number of shard recent write loads and shard uptimes should be provided, but " + + shardsRecentWriteLoad + + " " + + shardsUptimeInMillis + + " were provided" + ); + } + return new IndexWriteLoad( shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(), - shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray() + shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(), + shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null ); } private final double[] shardWriteLoad; private final long[] shardUptimeInMillis; + private final double[] shardRecentWriteLoad; - private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis) { - assert shardWriteLoad.length == shardUptimeInMillis.length; + private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) { + assert shardWriteLoad.length == shardUptimeInMillis.length + : "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis"; this.shardWriteLoad = shardWriteLoad; this.shardUptimeInMillis = shardUptimeInMillis; + if (shardRecentWriteLoad != null) { + assert shardRecentWriteLoad.length == shardUptimeInMillis.length + : "IndexWriteLoad constructor was called with non-matched lengths for shardRecentWriteLoad and shardUptimeInMillis"; + this.shardRecentWriteLoad = shardRecentWriteLoad; + } else { + this.shardRecentWriteLoad = new double[shardUptimeInMillis.length]; + Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD); + } } public IndexWriteLoad(StreamInput in) throws IOException { - this(in.readDoubleArray(), in.readLongArray()); + this( + in.readDoubleArray(), + in.readLongArray(), + in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null + ); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeDoubleArray(shardWriteLoad); out.writeLongArray(shardUptimeInMillis); + if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) { + out.writeDoubleArray(shardRecentWriteLoad); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad); builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis); + builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad); return builder; } @@ -102,6 +142,13 @@ public OptionalDouble getWriteLoadForShard(int shardId) { return load != UNKNOWN_LOAD ?
OptionalDouble.of(load) : OptionalDouble.empty(); } + public OptionalDouble getRecentWriteLoadForShard(int shardId) { + assertShardInBounds(shardId); + + double load = shardRecentWriteLoad[shardId]; + return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty(); + } + public OptionalLong getUptimeInMillisForShard(int shardId) { assertShardInBounds(shardId); @@ -109,7 +156,6 @@ public OptionalLong getUptimeInMillisForShard(int shardId) { return uptime != UNKNOWN_UPTIME ? OptionalLong.of(uptime) : OptionalLong.empty(); } - // Visible for testing public int numberOfShards() { return shardWriteLoad.length; } @@ -124,13 +170,16 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; IndexWriteLoad that = (IndexWriteLoad) o; - return Arrays.equals(shardWriteLoad, that.shardWriteLoad) && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis); + return Arrays.equals(shardWriteLoad, that.shardWriteLoad) + && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis) + && Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad); } @Override public int hashCode() { int result = Arrays.hashCode(shardWriteLoad); result = 31 * result + Arrays.hashCode(shardUptimeInMillis); + result = 31 * result + Arrays.hashCode(shardRecentWriteLoad); return result; } @@ -140,29 +189,33 @@ public static Builder builder(int numShards) { } public static class Builder { - final double[] shardWriteLoad; - final long[] uptimeInMillis; + private final double[] shardWriteLoad; + private final long[] uptimeInMillis; + private final double[] shardRecentWriteLoad; private Builder(int numShards) { this.shardWriteLoad = new double[numShards]; this.uptimeInMillis = new long[numShards]; + this.shardRecentWriteLoad = new double[numShards]; Arrays.fill(shardWriteLoad, UNKNOWN_LOAD); Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME); + Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD); } - public Builder withShardWriteLoad(int shardId, double load, long uptimeInMillis) { + public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) { if (shardId >= this.shardWriteLoad.length) { throw new IllegalArgumentException(); } this.shardWriteLoad[shardId] = load; this.uptimeInMillis[shardId] = uptimeInMillis; + this.shardRecentWriteLoad[shardId] = recentLoad; return this; } public IndexWriteLoad build() { - return new IndexWriteLoad(shardWriteLoad, uptimeInMillis); + return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad); } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 657a3976c46f7..511e06d6884ee 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -298,7 +298,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final RefreshFieldHasValueListener refreshFieldHasValueListener; private volatile boolean useRetentionLeasesInPeerRecovery; private final LongSupplier relativeTimeInNanosSupplier; - private volatile long startedRelativeTimeInNanos; + private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value private volatile long indexingTimeBeforeShardStartedInNanos; private volatile double recentIndexingLoadAtShardStarted; private final SubscribableListener 
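The IndexWriteLoad changes above follow a common backwards-compatibility pattern: the new per-shard `recent_loads` array is only written to the wire when the peer's transport version understands it, the XContent field is declared optional so metadata written by older versions still parses, and a missing value is stored as the -1 sentinel and surfaced to callers as an empty OptionalDouble. A minimal standalone sketch of that sentinel-and-optional flow (illustrative only, not the Elasticsearch class):

```java
import java.util.Arrays;
import java.util.OptionalDouble;

// Minimal sketch of the "unknown sentinel" handling used for the new per-shard recent write load.
// Not the Elasticsearch class; shown only to illustrate the null-array / -1 / OptionalDouble.empty() flow.
final class PerShardMetricSketch {
    private static final double UNKNOWN = -1.0;
    private final double[] values;

    // 'recentLoads' may be null, e.g. when the data came from a node that predates the field.
    PerShardMetricSketch(int numberOfShards, double[] recentLoads) {
        if (recentLoads != null) {
            this.values = recentLoads;
        } else {
            this.values = new double[numberOfShards];
            Arrays.fill(this.values, UNKNOWN);
        }
    }

    OptionalDouble forShard(int shardId) {
        double value = values[shardId];
        return value != UNKNOWN ? OptionalDouble.of(value) : OptionalDouble.empty();
    }

    public static void main(String[] args) {
        System.out.println(new PerShardMetricSketch(2, null).forShard(0));                      // OptionalDouble.empty
        System.out.println(new PerShardMetricSketch(2, new double[] { 1.5, 2.5 }).forShard(1)); // OptionalDouble[2.5]
    }
}
```

Gating both the read and the write on TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD is what keeps mixed-version clusters from misinterpreting the extra array during a rolling upgrade.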
waitForEngineOrClosedShardListeners = new SubscribableListener<>(); @@ -557,7 +557,10 @@ public void updateShardState( : "a primary relocation is completed by the master, but primary mode is not active " + currentRouting; changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); - startedRelativeTimeInNanos = getRelativeTimeInNanos(); + long relativeTimeInNanos = getRelativeTimeInNanos(); + // We use -1 to indicate that startedRelativeTimeInNanos has not yet been set to its true value. So in the vanishingly + // unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value. + startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L; indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos(); recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos); } else if (currentRouting.primary() @@ -1370,11 +1373,14 @@ public IndexingStats indexingStats() { } long currentTimeInNanos = getRelativeTimeInNanos(); + // We use -1 to indicate that startedRelativeTimeInNanos has not yet been set to its true value, i.e. the shard has not started. + // In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero). + long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) ? (currentTimeInNanos - startedRelativeTimeInNanos) : 0L; return internalIndexingStats.stats( throttled, throttleTimeInMillis, indexingTimeBeforeShardStartedInNanos, - currentTimeInNanos - startedRelativeTimeInNanos, + timeSinceShardStartedInNanos, currentTimeInNanos, recentIndexingLoadAtShardStarted ); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java b/server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java index c04fba13a14d3..c2a8c47743bf0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -22,8 +21,9 @@ */ public class IndexingStatsSettings { - // TODO: Change this default to something sensible: - static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = new TimeValue(10000, TimeUnit.DAYS); + static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = TimeValue.timeValueMinutes(5); // Aligns with the interval between DSL runs + static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MIN = TimeValue.timeValueSeconds(1); // A sub-second half-life makes no sense + static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MAX = TimeValue.timeValueDays(100_000); // Long.MAX_VALUE nanos, rounded down /** * A cluster setting giving the half-life, in seconds, to use for the Exponentially Weighted Moving Rate calculation used for the @@ -34,7 +34,8 @@ public class IndexingStatsSettings { public static final Setting<TimeValue> RECENT_WRITE_LOAD_HALF_LIFE_SETTING = Setting.timeSetting( "indices.stats.recent_write_load.half_life", RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT, - TimeValue.ZERO, + RECENT_WRITE_LOAD_HALF_LIFE_MIN, + RECENT_WRITE_LOAD_HALF_LIFE_MAX, Setting.Property.Dynamic, Setting.Property.NodeScope ); diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java 
b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index 29a5c4b7fcd45..4cf6120e12bbd 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -100,7 +100,7 @@ public void testCalculateValidations() { 1, now, List.of(now - 3000, now - 2000, now - 1000), - getWriteLoad(1, 2.0), + getWriteLoad(1, 2.0, 9999.0), null ); builder.put(dataStream); @@ -141,7 +141,7 @@ public void testCalculateIncreaseShardingRecommendations() { 1, now, List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(1, 2.0), + getWriteLoad(1, 2.0, 9999.0), autoShardingEvent ); @@ -170,7 +170,7 @@ public void testCalculateIncreaseShardingRecommendations() { 1, now, List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(1, 2.0), + getWriteLoad(1, 2.0, 9999.0), autoShardingEvent ); @@ -202,7 +202,7 @@ public void testCalculateIncreaseShardingRecommendations() { 1, now, List.of(now - 10_000_000, now - 7_000_000, now - 2_000_000, now - 1_000_000, now - 1000), - getWriteLoad(1, 2.0), + getWriteLoad(1, 2.0, 9999.0), autoShardingEvent ); @@ -237,7 +237,7 @@ public void testCalculateDecreaseShardingRecommendations() { 3, now, List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(3, 0.25), + getWriteLoad(3, 0.25, 9999.0), autoShardingEvent ); @@ -271,7 +271,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 0.333), + getWriteLoad(3, 0.333, 9999.0), autoShardingEvent ); @@ -306,7 +306,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 0.333), + getWriteLoad(3, 0.333, 9999.0), autoShardingEvent ); @@ -346,7 +346,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(1).getMillis(), now - 1000 ), - getWriteLoad(3, 0.25), + getWriteLoad(3, 0.25, 9999.0), autoShardingEvent ); @@ -386,7 +386,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 1.333), + getWriteLoad(3, 1.333, 9999.0), autoShardingEvent ); @@ -479,7 +479,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { IndexMetadata indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), creationDate), 1, - getWriteLoad(1, 999.0), + getWriteLoad(1, 999.0, 9999.0), creationDate ); @@ -487,7 +487,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), creationDate), 1, - getWriteLoad(1, 1.0), + getWriteLoad(1, 1.0, 9999.0), creationDate ); } @@ -502,14 +502,14 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), createdAt), 3, - getWriteLoad(3, 5.0), // max write index within cooling period + getWriteLoad(3, 5.0, 9999.0), // max write index within cooling period createdAt ); } else { indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), createdAt), 3, - getWriteLoad(3, 3.0), // each 
backing index has a write load of 3.0 + getWriteLoad(3, 3.0, 9999.0), // each backing index has a write load of 3.0 createdAt ); } @@ -518,7 +518,12 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { } final String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size()); - final IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, getWriteLoad(3, 1.0), System.currentTimeMillis()); + final IndexMetadata writeIndexMetadata = createIndexMetadata( + writeIndexName, + 3, + getWriteLoad(3, 1.0, 9999.0), + System.currentTimeMillis() + ); backingIndices.add(writeIndexMetadata.getIndex()); metadataBuilder.put(writeIndexMetadata, false); @@ -558,9 +563,9 @@ public void testIndexLoadWithinCoolingPeriodIsSumOfShardsLoads() { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(3); for (int shardId = 0; shardId < 3; shardId++) { switch (shardId) { - case 0 -> builder.withShardWriteLoad(shardId, 0.5, 40); - case 1 -> builder.withShardWriteLoad(shardId, 3.0, 10); - case 2 -> builder.withShardWriteLoad(shardId, 0.3333, 150); + case 0 -> builder.withShardWriteLoad(shardId, 0.5, 9999.0, 40); + case 1 -> builder.withShardWriteLoad(shardId, 3.0, 9999.0, 10); + case 2 -> builder.withShardWriteLoad(shardId, 0.3333, 9999.0, 150); } } indexMetadata = createIndexMetadata( @@ -574,7 +579,12 @@ public void testIndexLoadWithinCoolingPeriodIsSumOfShardsLoads() { } final String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size()); - final IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, getWriteLoad(3, 0.1), System.currentTimeMillis()); + final IndexMetadata writeIndexMetadata = createIndexMetadata( + writeIndexName, + 3, + getWriteLoad(3, 0.1, 9999.0), + System.currentTimeMillis() + ); backingIndices.add(writeIndexMetadata.getIndex()); metadataBuilder.put(writeIndexMetadata, false); @@ -694,12 +704,11 @@ private IndexMetadata createIndexMetadata( .build(); } - private IndexWriteLoad getWriteLoad(int numberOfShards, double shardWriteLoad) { + private IndexWriteLoad getWriteLoad(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad) { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); for (int shardId = 0; shardId < numberOfShards; shardId++) { - builder.withShardWriteLoad(shardId, shardWriteLoad, 1); + builder.withShardWriteLoad(shardId, shardWriteLoad, shardRecentWriteLoad, 1); } return builder.build(); } - } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 14e13cc2ab5ca..5a819419cd93d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -1069,7 +1069,8 @@ public void testToXContent() throws IOException { "stats": { "write_load": { "loads": [-1.0], - "uptimes": [-1] + "uptimes": [-1], + "recent_loads": [-1.0] }, "avg_size": { "total_size_in_bytes": 120, @@ -1343,6 +1344,9 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti ], "uptimes" : [ -1 + ], + "recent_loads" : [ + -1.0 ] }, "avg_size" : { @@ -1623,6 +1627,9 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti ], "uptimes" : [ -1 + ], + "recent_loads" : [ + -1.0 ] }, "avg_size" : { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java 
b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 6d2b8cb7a3f82..c8f4dd2670291 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -2138,7 +2138,12 @@ public void testGetIndicesWithinMaxAgeRangeAllIndicesOutsideRange() { private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); for (int shardId = 0; shardId < numberOfShards; shardId++) { - builder.withShardWriteLoad(shardId, randomDoubleBetween(0, 64, true), randomLongBetween(1, 10)); + builder.withShardWriteLoad( + shardId, + randomDoubleBetween(0, 64, true), + randomDoubleBetween(0, 64, true), + randomLongBetween(1, 10) + ); } return builder.build(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java index 99adc1185878f..148063c8a00b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java @@ -32,7 +32,12 @@ protected IndexMetadataStats createTestInstance() { final int numberOfShards = randomIntBetween(1, 10); final var indexWriteLoad = IndexWriteLoad.builder(numberOfShards); for (int i = 0; i < numberOfShards; i++) { - indexWriteLoad.withShardWriteLoad(i, randomDoubleBetween(1, 10, true), randomLongBetween(1, 1000)); + indexWriteLoad.withShardWriteLoad( + i, + randomDoubleBetween(1, 10, true), + randomDoubleBetween(1, 10, true), + randomLongBetween(1, 1000) + ); } return new IndexMetadataStats(indexWriteLoad.build(), randomLongBetween(1024, 10240), randomIntBetween(1, 4)); } @@ -53,10 +58,13 @@ protected IndexMetadataStats mutateInstance(IndexMetadataStats originalStats) { double shardLoad = existingShard && randomBoolean() ? originalWriteLoad.getWriteLoadForShard(i).getAsDouble() : randomDoubleBetween(0, 128, true); + double recentShardLoad = existingShard && randomBoolean() + ? originalWriteLoad.getRecentWriteLoadForShard(i).getAsDouble() + : randomDoubleBetween(0, 128, true); long uptimeInMillis = existingShard && randomBoolean() ? 
originalWriteLoad.getUptimeInMillisForShard(i).getAsLong() : randomNonNegativeLong(); - indexWriteLoad.withShardWriteLoad(i, shardLoad, uptimeInMillis); + indexWriteLoad.withShardWriteLoad(i, shardLoad, recentShardLoad, uptimeInMillis); } return new IndexMetadataStats(indexWriteLoad.build(), randomLongBetween(1024, 10240), randomIntBetween(1, 4)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index d80c54b34bc2d..190cd61a3a8e4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -735,7 +735,12 @@ private IndexMetadataStats randomIndexStats(int numberOfShards) { IndexWriteLoad.Builder indexWriteLoadBuilder = IndexWriteLoad.builder(numberOfShards); int numberOfPopulatedWriteLoads = randomIntBetween(0, numberOfShards); for (int i = 0; i < numberOfPopulatedWriteLoads; i++) { - indexWriteLoadBuilder.withShardWriteLoad(i, randomDoubleBetween(0.0, 128.0, true), randomNonNegativeLong()); + indexWriteLoadBuilder.withShardWriteLoad( + i, + randomDoubleBetween(0.0, 128.0, true), + randomDoubleBetween(0.0, 128.0, true), + randomNonNegativeLong() + ); } return new IndexMetadataStats(indexWriteLoadBuilder.build(), randomLongBetween(100, 1024), randomIntBetween(1, 2)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java index 159b747cdfd15..e55da0d65b950 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java @@ -10,9 +10,17 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import java.io.IOException; +import java.util.OptionalDouble; +import java.util.OptionalLong; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; public class IndexWriteLoadTests extends ESTestCase { @@ -22,26 +30,71 @@ public void testGetWriteLoadForShardAndGetUptimeInMillisForShard() { final IndexWriteLoad.Builder indexWriteLoadBuilder = IndexWriteLoad.builder(numberOfShards); final double[] populatedShardWriteLoads = new double[numberOfPopulatedShards]; + final double[] populatedShardRecentWriteLoads = new double[numberOfPopulatedShards]; final long[] populatedShardUptimes = new long[numberOfPopulatedShards]; for (int shardId = 0; shardId < numberOfPopulatedShards; shardId++) { double writeLoad = randomDoubleBetween(1, 128, true); + double recentWriteLoad = randomDoubleBetween(1, 128, true); long uptimeInMillis = randomNonNegativeLong(); populatedShardWriteLoads[shardId] = writeLoad; + populatedShardRecentWriteLoads[shardId] = recentWriteLoad; populatedShardUptimes[shardId] = uptimeInMillis; - indexWriteLoadBuilder.withShardWriteLoad(shardId, writeLoad, uptimeInMillis); + indexWriteLoadBuilder.withShardWriteLoad(shardId, writeLoad, recentWriteLoad, uptimeInMillis); } final IndexWriteLoad indexWriteLoad = indexWriteLoadBuilder.build(); for (int shardId = 0; shardId < numberOfShards; shardId++) { if (shardId < 
numberOfPopulatedShards) { - assertThat(indexWriteLoad.getWriteLoadForShard(shardId).isPresent(), is(equalTo(true))); - assertThat(indexWriteLoad.getWriteLoadForShard(shardId).getAsDouble(), is(equalTo(populatedShardWriteLoads[shardId]))); - assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId).isPresent(), is(equalTo(true))); - assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId).getAsLong(), is(equalTo(populatedShardUptimes[shardId]))); + assertThat(indexWriteLoad.getWriteLoadForShard(shardId), equalTo(OptionalDouble.of(populatedShardWriteLoads[shardId]))); + assertThat( + indexWriteLoad.getRecentWriteLoadForShard(shardId), + equalTo(OptionalDouble.of(populatedShardRecentWriteLoads[shardId])) + ); + assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId), equalTo(OptionalLong.of(populatedShardUptimes[shardId]))); } else { - assertThat(indexWriteLoad.getWriteLoadForShard(shardId).isPresent(), is(false)); - assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId).isPresent(), is(false)); + assertThat(indexWriteLoad.getWriteLoadForShard(shardId), equalTo(OptionalDouble.empty())); + assertThat(indexWriteLoad.getRecentWriteLoadForShard(shardId), equalTo(OptionalDouble.empty())); + assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId), equalTo(OptionalLong.empty())); } } } + + public void testXContent_roundTrips() throws IOException { + IndexWriteLoad original = randomIndexWriteLoad(); + XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().value(original).endObject(); + IndexWriteLoad roundTripped = IndexWriteLoad.fromXContent(createParser(xContentBuilder)); + assertThat(roundTripped, equalTo(original)); + } + + public void testXContent_missingRecentWriteLoad() throws IOException { + // Simulate a JSON serialization from before we added recent write load: + IndexWriteLoad original = randomIndexWriteLoad(); + XContentBuilder builder = XContentBuilder.builder( + XContentType.JSON, + emptySet(), + singleton(IndexWriteLoad.SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName()) + ).startObject().value(original).endObject(); + // Deserialize, and assert that it matches the original except the recent write loads are all missing: + IndexWriteLoad roundTripped = IndexWriteLoad.fromXContent(createParser(builder)); + for (int i = 0; i < original.numberOfShards(); i++) { + assertThat(roundTripped.getUptimeInMillisForShard(i), equalTo(original.getUptimeInMillisForShard(i))); + assertThat(roundTripped.getWriteLoadForShard(i), equalTo(original.getWriteLoadForShard(i))); + assertThat(roundTripped.getRecentWriteLoadForShard(i), equalTo(OptionalDouble.empty())); + } + } + + private static IndexWriteLoad randomIndexWriteLoad() { + int numberOfPopulatedShards = 10; + int numberOfShards = randomIntBetween(numberOfPopulatedShards, 20); + IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); + for (int shardId = 0; shardId < numberOfPopulatedShards; shardId++) { + builder.withShardWriteLoad( + shardId, + randomDoubleBetween(1, 128, true), + randomDoubleBetween(1, 128, true), + randomNonNegativeLong() + ); + } + return builder.build(); + } } diff --git a/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java index 0ae3a7eb8a67b..24c24f0846d44 100644 --- 
a/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java +++ b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java @@ -128,11 +128,11 @@ public void testUptimeIsUsedToWeightWriteLoad() { DataStream.getDefaultBackingIndexName(dataStreamName, 0), numberOfShards, IndexWriteLoad.builder(numberOfShards) - .withShardWriteLoad(0, 12, 80) - .withShardWriteLoad(1, 24, 5) - .withShardWriteLoad(2, 24, 5) - .withShardWriteLoad(3, 24, 5) - .withShardWriteLoad(4, 24, 5) + .withShardWriteLoad(0, 12, 999, 80) + .withShardWriteLoad(1, 24, 999, 5) + .withShardWriteLoad(2, 24, 999, 5) + .withShardWriteLoad(3, 24, 999, 5) + .withShardWriteLoad(4, 24, 999, 5) .build(), System.currentTimeMillis() - (maxIndexAge.millis() / 2) ); @@ -234,7 +234,7 @@ public void testWriteLoadForecast() { { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( - List.of(IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 100).build()) + List.of(IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 999, 100).build()) ); assertThat(writeLoadForecast.isPresent(), is(true)); assertThat(writeLoadForecast.getAsDouble(), is(equalTo(12.0))); @@ -244,11 +244,11 @@ public void testWriteLoadForecast() { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( List.of( IndexWriteLoad.builder(5) - .withShardWriteLoad(0, 12, 80) - .withShardWriteLoad(1, 24, 5) - .withShardWriteLoad(2, 24, 5) - .withShardWriteLoad(3, 24, 5) - .withShardWriteLoad(4, 24, 5) + .withShardWriteLoad(0, 12, 999, 80) + .withShardWriteLoad(1, 24, 999, 5) + .withShardWriteLoad(2, 24, 999, 5) + .withShardWriteLoad(3, 24, 999, 5) + .withShardWriteLoad(4, 24, 999, 5) .build() ) ); @@ -260,14 +260,14 @@ public void testWriteLoadForecast() { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( List.of( IndexWriteLoad.builder(5) - .withShardWriteLoad(0, 12, 80) - .withShardWriteLoad(1, 24, 5) - .withShardWriteLoad(2, 24, 5) - .withShardWriteLoad(3, 24, 5) - .withShardWriteLoad(4, 24, 4) + .withShardWriteLoad(0, 12, 999, 80) + .withShardWriteLoad(1, 24, 999, 5) + .withShardWriteLoad(2, 24, 999, 5) + .withShardWriteLoad(3, 24, 999, 5) + .withShardWriteLoad(4, 24, 999, 4) .build(), // Since this shard uptime is really low, it doesn't add much to the avg - IndexWriteLoad.builder(1).withShardWriteLoad(0, 120, 1).build() + IndexWriteLoad.builder(1).withShardWriteLoad(0, 120, 999, 1).build() ) ); assertThat(writeLoadForecast.isPresent(), is(true)); @@ -277,9 +277,9 @@ public void testWriteLoadForecast() { { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( List.of( - IndexWriteLoad.builder(2).withShardWriteLoad(0, 12, 25).withShardWriteLoad(1, 12, 25).build(), + IndexWriteLoad.builder(2).withShardWriteLoad(0, 12, 999, 25).withShardWriteLoad(1, 12, 999, 25).build(), - IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 50).build() + IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 999, 50).build() ) ); assertThat(writeLoadForecast.isPresent(), is(true)); @@ -291,14 +291,14 @@ public void testWriteLoadForecast() { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( List.of( IndexWriteLoad.builder(3) - .withShardWriteLoad(0, 25, 1) - .withShardWriteLoad(1, 18, 1) - .withShardWriteLoad(2, 23, 1) + .withShardWriteLoad(0, 25, 999, 1) + .withShardWriteLoad(1, 18, 999, 1) + .withShardWriteLoad(2, 23, 999, 1) .build(), - IndexWriteLoad.builder(2).withShardWriteLoad(0, 6, 1).withShardWriteLoad(1, 8, 1).build(), 
+ IndexWriteLoad.builder(2).withShardWriteLoad(0, 6, 999, 1).withShardWriteLoad(1, 8, 999, 1).build(), - IndexWriteLoad.builder(1).withShardWriteLoad(0, 15, 1).build() + IndexWriteLoad.builder(1).withShardWriteLoad(0, 15, 999, 1).build() ) ); assertThat(writeLoadForecast.isPresent(), is(true)); @@ -309,7 +309,12 @@ public void testWriteLoadForecast() { private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); for (int shardId = 0; shardId < numberOfShards; shardId++) { - builder.withShardWriteLoad(shardId, randomDoubleBetween(0, 64, true), randomLongBetween(1, 10)); + builder.withShardWriteLoad( + shardId, + randomDoubleBetween(0, 64, true), + randomDoubleBetween(0, 64, true), + randomLongBetween(1, 10) + ); } return builder.build(); }
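For context on the `indices.stats.recent_write_load.half_life` setting adjusted above: the recent write load is described as an Exponentially Weighted Moving Rate, so each shard's indexing activity loses half of its weight per half-life (five minutes by default, aligning with the interval between data stream lifecycle runs). The sketch below is a generic half-life-weighted rate, not the InternalIndexingStats implementation; the class and method names are invented for illustration:

```java
import java.util.concurrent.TimeUnit;

// Generic exponentially weighted moving rate with a configurable half-life.
// Illustrative only: shows what the half-life controls, not how Elasticsearch computes recent write load.
final class EwmRateSketch {
    private final double halfLifeNanos;
    private double weightedBusyNanos;    // exponentially weighted indexing time
    private double weightedElapsedNanos; // exponentially weighted wall-clock time
    private long lastUpdateNanos;

    EwmRateSketch(long halfLife, TimeUnit unit, long nowNanos) {
        this.halfLifeNanos = unit.toNanos(halfLife);
        this.lastUpdateNanos = nowNanos;
    }

    // Record that 'busyNanos' of indexing work happened in the interval ending at 'nowNanos'.
    void addSample(long busyNanos, long nowNanos) {
        long elapsed = nowNanos - lastUpdateNanos;
        double decay = Math.pow(0.5, elapsed / halfLifeNanos); // older samples lose half their weight per half-life
        weightedBusyNanos = weightedBusyNanos * decay + busyNanos;
        weightedElapsedNanos = weightedElapsedNanos * decay + elapsed;
        lastUpdateNanos = nowNanos;
    }

    // Recent load: weighted busy time over weighted elapsed time, i.e. roughly "write threads busy lately".
    double recentLoad() {
        return weightedElapsedNanos == 0 ? 0.0 : weightedBusyNanos / weightedElapsedNanos;
    }
}
```

With the five-minute default, indexing that happened fifteen minutes ago carries roughly an eighth of the weight of current activity, so consumers of this value see recent bursts reflected much faster than in the uptime-weighted total write load.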