Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/129990.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129990
summary: Make forecast write load accurate when shard numbers change
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public String toString() {
* <p>If the recommendation is to INCREASE/DECREASE shards the reported cooldown period will be TimeValue.ZERO.
* If the auto sharding service thinks the number of shards must be changed but it can't recommend a change due to the cooldown
* period not lapsing, the result will be of type {@link AutoShardingType#COOLDOWN_PREVENTED_INCREASE} or
* {@link AutoShardingType#COOLDOWN_PREVENTED_INCREASE} with the remaining cooldown configured and the number of shards that should
* {@link AutoShardingType#COOLDOWN_PREVENTED_DECREASE} with the remaining cooldown configured and the number of shards that should
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed a typo too

* be configured for the data stream once the remaining cooldown lapses as the target number of shards.
*
* <p>The NOT_APPLICABLE type result will report a cooldown period of TimeValue.MAX_VALUE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ public ProjectMetadata.Builder withWriteLoadForecastForWriteIndex(String dataStr
}

final IndexMetadata writeIndex = metadata.getSafe(dataStream.getWriteIndex());
metadata.put(IndexMetadata.builder(writeIndex).indexWriteLoadForecast(forecastIndexWriteLoad.getAsDouble()).build(), false);
metadata.put(
IndexMetadata.builder(writeIndex)
.indexWriteLoadForecast(forecastIndexWriteLoad.getAsDouble() / writeIndex.getNumberOfShards())
.build(),
false
);

return metadata;
}
Expand All @@ -130,24 +135,41 @@ private static void clearPreviousForecast(DataStream dataStream, ProjectMetadata
}

// Visible for testing

/**
* This calculates the weighted average total write-load for all recent indices.
*
* @param indicesWriteLoadWithinMaxAgeRange The indices considered "recent"
* @return The weighted average total write-load. To get the per-shard write load, this number must be divided by the number of shards
*/
static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLoadWithinMaxAgeRange) {
double totalWeightedWriteLoad = 0;
long totalShardUptime = 0;
double allIndicesWriteLoad = 0;
long allIndicesUptime = 0;
for (IndexWriteLoad writeLoad : indicesWriteLoadWithinMaxAgeRange) {
double totalShardWriteLoad = 0;
long totalShardUptimeInMillis = 0;
long maxShardUptimeInMillis = 0;
for (int shardId = 0; shardId < writeLoad.numberOfShards(); shardId++) {
final OptionalDouble writeLoadForShard = writeLoad.getWriteLoadForShard(shardId);
final OptionalLong uptimeInMillisForShard = writeLoad.getUptimeInMillisForShard(shardId);
if (writeLoadForShard.isPresent()) {
assert uptimeInMillisForShard.isPresent();
double shardWriteLoad = writeLoadForShard.getAsDouble();
long shardUptimeInMillis = uptimeInMillisForShard.getAsLong();
totalWeightedWriteLoad += shardWriteLoad * shardUptimeInMillis;
totalShardUptime += shardUptimeInMillis;
totalShardWriteLoad += shardWriteLoad * shardUptimeInMillis;
totalShardUptimeInMillis += shardUptimeInMillis;
maxShardUptimeInMillis = Math.max(maxShardUptimeInMillis, shardUptimeInMillis);
}
}
double weightedAverageShardWriteLoad = totalShardWriteLoad / totalShardUptimeInMillis;
double totalIndexWriteLoad = weightedAverageShardWriteLoad * writeLoad.numberOfShards();
// We assume the index shards for a single index lived for approximately the same amount of time,
// so it's safe to weigh the index write load by the max shard uptime
allIndicesWriteLoad += totalIndexWriteLoad * maxShardUptimeInMillis;
allIndicesUptime += maxShardUptimeInMillis;
}

return totalShardUptime == 0 ? OptionalDouble.empty() : OptionalDouble.of(totalWeightedWriteLoad / totalShardUptime);
return allIndicesUptime == 0 ? OptionalDouble.empty() : OptionalDouble.of(allIndicesWriteLoad / allIndicesUptime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void testWriteLoadForecast() {
)
);
assertThat(writeLoadForecast.isPresent(), is(true));
assertThat(writeLoadForecast.getAsDouble(), is(equalTo(14.4)));
assertThat(writeLoadForecast.getAsDouble(), is(equalTo(72.0)));
}

{
Expand All @@ -264,14 +264,14 @@ public void testWriteLoadForecast() {
.withShardWriteLoad(1, 24, 999, 999, 5)
.withShardWriteLoad(2, 24, 999, 999, 5)
.withShardWriteLoad(3, 24, 999, 999, 5)
.withShardWriteLoad(4, 24, 999, 999, 4)
.withShardWriteLoad(4, 24, 999, 999, 5)
.build(),
// Since this shard uptime is really low, it doesn't add much to the avg
IndexWriteLoad.builder(1).withShardWriteLoad(0, 120, 999, 999, 1).build()
)
);
assertThat(writeLoadForecast.isPresent(), is(true));
assertThat(writeLoadForecast.getAsDouble(), is(equalTo(15.36)));
assertThat(writeLoadForecast.getAsDouble(), is(closeTo(72.59, 0.01)));
}

{
Expand All @@ -283,7 +283,7 @@ public void testWriteLoadForecast() {
)
);
assertThat(writeLoadForecast.isPresent(), is(true));
assertThat(writeLoadForecast.getAsDouble(), is(equalTo(12.0)));
assertThat(writeLoadForecast.getAsDouble(), is(equalTo(16.0)));
}

{
Expand All @@ -302,7 +302,7 @@ public void testWriteLoadForecast() {
)
);
assertThat(writeLoadForecast.isPresent(), is(true));
assertThat(writeLoadForecast.getAsDouble(), is(closeTo(15.83, 0.01)));
assertThat(writeLoadForecast.getAsDouble(), is(closeTo(31.66, 0.01)));
}
}

Expand Down
Loading