|
30 | 30 | import org.junit.Before; |
31 | 31 |
|
32 | 32 | import java.util.ArrayList; |
| 33 | +import java.util.HashSet; |
33 | 34 | import java.util.List; |
34 | 35 | import java.util.Map; |
35 | 36 | import java.util.Objects; |
36 | 37 | import java.util.OptionalDouble; |
| 38 | +import java.util.Set; |
37 | 39 | import java.util.concurrent.TimeUnit; |
38 | 40 | import java.util.concurrent.atomic.AtomicBoolean; |
39 | 41 | import java.util.concurrent.atomic.AtomicInteger; |
| 42 | +import java.util.function.LongSupplier; |
40 | 43 | import java.util.stream.Collectors; |
| 44 | +import java.util.stream.IntStream; |
41 | 45 |
|
42 | 46 | import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad; |
43 | 47 | import static org.hamcrest.Matchers.closeTo; |
|
47 | 51 | import static org.hamcrest.Matchers.greaterThan; |
48 | 52 | import static org.hamcrest.Matchers.is; |
49 | 53 | import static org.hamcrest.Matchers.lessThan; |
| 54 | +import static org.hamcrest.Matchers.not; |
| 55 | +import static org.hamcrest.Matchers.notANumber; |
50 | 56 |
|
51 | 57 | public class LicensedWriteLoadForecasterTests extends ESTestCase { |
52 | 58 | ThreadPool threadPool; |
@@ -294,14 +300,18 @@ public void testWriteLoadForecast() { |
294 | 300 | } |
295 | 301 |
|
296 | 302 | private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { |
| 303 | + return randomIndexWriteLoad(numberOfShards, () -> randomLongBetween(1, 10)); |
| 304 | + } |
| 305 | + |
| 306 | + private IndexWriteLoad randomIndexWriteLoad(int numberOfShards, LongSupplier uptimeSupplier) { |
297 | 307 | IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); |
298 | 308 | for (int shardId = 0; shardId < numberOfShards; shardId++) { |
299 | 309 | builder.withShardWriteLoad( |
300 | 310 | shardId, |
301 | 311 | randomDoubleBetween(0, 64, true), |
302 | 312 | randomDoubleBetween(0, 64, true), |
303 | 313 | randomDoubleBetween(0, 64, true), |
304 | | - randomLongBetween(1, 10) |
| 314 | + uptimeSupplier.getAsLong() |
305 | 315 | ); |
306 | 316 | } |
307 | 317 | return builder.build(); |
@@ -445,6 +455,71 @@ private void testShardChangeDoesNotChangeTotalForecastLoad(ShardCountChange shar |
445 | 455 | ); |
446 | 456 | } |
447 | 457 |
|
| 458 | + public void testCanHandleIndicesWithMissingShardWriteLoadsOrZeroUptime() { |
| 459 | + final TimeValue maxIndexAge = TimeValue.timeValueDays(7); |
| 460 | + final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge); |
| 461 | + writeLoadForecaster.refreshLicense(); |
| 462 | + |
| 463 | + final ProjectMetadata.Builder metadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()); |
| 464 | + final String dataStreamName = "logs-es"; |
| 465 | + final int numberOfBackingIndices = 10; |
| 466 | + final int numberOfShards = randomIntBetween(1, 5); |
| 467 | + final int numberOfIndicesWithZeroUptime = randomBoolean() |
| 468 | + ? numberOfBackingIndices |
| 469 | + : randomIntBetween(1, numberOfBackingIndices - 1); |
| 470 | + final Set<Integer> indicesWithZeroUptime = new HashSet<>( |
| 471 | + randomSubsetOf(numberOfIndicesWithZeroUptime, IntStream.range(0, numberOfBackingIndices).boxed().collect(Collectors.toSet())) |
| 472 | + ); |
| 473 | + logger.info( |
| 474 | + "--> indices with zero uptime: {}/{} ({})", |
| 475 | + numberOfIndicesWithZeroUptime, |
| 476 | + numberOfBackingIndices, |
| 477 | + indicesWithZeroUptime |
| 478 | + ); |
| 479 | + final List<Index> backingIndices = new ArrayList<>(); |
| 480 | + for (int i = 0; i < numberOfBackingIndices; i++) { |
| 481 | + final IndexMetadata indexMetadata = createIndexMetadata( |
| 482 | + DataStream.getDefaultBackingIndexName(dataStreamName, i), |
| 483 | + numberOfShards, |
| 484 | + indicesWithZeroUptime.contains(i) |
| 485 | + ? indexWriteLoadWithMissingOrZeroUptime(numberOfShards) |
| 486 | + : randomIndexWriteLoad(numberOfShards), |
| 487 | + System.currentTimeMillis() - (maxIndexAge.millis() / 2) |
| 488 | + ); |
| 489 | + backingIndices.add(indexMetadata.getIndex()); |
| 490 | + metadataBuilder.put(indexMetadata, false); |
| 491 | + } |
| 492 | + |
| 493 | + final IndexMetadata writeIndexMetadata = createIndexMetadata( |
| 494 | + DataStream.getDefaultBackingIndexName(dataStreamName, numberOfBackingIndices), |
| 495 | + numberOfShards, |
| 496 | + null, |
| 497 | + System.currentTimeMillis() |
| 498 | + ); |
| 499 | + backingIndices.add(writeIndexMetadata.getIndex()); |
| 500 | + metadataBuilder.put(writeIndexMetadata, false); |
| 501 | + |
| 502 | + final DataStream dataStream = createDataStream(dataStreamName, backingIndices); |
| 503 | + metadataBuilder.put(dataStream); |
| 504 | + |
| 505 | + final ProjectMetadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex( |
| 506 | + dataStream.getName(), |
| 507 | + metadataBuilder |
| 508 | + ); |
| 509 | + final IndexMetadata writeIndex = updatedMetadataBuilder.getSafe(dataStream.getWriteIndex()); |
| 510 | + final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndex); |
| 511 | + |
| 512 | + final boolean someIndicesHadUptime = numberOfIndicesWithZeroUptime != numberOfBackingIndices; |
| 513 | + assertThat(forecastedWriteLoad.isPresent(), is(someIndicesHadUptime)); |
| 514 | + if (someIndicesHadUptime) { |
| 515 | + assertThat(forecastedWriteLoad.getAsDouble(), not(notANumber())); |
| 516 | + } |
| 517 | + } |
| 518 | + |
| 519 | + private IndexWriteLoad indexWriteLoadWithMissingOrZeroUptime(int numShards) { |
| 520 | + return randomBoolean() ? IndexWriteLoad.builder(numShards).build() : randomIndexWriteLoad(numShards, () -> 0); |
| 521 | + } |
| 522 | + |
448 | 523 | public enum ShardCountChange implements IntToIntFunction { |
449 | 524 | INCREASE(1, 15) { |
450 | 525 | @Override |
|
0 commit comments