diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java index f6867dff1a14f..a93e1d7ea9fb1 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java @@ -528,7 +528,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex, stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes()); stats.store = new StoreStats(); stats.indexing = new IndexingStats( - new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, targetWriteLoad, 1, 0.123, 0.234) + new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, 234, 234, 1000, 0.123, targetWriteLoad) ); return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java index 72857c8249055..61eb2867790a2 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java @@ -136,7 +136,7 @@ public enum WriteLoadMetric { public static final Setting DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC = Setting.enumSetting( WriteLoadMetric.class, "data_streams.auto_sharding.increase_shards.load_metric", - WriteLoadMetric.ALL_TIME, + WriteLoadMetric.PEAK, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -147,7 +147,7 @@ public enum WriteLoadMetric { public static final Setting DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC = Setting.enumSetting( WriteLoadMetric.class, "data_streams.auto_sharding.decrease_shards.load_metric", - WriteLoadMetric.ALL_TIME, + WriteLoadMetric.PEAK, Setting.Property.Dynamic, Setting.Property.NodeScope ); diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index a174a8ae1cea1..403c395e31b57 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -148,7 +148,7 @@ public void testCalculateValidations() { AutoShardingResult autoShardingResult = disabledAutoshardingService.calculate( state.projectState(projectId), dataStream, - createIndexStats(1, 2.0, 9999.0, 9999.0) + createIndexStats(1, 9999.0, 9999.0, 2.0) ); assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT)); } @@ -182,7 +182,7 @@ public void testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(1, 2.5, 9999.0, 9999.0) + createIndexStats(1, 9999.0, 9999.0, 2.5) ); assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); // no pre-existing scaling event so the cool down must be zero @@ -221,7 +221,7 @@ public void testCalculateIncreaseShardingRecommendations_preventedByCooldown() { AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(1, 2.5, 9999.0, 9999.0) + createIndexStats(1, 9999.0, 9999.0, 2.5) ); assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_INCREASE)); // no pre-existing scaling event so the cool down must be zero @@ -259,7 +259,7 @@ public void testCalculateIncreaseShardingRecommendations_notPreventedByPreviousI AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(1, 2.5, 9999.0, 9999.0) + createIndexStats(1, 9999.0, 9999.0, 2.5) ); assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); // no pre-existing scaling event so the cool down must be zero @@ -268,7 +268,7 @@ public void testCalculateIncreaseShardingRecommendations_notPreventedByPreviousI } public void testCalculateIncreaseShardingRecommendations_usingRecentWriteLoad() { - // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than ALL_TIME write load + // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than PEAK write load var projectId = randomProjectIdOrDefault(); ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); DataStream dataStream = createDataStream( @@ -299,8 +299,8 @@ public void testCalculateIncreaseShardingRecommendations_usingRecentWriteLoad() }); } - public void testCalculateIncreaseShardingRecommendations_usingPeakWriteLoad() { - // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with PEAK rather than ALL_TIME write load + public void testCalculateIncreaseShardingRecommendations_usingAllTimeWriteLoad() { + // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with ALL_TIME rather than PEAK write load var projectId = randomProjectIdOrDefault(); ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); DataStream dataStream = createDataStream( @@ -318,11 +318,11 @@ public void testCalculateIncreaseShardingRecommendations_usingPeakWriteLoad() { .putProjectMetadata(builder.build()) .build(); - doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> { + doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.ALL_TIME, () -> { AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(1, 9999.0, 9999.0, 2.5) + createIndexStats(1, 2.5, 9999.0, 9999.0) ); assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); // no pre-existing scaling event so the cool down must be zero @@ -343,7 +343,7 @@ public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough( 3, now, List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(3, 0.25, 9999.0, 9999.0), + getWriteLoad(3, 9999.0, 9999.0, 0.25), null ); builder.put(dataStream); @@ -355,7 +355,7 @@ public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough( AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 1.0 / 3, 9999.0, 9999.0) + createIndexStats(3, 9999.0, 9999.0, 1.0 / 3) ); // the cooldown period for the decrease shards event hasn't lapsed since the data stream was created assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE)); @@ -380,7 +380,7 @@ public void testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 0.333, 9999.0, 9999.0), + getWriteLoad(3, 9999.0, 9999.0, 0.333), null ); builder.put(dataStream); @@ -392,7 +392,7 @@ public void testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 1.0 / 3, 9999.0, 9999.0) + createIndexStats(3, 9999.0, 9999.0, 1.0 / 3) ); assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); assertThat(autoShardingResult.targetNumberOfShards(), is(1)); @@ -424,7 +424,7 @@ public void testCalculateDecreaseShardingRecommendations_notPreventedByPreviousD now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 0.333, 9999.0, 9999.0), + getWriteLoad(3, 9999.0, 9999.0, 0.333), autoShardingEvent ); builder.put(dataStream); @@ -436,7 +436,7 @@ public void testCalculateDecreaseShardingRecommendations_notPreventedByPreviousD AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 1.0 / 3, 9999.0, 9999.0) + createIndexStats(3, 9999.0, 9999.0, 1.0 / 3) ); assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); assertThat(autoShardingResult.targetNumberOfShards(), is(1)); @@ -466,7 +466,7 @@ public void testCalculateDecreaseShardingRecommendations_preventedByCooldown() { now - TimeValue.timeValueDays(1).getMillis(), now - 1000 ), - getWriteLoad(3, 0.25, 9999.0, 9999.0), + getWriteLoad(3, 9999.0, 9999.0, 0.25), autoShardingEvent ); builder.put(dataStream); @@ -478,7 +478,7 @@ public void testCalculateDecreaseShardingRecommendations_preventedByCooldown() { AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 1.0 / 3, 9999.0, 9999.0) + createIndexStats(3, 9999.0, 9999.0, 1.0 / 3) ); assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE)); assertThat(autoShardingResult.targetNumberOfShards(), is(1)); @@ -515,7 +515,7 @@ public void testCalculateDecreaseShardingRecommendations_noChangeRequired() { AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 4.0 / 3, 9999.0, 9999.0) + createIndexStats(3, 9999.0, 9999.0, 4.0 / 3) ); assertThat(autoShardingResult.type(), is(NO_CHANGE_REQUIRED)); assertThat(autoShardingResult.targetNumberOfShards(), is(3)); @@ -523,7 +523,7 @@ public void testCalculateDecreaseShardingRecommendations_noChangeRequired() { } public void testCalculateDecreaseShardingRecommendations_usingRecentWriteLoad() { - // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than ALL_TIME write load + // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than PEAK write load var projectId = randomProjectIdOrDefault(); ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); DataStream dataStream = createDataStream( @@ -551,7 +551,7 @@ public void testCalculateDecreaseShardingRecommendations_usingRecentWriteLoad() AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 0.5 / 3, 1.0 / 3, 9999.0) + createIndexStats(3, 9999.0, 1.0 / 3, 0.5 / 3) ); assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); assertThat(autoShardingResult.targetNumberOfShards(), is(1)); @@ -559,8 +559,8 @@ public void testCalculateDecreaseShardingRecommendations_usingRecentWriteLoad() }); } - public void testCalculateDecreaseShardingRecommendations_usingPeakWriteLoad() { - // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with PEAK rather than ALL_TIME write load + public void testCalculateDecreaseShardingRecommendations_usingAllTimeWriteLoad() { + // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with ALL_TIME rather than PEAK write load var projectId = randomProjectIdOrDefault(); ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); DataStream dataStream = createDataStream( @@ -588,7 +588,7 @@ public void testCalculateDecreaseShardingRecommendations_usingPeakWriteLoad() { AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 0.5 / 3, 9999.0, 1.0 / 3) + createIndexStats(3, 1.0 / 3, 9999.0, 0.5 / 3) ); assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); assertThat(autoShardingResult.targetNumberOfShards(), is(1)); @@ -629,7 +629,7 @@ public void testCalculateDecreaseShardingRecommendations_correctDecisionData() { AutoShardingResult autoShardingResult = service.calculate( state.projectState(projectId), dataStream, - createIndexStats(3, 1.9 / 3, 0.3 / 3, 0.9 / 3) + createIndexStats(3, 0.9 / 3, 0.3 / 3, 1.9 / 3) ); assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); assertThat(autoShardingResult.currentNumberOfShards(), is(3)); @@ -653,13 +653,13 @@ public void testCalculateDecreaseShardingRecommendations_correctDecisionData() { decision.inputs().maxWriteThreads(), equalTo(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS.getDefault(Settings.EMPTY)) ); - assertThat(decision.inputs().increaseShardsMetric(), equalTo(WriteLoadMetric.ALL_TIME)); + assertThat(decision.inputs().increaseShardsMetric(), equalTo(WriteLoadMetric.PEAK)); assertThat(decision.inputs().decreaseShardsMetric(), equalTo(WriteLoadMetric.RECENT)); assertThat(decision.inputs().dataStream(), equalTo(dataStreamName)); assertThat(decision.inputs().writeIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 5, now - 1000))); - assertThat(decision.inputs().writeIndexAllTimeLoad(), closeTo(1.9, 1.0e-8)); + assertThat(decision.inputs().writeIndexAllTimeLoad(), closeTo(0.9, 1.0e-8)); assertThat(decision.inputs().writeIndexRecentLoad(), closeTo(0.3, 1.0e-8)); - assertThat(decision.inputs().writeIndexPeakLoad(), closeTo(0.9, 1.0e-8)); + assertThat(decision.inputs().writeIndexPeakLoad(), closeTo(1.9, 1.0e-8)); assertThat(decision.inputs().currentNumberOfWriteIndexShards(), equalTo(3)); assertThat(decision.increaseCalculation().writeIndexLoadForIncrease(), closeTo(1.9, 1.0e-8)); // all-time // Increase shard count based on all-time load of 1.9 for write index: @@ -1022,7 +1022,7 @@ public void testAutoShardingResultValidation_validCooldownPreventedDecrease() { assertThat(cooldownPreventedDecrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(7))); } - IndexStats createIndexStats(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad, double shardPeakWriteLoad) { + private IndexStats createIndexStats(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad, double shardPeakWriteLoad) { String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 99); // the generation number here is not used Index index = new Index(indexName, randomUUID()); IndexStats.IndexStatsBuilder builder = new IndexStats.IndexStatsBuilder(indexName, randomUUID(), null, null);