-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Improve accuracy of write load forecast when shard numbers change #129990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
nicktindall
merged 15 commits into
elastic:main
from
nicktindall:fix_type_ds_autosharding
Jun 27, 2025
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
6cd413a
Fix typo in DataStreamAutoShardingService#calculate javadoc
nicktindall 73ad77b
Make write load forecast accurate when shard count is dynamic
nicktindall c9c0ad8
Update docs/changelog/129990.yaml
nicktindall abbbaf6
Merge branch 'main' into fix_type_ds_autosharding
nicktindall c5054e8
Merge remote-tracking branch 'origin/main' into fix_type_ds_autosharding
nicktindall 241adc2
Add specific tests
nicktindall ec47a3c
Move comment closer
nicktindall bb63a1e
Restore original shard count
nicktindall 240f00e
Make less verbose
nicktindall d89c2b9
Fix naming
nicktindall a46ea9b
Merge branch 'main' into fix_type_ds_autosharding
nicktindall fc9d9f8
Improve reasoning for choosing max uptime to weight index contributio…
nicktindall b64b9dc
Add justification for extrapolation
nicktindall 01364b2
Merge branch 'main' into fix_type_ds_autosharding
nicktindall 3cae4f7
Merge branch 'main' into fix_type_ds_autosharding
nicktindall File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 129990 | ||
| summary: Make forecast write load accurate when shard numbers change | ||
| area: Allocation | ||
| type: bug | ||
| issues: [] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
|
|
||
| import org.apache.logging.log4j.Level; | ||
| import org.apache.logging.log4j.core.LogEvent; | ||
| import org.apache.lucene.util.hnsw.IntToIntFunction; | ||
| import org.elasticsearch.cluster.metadata.DataStream; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadataStats; | ||
|
|
@@ -24,16 +25,19 @@ | |
| import org.elasticsearch.test.MockLog; | ||
| import org.elasticsearch.threadpool.TestThreadPool; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.hamcrest.Matcher; | ||
| import org.junit.After; | ||
| import org.junit.Before; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.OptionalDouble; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad; | ||
| import static org.hamcrest.Matchers.closeTo; | ||
|
|
@@ -42,6 +46,7 @@ | |
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.greaterThan; | ||
| import static org.hamcrest.Matchers.is; | ||
| import static org.hamcrest.Matchers.lessThan; | ||
|
|
||
| public class LicensedWriteLoadForecasterTests extends ESTestCase { | ||
| ThreadPool threadPool; | ||
|
|
@@ -67,33 +72,15 @@ public void testWriteLoadForecastIsAddedToWriteIndex() { | |
|
|
||
| writeLoadForecaster.refreshLicense(); | ||
|
|
||
| final ProjectMetadata.Builder metadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()); | ||
| final String dataStreamName = "logs-es"; | ||
| final int numberOfBackingIndices = 10; | ||
| final int numberOfShards = randomIntBetween(1, 5); | ||
| final List<Index> backingIndices = new ArrayList<>(); | ||
| for (int i = 0; i < numberOfBackingIndices; i++) { | ||
| final IndexMetadata indexMetadata = createIndexMetadata( | ||
| DataStream.getDefaultBackingIndexName(dataStreamName, i), | ||
| numberOfShards, | ||
| randomIndexWriteLoad(numberOfShards), | ||
| System.currentTimeMillis() - (maxIndexAge.millis() / 2) | ||
| ); | ||
| backingIndices.add(indexMetadata.getIndex()); | ||
| metadataBuilder.put(indexMetadata, false); | ||
| } | ||
|
|
||
| final IndexMetadata writeIndexMetadata = createIndexMetadata( | ||
| DataStream.getDefaultBackingIndexName(dataStreamName, numberOfBackingIndices), | ||
| numberOfShards, | ||
| null, | ||
| System.currentTimeMillis() | ||
| final ProjectMetadata.Builder metadataBuilder = createMetadataBuilderWithDataStream( | ||
| dataStreamName, | ||
| numberOfBackingIndices, | ||
| randomIntBetween(1, 5), | ||
| maxIndexAge | ||
| ); | ||
| backingIndices.add(writeIndexMetadata.getIndex()); | ||
| metadataBuilder.put(writeIndexMetadata, false); | ||
|
|
||
| final DataStream dataStream = createDataStream(dataStreamName, backingIndices); | ||
| metadataBuilder.put(dataStream); | ||
| final DataStream dataStream = metadataBuilder.dataStream(dataStreamName); | ||
|
|
||
| final ProjectMetadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex( | ||
| dataStream.getName(), | ||
|
|
@@ -253,7 +240,7 @@ public void testWriteLoadForecast() { | |
| ) | ||
| ); | ||
| assertThat(writeLoadForecast.isPresent(), is(true)); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(equalTo(14.4))); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(equalTo(72.0))); | ||
| } | ||
|
|
||
| { | ||
|
|
@@ -264,14 +251,14 @@ public void testWriteLoadForecast() { | |
| .withShardWriteLoad(1, 24, 999, 999, 5) | ||
| .withShardWriteLoad(2, 24, 999, 999, 5) | ||
| .withShardWriteLoad(3, 24, 999, 999, 5) | ||
| .withShardWriteLoad(4, 24, 999, 999, 4) | ||
| .withShardWriteLoad(4, 24, 999, 999, 5) | ||
| .build(), | ||
| // Since this shard uptime is really low, it doesn't add much to the avg | ||
| IndexWriteLoad.builder(1).withShardWriteLoad(0, 120, 999, 999, 1).build() | ||
| ) | ||
| ); | ||
| assertThat(writeLoadForecast.isPresent(), is(true)); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(equalTo(15.36))); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(closeTo(72.59, 0.01))); | ||
| } | ||
|
|
||
| { | ||
|
|
@@ -283,7 +270,7 @@ public void testWriteLoadForecast() { | |
| ) | ||
| ); | ||
| assertThat(writeLoadForecast.isPresent(), is(true)); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(equalTo(12.0))); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(equalTo(16.0))); | ||
| } | ||
|
|
||
| { | ||
|
|
@@ -302,7 +289,7 @@ public void testWriteLoadForecast() { | |
| ) | ||
| ); | ||
| assertThat(writeLoadForecast.isPresent(), is(true)); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(closeTo(15.83, 0.01))); | ||
| assertThat(writeLoadForecast.getAsDouble(), is(closeTo(31.66, 0.01))); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -404,4 +391,163 @@ public boolean innerMatch(LogEvent event) { | |
| ); | ||
| }, LicensedWriteLoadForecaster.class, collectingLoggingAssertion); | ||
| } | ||
|
|
||
| public void testShardIncreaseDoesNotIncreaseTotalLoad() { | ||
| testShardChangeDoesNotChangeTotalForecastLoad(ShardCountChange.INCREASE); | ||
| } | ||
|
|
||
| public void testShardDecreaseDoesNotDecreaseTotalLoad() { | ||
| testShardChangeDoesNotChangeTotalForecastLoad(ShardCountChange.DECREASE); | ||
| } | ||
|
|
||
| private void testShardChangeDoesNotChangeTotalForecastLoad(ShardCountChange shardCountChange) { | ||
| final TimeValue maxIndexAge = TimeValue.timeValueDays(7); | ||
| final AtomicBoolean hasValidLicense = new AtomicBoolean(true); | ||
| final AtomicInteger licenseCheckCount = new AtomicInteger(); | ||
| final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> { | ||
| licenseCheckCount.incrementAndGet(); | ||
| return hasValidLicense.get(); | ||
| }, threadPool, maxIndexAge); | ||
| writeLoadForecaster.refreshLicense(); | ||
|
|
||
| final String dataStreamName = randomIdentifier(); | ||
| final ProjectMetadata.Builder originalMetadata = writeLoadForecaster.withWriteLoadForecastForWriteIndex( | ||
| dataStreamName, | ||
| createMetadataBuilderWithDataStream(dataStreamName, randomIntBetween(5, 15), shardCountChange.originalShardCount(), maxIndexAge) | ||
| ); | ||
|
|
||
| // Generate the same data stream, but with a different number of shards in the write index | ||
| final ProjectMetadata.Builder changedShardCountMetadata = writeLoadForecaster.withWriteLoadForecastForWriteIndex( | ||
| dataStreamName, | ||
| updateWriteIndexShardCount(dataStreamName, originalMetadata, shardCountChange) | ||
| ); | ||
|
|
||
| IndexMetadata originalWriteIndexMetadata = originalMetadata.getSafe(originalMetadata.dataStream(dataStreamName).getWriteIndex()); | ||
| IndexMetadata changedShardCountWriteIndexMetadata = changedShardCountMetadata.getSafe( | ||
| changedShardCountMetadata.dataStream(dataStreamName).getWriteIndex() | ||
| ); | ||
|
|
||
| // The shard count changed | ||
| assertThat( | ||
| changedShardCountWriteIndexMetadata.getNumberOfShards(), | ||
| shardCountChange.expectedChangeFromOriginal(originalWriteIndexMetadata.getNumberOfShards()) | ||
| ); | ||
| // But the total write-load did not | ||
| assertThat( | ||
| changedShardCountWriteIndexMetadata.getNumberOfShards() * writeLoadForecaster.getForecastedWriteLoad( | ||
| changedShardCountWriteIndexMetadata | ||
| ).getAsDouble(), | ||
| closeTo( | ||
| originalWriteIndexMetadata.getNumberOfShards() * writeLoadForecaster.getForecastedWriteLoad(originalWriteIndexMetadata) | ||
| .getAsDouble(), | ||
| 0.01 | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are sometimes small rounding errors |
||
| ) | ||
| ); | ||
| } | ||
|
|
||
| public enum ShardCountChange implements IntToIntFunction { | ||
| INCREASE(1, 15) { | ||
| @Override | ||
| public int apply(int originalShardCount) { | ||
| return randomIntBetween(originalShardCount + 1, originalShardCount * 3); | ||
| } | ||
|
|
||
| public Matcher<Integer> expectedChangeFromOriginal(int originalShardCount) { | ||
| return greaterThan(originalShardCount); | ||
| } | ||
| }, | ||
| DECREASE(10, 30) { | ||
| @Override | ||
| public int apply(int originalShardCount) { | ||
| return randomIntBetween(1, originalShardCount - 1); | ||
| } | ||
|
|
||
| public Matcher<Integer> expectedChangeFromOriginal(int originalShardCount) { | ||
| return lessThan(originalShardCount); | ||
| } | ||
| }; | ||
|
|
||
| private final int originalMinimumShardCount; | ||
| private final int originalMaximumShardCount; | ||
|
|
||
| ShardCountChange(int originalMinimumShardCount, int originalMaximumShardCount) { | ||
| this.originalMinimumShardCount = originalMinimumShardCount; | ||
| this.originalMaximumShardCount = originalMaximumShardCount; | ||
| } | ||
|
|
||
| public int originalShardCount() { | ||
| return randomIntBetween(originalMinimumShardCount, originalMaximumShardCount); | ||
| } | ||
|
|
||
| abstract Matcher<Integer> expectedChangeFromOriginal(int originalShardCount); | ||
| } | ||
|
|
||
| private ProjectMetadata.Builder updateWriteIndexShardCount( | ||
| String dataStreamName, | ||
| ProjectMetadata.Builder originalMetadata, | ||
| ShardCountChange shardCountChange | ||
| ) { | ||
| final ProjectMetadata.Builder updatedShardCountMetadata = ProjectMetadata.builder(originalMetadata.getId()); | ||
|
|
||
| final DataStream originalDataStream = originalMetadata.dataStream(dataStreamName); | ||
| final Index existingWriteIndex = Objects.requireNonNull(originalDataStream.getWriteIndex()); | ||
| final IndexMetadata originalWriteIndexMetadata = originalMetadata.getSafe(existingWriteIndex); | ||
|
|
||
| // Copy all non-write indices over unchanged | ||
| final List<IndexMetadata> backingIndexMetadatas = originalDataStream.getIndices() | ||
| .stream() | ||
| .filter(index -> index != existingWriteIndex) | ||
| .map(originalMetadata::getSafe) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| // Create a new write index with an updated shard count | ||
| final IndexMetadata writeIndexMetadata = createIndexMetadata( | ||
| DataStream.getDefaultBackingIndexName(dataStreamName, backingIndexMetadatas.size()), | ||
| shardCountChange.apply(originalWriteIndexMetadata.getNumberOfShards()), | ||
| null, | ||
| System.currentTimeMillis() | ||
| ); | ||
| backingIndexMetadatas.add(writeIndexMetadata); | ||
| backingIndexMetadatas.forEach(indexMetadata -> updatedShardCountMetadata.put(indexMetadata, false)); | ||
|
|
||
| final DataStream dataStream = createDataStream( | ||
| dataStreamName, | ||
| backingIndexMetadatas.stream().map(IndexMetadata::getIndex).toList() | ||
| ); | ||
| updatedShardCountMetadata.put(dataStream); | ||
| return updatedShardCountMetadata; | ||
| } | ||
|
|
||
| private ProjectMetadata.Builder createMetadataBuilderWithDataStream( | ||
| String dataStreamName, | ||
| int numberOfBackingIndices, | ||
| int numberOfShards, | ||
| TimeValue maxIndexAge | ||
| ) { | ||
| final ProjectMetadata.Builder metadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()); | ||
| final List<Index> backingIndices = new ArrayList<>(); | ||
| for (int i = 0; i < numberOfBackingIndices; i++) { | ||
| final IndexMetadata indexMetadata = createIndexMetadata( | ||
| DataStream.getDefaultBackingIndexName(dataStreamName, i), | ||
| numberOfShards, | ||
| randomIndexWriteLoad(numberOfShards), | ||
| System.currentTimeMillis() - (maxIndexAge.millis() / 2) | ||
| ); | ||
| backingIndices.add(indexMetadata.getIndex()); | ||
| metadataBuilder.put(indexMetadata, false); | ||
| } | ||
|
|
||
| final IndexMetadata writeIndexMetadata = createIndexMetadata( | ||
| DataStream.getDefaultBackingIndexName(dataStreamName, numberOfBackingIndices), | ||
| numberOfShards, | ||
| null, | ||
| System.currentTimeMillis() | ||
| ); | ||
| backingIndices.add(writeIndexMetadata.getIndex()); | ||
| metadataBuilder.put(writeIndexMetadata, false); | ||
|
|
||
| final DataStream dataStream = createDataStream(dataStreamName, backingIndices); | ||
| metadataBuilder.put(dataStream); | ||
| return metadataBuilder; | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed a typo too