@@ -528,7 +528,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
stats.store = new StoreStats();
stats.indexing = new IndexingStats(
- new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, targetWriteLoad, 1, 0.123, 0.234)
+ new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, 234, 234, 1000, 0.123, targetWriteLoad)
);
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
}
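Context for this hunk, hedged: the slot roles below are inferred from the rest of this PR, not from the constructor's declared parameter names. The helper previously routed targetWriteLoad through two mid-list write-load arguments; it now routes it through the final argument, which the assertions elsewhere in this PR treat as the peak write load.

// Sketch only -- slot comments are assumptions, not declared parameter names.
new IndexingStats.Stats(
    1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1,
    234,             // was targetWriteLoad (assumed all-time write-load slot)
    234,             // was targetWriteLoad (assumed recent write-load slot)
    1000,            // was 1
    0.123,           // unchanged
    targetWriteLoad  // was 0.234 (assumed peak write-load slot)
);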
@@ -136,7 +136,7 @@ public enum WriteLoadMetric {
public static final Setting<WriteLoadMetric> DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC = Setting.enumSetting(
WriteLoadMetric.class,
"data_streams.auto_sharding.increase_shards.load_metric",
- WriteLoadMetric.ALL_TIME,
+ WriteLoadMetric.PEAK,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
@@ -147,7 +147,7 @@ public enum WriteLoadMetric {
public static final Setting<WriteLoadMetric> DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC = Setting.enumSetting(
WriteLoadMetric.class,
"data_streams.auto_sharding.decrease_shards.load_metric",
- WriteLoadMetric.ALL_TIME,
+ WriteLoadMetric.PEAK,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
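Net effect of the two setting hunks above: when neither load_metric setting is configured, both auto-sharding decisions now consult the peak write load rather than the all-time write load. A minimal sketch of how the new defaults resolve (Settings.EMPTY stands in for an unconfigured node):

// Unset in node settings, so Setting#get falls back to the declared default.
WriteLoadMetric increaseMetric = DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC.get(Settings.EMPTY);
WriteLoadMetric decreaseMetric = DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC.get(Settings.EMPTY);
assert increaseMetric == WriteLoadMetric.PEAK;
assert decreaseMetric == WriteLoadMetric.PEAK;

Both settings are Dynamic, so operators can restore the previous behaviour at runtime by setting the two load_metric settings back to ALL_TIME.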
@@ -148,7 +148,7 @@ public void testCalculateValidations() {
AutoShardingResult autoShardingResult = disabledAutoshardingService.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(1, 2.0, 9999.0, 9999.0)
+ createIndexStats(1, 9999.0, 9999.0, 2.0)
);
assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT));
}
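The same argument reshuffle repeats at every createIndexStats call site below. Per the helper's signature in the last hunk, createIndexStats(numberOfShards, shardWriteLoad, shardRecentWriteLoad, shardPeakWriteLoad), the tests exercising the default metric now place the load they want the service to see in the peak slot (the new default) and fill the slots for the metrics that should be ignored with an implausible 9999.0 sentinel, so a wrong metric selection would produce a wildly wrong recommendation and fail the assertion; the metric-override tests below move the real load to whichever slot they select. For example:

// PEAK sees 2.0; ALL_TIME and RECENT see only the 9999.0 sentinel.
createIndexStats(1, 9999.0, 9999.0, 2.0)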
@@ -182,7 +182,7 @@ public void testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(1, 2.5, 9999.0, 9999.0)
+ createIndexStats(1, 9999.0, 9999.0, 2.5)
);
assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
// no pre-existing scaling event so the cool down must be zero
@@ -221,7 +221,7 @@ public void testCalculateIncreaseShardingRecommendations_preventedByCooldown() {
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(1, 2.5, 9999.0, 9999.0)
+ createIndexStats(1, 9999.0, 9999.0, 2.5)
);
assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_INCREASE));
// no pre-existing scaling event so the cool down must be zero
@@ -259,7 +259,7 @@ public void testCalculateIncreaseShardingRecommendations_notPreventedByPreviousI
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(1, 2.5, 9999.0, 9999.0)
+ createIndexStats(1, 9999.0, 9999.0, 2.5)
);
assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
// no pre-existing scaling event so the cool down must be zero
@@ -268,7 +268,7 @@ public void testCalculateIncreaseShardingRecommendations_notPreventedByPreviousI
}

public void testCalculateIncreaseShardingRecommendations_usingRecentWriteLoad() {
- // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than ALL_TIME write load
+ // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than PEAK write load
var projectId = randomProjectIdOrDefault();
ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
DataStream dataStream = createDataStream(
@@ -299,8 +299,8 @@ public void testCalculateIncreaseShardingRecommendations_usingRecentWriteLoad()
});
}

- public void testCalculateIncreaseShardingRecommendations_usingPeakWriteLoad() {
- // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with PEAK rather than ALL_TIME write load
+ public void testCalculateIncreaseShardingRecommendations_usingAllTimeWriteLoad() {
+ // Repeated testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with ALL_TIME rather than PEAK write load
var projectId = randomProjectIdOrDefault();
ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
DataStream dataStream = createDataStream(
@@ -318,11 +318,11 @@ public void testCalculateIncreaseShardingRecommendations_usingPeakWriteLoad() {
.putProjectMetadata(builder.build())
.build();

- doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> {
+ doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.ALL_TIME, () -> {
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(1, 9999.0, 9999.0, 2.5)
+ createIndexStats(1, 2.5, 9999.0, 9999.0)
);
assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
// no pre-existing scaling event so the cool down must be zero
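With PEAK now the default, the test that previously opted in to PEAK via doWithMetricSelection is repurposed to cover the non-default ALL_TIME metric instead, and its createIndexStats arguments swap to match: the real load of 2.5 moves to the all-time slot, and the 9999.0 sentinels move to the recent and peak slots.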
@@ -343,7 +343,7 @@ public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough(
3,
now,
List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000),
- getWriteLoad(3, 0.25, 9999.0, 9999.0),
+ getWriteLoad(3, 9999.0, 9999.0, 0.25),
null
);
builder.put(dataStream);
@@ -355,7 +355,7 @@ public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough(
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+ createIndexStats(3, 9999.0, 9999.0, 1.0 / 3)
);
// the cooldown period for the decrease shards event hasn't lapsed since the data stream was created
assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE));
@@ -380,7 +380,7 @@ public void testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent
now - TimeValue.timeValueDays(2).getMillis(),
now - 1000
),
- getWriteLoad(3, 0.333, 9999.0, 9999.0),
+ getWriteLoad(3, 9999.0, 9999.0, 0.333),
null
);
builder.put(dataStream);
@@ -392,7 +392,7 @@ public void testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+ createIndexStats(3, 9999.0, 9999.0, 1.0 / 3)
);
assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
assertThat(autoShardingResult.targetNumberOfShards(), is(1));
@@ -424,7 +424,7 @@ public void testCalculateDecreaseShardingRecommendations_notPreventedByPreviousD
now - TimeValue.timeValueDays(2).getMillis(),
now - 1000
),
- getWriteLoad(3, 0.333, 9999.0, 9999.0),
+ getWriteLoad(3, 9999.0, 9999.0, 0.333),
autoShardingEvent
);
builder.put(dataStream);
@@ -436,7 +436,7 @@ public void testCalculateDecreaseShardingRecommendations_notPreventedByPreviousD
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+ createIndexStats(3, 9999.0, 9999.0, 1.0 / 3)
);
assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
assertThat(autoShardingResult.targetNumberOfShards(), is(1));
@@ -466,7 +466,7 @@ public void testCalculateDecreaseShardingRecommendations_preventedByCooldown() {
now - TimeValue.timeValueDays(1).getMillis(),
now - 1000
),
- getWriteLoad(3, 0.25, 9999.0, 9999.0),
+ getWriteLoad(3, 9999.0, 9999.0, 0.25),
autoShardingEvent
);
builder.put(dataStream);
@@ -478,7 +478,7 @@ public void testCalculateDecreaseShardingRecommendations_preventedByCooldown() {
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+ createIndexStats(3, 9999.0, 9999.0, 1.0 / 3)
);
assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE));
assertThat(autoShardingResult.targetNumberOfShards(), is(1));
@@ -515,15 +515,15 @@ public void testCalculateDecreaseShardingRecommendations_noChangeRequired() {
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 4.0 / 3, 9999.0, 9999.0)
+ createIndexStats(3, 9999.0, 9999.0, 4.0 / 3)
);
assertThat(autoShardingResult.type(), is(NO_CHANGE_REQUIRED));
assertThat(autoShardingResult.targetNumberOfShards(), is(3));
assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
}

public void testCalculateDecreaseShardingRecommendations_usingRecentWriteLoad() {
- // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than ALL_TIME write load
+ // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than PEAK write load
var projectId = randomProjectIdOrDefault();
ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
DataStream dataStream = createDataStream(
@@ -551,16 +551,16 @@ public void testCalculateDecreaseShardingRecommendations_usingRecentWriteLoad()
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 0.5 / 3, 1.0 / 3, 9999.0)
+ createIndexStats(3, 9999.0, 1.0 / 3, 0.5 / 3)
);
assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
assertThat(autoShardingResult.targetNumberOfShards(), is(1));
assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
});
}

- public void testCalculateDecreaseShardingRecommendations_usingPeakWriteLoad() {
- // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with PEAK rather than ALL_TIME write load
+ public void testCalculateDecreaseShardingRecommendations_usingAllTimeWriteLoad() {
+ // Repeated testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with ALL_TIME rather than PEAK write load
var projectId = randomProjectIdOrDefault();
ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
DataStream dataStream = createDataStream(
@@ -588,7 +588,7 @@ public void testCalculateDecreaseShardingRecommendations_usingPeakWriteLoad() {
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 0.5 / 3, 9999.0, 1.0 / 3)
+ createIndexStats(3, 1.0 / 3, 9999.0, 0.5 / 3)
);
assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
assertThat(autoShardingResult.targetNumberOfShards(), is(1));
@@ -629,7 +629,7 @@ public void testCalculateDecreaseShardingRecommendations_correctDecisionData() {
AutoShardingResult autoShardingResult = service.calculate(
state.projectState(projectId),
dataStream,
- createIndexStats(3, 1.9 / 3, 0.3 / 3, 0.9 / 3)
+ createIndexStats(3, 0.9 / 3, 0.3 / 3, 1.9 / 3)
);
assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
assertThat(autoShardingResult.currentNumberOfShards(), is(3));
@@ -653,13 +653,13 @@ public void testCalculateDecreaseShardingRecommendations_correctDecisionData() {
decision.inputs().maxWriteThreads(),
equalTo(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS.getDefault(Settings.EMPTY))
);
- assertThat(decision.inputs().increaseShardsMetric(), equalTo(WriteLoadMetric.ALL_TIME));
+ assertThat(decision.inputs().increaseShardsMetric(), equalTo(WriteLoadMetric.PEAK));
assertThat(decision.inputs().decreaseShardsMetric(), equalTo(WriteLoadMetric.RECENT));
assertThat(decision.inputs().dataStream(), equalTo(dataStreamName));
assertThat(decision.inputs().writeIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 5, now - 1000)));
- assertThat(decision.inputs().writeIndexAllTimeLoad(), closeTo(1.9, 1.0e-8));
+ assertThat(decision.inputs().writeIndexAllTimeLoad(), closeTo(0.9, 1.0e-8));
assertThat(decision.inputs().writeIndexRecentLoad(), closeTo(0.3, 1.0e-8));
- assertThat(decision.inputs().writeIndexPeakLoad(), closeTo(0.9, 1.0e-8));
+ assertThat(decision.inputs().writeIndexPeakLoad(), closeTo(1.9, 1.0e-8));
assertThat(decision.inputs().currentNumberOfWriteIndexShards(), equalTo(3));
assertThat(decision.increaseCalculation().writeIndexLoadForIncrease(), closeTo(1.9, 1.0e-8)); // all-time
// Increase shard count based on all-time load of 1.9 for write index:
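These assertions pin down the new wiring end to end. A hedged summary of what the service is expected to read from the stats built here:

// Per-shard loads sum across the 3 shards to the index-level figures asserted above:
// all-time 0.9, recent 0.3, peak 1.9.
createIndexStats(3, 0.9 / 3, 0.3 / 3, 1.9 / 3);
// increaseShardsMetric == PEAK   -> the increase path reads 1.9
// decreaseShardsMetric == RECENT -> the decrease path reads 0.3
// A regression back to ALL_TIME would surface 0.9 and fail the closeTo checks.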
@@ -1022,7 +1022,7 @@ public void testAutoShardingResultValidation_validCooldownPreventedDecrease() {
assertThat(cooldownPreventedDecrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(7)));
}

- IndexStats createIndexStats(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad, double shardPeakWriteLoad) {
+ private IndexStats createIndexStats(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad, double shardPeakWriteLoad) {
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 99); // the generation number here is not used
Index index = new Index(indexName, randomUUID());
IndexStats.IndexStatsBuilder builder = new IndexStats.IndexStatsBuilder(indexName, randomUUID(), null, null);