|  | 
| 30 | 30 | import org.junit.Before; | 
| 31 | 31 | 
 | 
| 32 | 32 | import java.util.ArrayList; | 
|  | 33 | +import java.util.HashSet; | 
| 33 | 34 | import java.util.List; | 
| 34 | 35 | import java.util.Map; | 
| 35 | 36 | import java.util.Objects; | 
| 36 | 37 | import java.util.OptionalDouble; | 
|  | 38 | +import java.util.Set; | 
| 37 | 39 | import java.util.concurrent.TimeUnit; | 
| 38 | 40 | import java.util.concurrent.atomic.AtomicBoolean; | 
| 39 | 41 | import java.util.concurrent.atomic.AtomicInteger; | 
|  | 42 | +import java.util.function.LongSupplier; | 
| 40 | 43 | import java.util.stream.Collectors; | 
|  | 44 | +import java.util.stream.IntStream; | 
| 41 | 45 | 
 | 
| 42 | 46 | import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad; | 
| 43 | 47 | import static org.hamcrest.Matchers.closeTo; | 
|  | 
| 47 | 51 | import static org.hamcrest.Matchers.greaterThan; | 
| 48 | 52 | import static org.hamcrest.Matchers.is; | 
| 49 | 53 | import static org.hamcrest.Matchers.lessThan; | 
|  | 54 | +import static org.hamcrest.Matchers.not; | 
|  | 55 | +import static org.hamcrest.Matchers.notANumber; | 
| 50 | 56 | 
 | 
| 51 | 57 | public class LicensedWriteLoadForecasterTests extends ESTestCase { | 
| 52 | 58 |     ThreadPool threadPool; | 
| @@ -294,14 +300,18 @@ public void testWriteLoadForecast() { | 
| 294 | 300 |     } | 
| 295 | 301 | 
 | 
| 296 | 302 |     private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { | 
|  | 303 | +        return randomIndexWriteLoad(numberOfShards, () -> randomLongBetween(1, 10)); | 
|  | 304 | +    } | 
|  | 305 | + | 
|  | 306 | +    private IndexWriteLoad randomIndexWriteLoad(int numberOfShards, LongSupplier uptimeSupplier) { | 
| 297 | 307 |         IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); | 
| 298 | 308 |         for (int shardId = 0; shardId < numberOfShards; shardId++) { | 
| 299 | 309 |             builder.withShardWriteLoad( | 
| 300 | 310 |                 shardId, | 
| 301 | 311 |                 randomDoubleBetween(0, 64, true), | 
| 302 | 312 |                 randomDoubleBetween(0, 64, true), | 
| 303 | 313 |                 randomDoubleBetween(0, 64, true), | 
| 304 |  | -                randomLongBetween(1, 10) | 
|  | 314 | +                uptimeSupplier.getAsLong() | 
| 305 | 315 |             ); | 
| 306 | 316 |         } | 
| 307 | 317 |         return builder.build(); | 
| @@ -445,6 +455,71 @@ private void testShardChangeDoesNotChangeTotalForecastLoad(ShardCountChange shar | 
| 445 | 455 |         ); | 
| 446 | 456 |     } | 
| 447 | 457 | 
 | 
|  | 458 | +    public void testCanHandleIndicesWithMissingShardWriteLoadsOrZeroUptime() { | 
|  | 459 | +        final TimeValue maxIndexAge = TimeValue.timeValueDays(7); | 
|  | 460 | +        final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge); | 
|  | 461 | +        writeLoadForecaster.refreshLicense(); | 
|  | 462 | + | 
|  | 463 | +        final ProjectMetadata.Builder metadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()); | 
|  | 464 | +        final String dataStreamName = "logs-es"; | 
|  | 465 | +        final int numberOfBackingIndices = 10; | 
|  | 466 | +        final int numberOfShards = randomIntBetween(1, 5); | 
|  | 467 | +        final int numberOfIndicesWithZeroUptime = randomBoolean() | 
|  | 468 | +            ? numberOfBackingIndices | 
|  | 469 | +            : randomIntBetween(1, numberOfBackingIndices - 1); | 
|  | 470 | +        final Set<Integer> indicesWithZeroUptime = new HashSet<>( | 
|  | 471 | +            randomSubsetOf(numberOfIndicesWithZeroUptime, IntStream.range(0, numberOfBackingIndices).boxed().collect(Collectors.toSet())) | 
|  | 472 | +        ); | 
|  | 473 | +        logger.info( | 
|  | 474 | +            "--> indices with zero uptime: {}/{} ({})", | 
|  | 475 | +            numberOfIndicesWithZeroUptime, | 
|  | 476 | +            numberOfBackingIndices, | 
|  | 477 | +            indicesWithZeroUptime | 
|  | 478 | +        ); | 
|  | 479 | +        final List<Index> backingIndices = new ArrayList<>(); | 
|  | 480 | +        for (int i = 0; i < numberOfBackingIndices; i++) { | 
|  | 481 | +            final IndexMetadata indexMetadata = createIndexMetadata( | 
|  | 482 | +                DataStream.getDefaultBackingIndexName(dataStreamName, i), | 
|  | 483 | +                numberOfShards, | 
|  | 484 | +                indicesWithZeroUptime.contains(i) | 
|  | 485 | +                    ? indexWriteLoadWithMissingOrZeroUptime(numberOfShards) | 
|  | 486 | +                    : randomIndexWriteLoad(numberOfShards), | 
|  | 487 | +                System.currentTimeMillis() - (maxIndexAge.millis() / 2) | 
|  | 488 | +            ); | 
|  | 489 | +            backingIndices.add(indexMetadata.getIndex()); | 
|  | 490 | +            metadataBuilder.put(indexMetadata, false); | 
|  | 491 | +        } | 
|  | 492 | + | 
|  | 493 | +        final IndexMetadata writeIndexMetadata = createIndexMetadata( | 
|  | 494 | +            DataStream.getDefaultBackingIndexName(dataStreamName, numberOfBackingIndices), | 
|  | 495 | +            numberOfShards, | 
|  | 496 | +            null, | 
|  | 497 | +            System.currentTimeMillis() | 
|  | 498 | +        ); | 
|  | 499 | +        backingIndices.add(writeIndexMetadata.getIndex()); | 
|  | 500 | +        metadataBuilder.put(writeIndexMetadata, false); | 
|  | 501 | + | 
|  | 502 | +        final DataStream dataStream = createDataStream(dataStreamName, backingIndices); | 
|  | 503 | +        metadataBuilder.put(dataStream); | 
|  | 504 | + | 
|  | 505 | +        final ProjectMetadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex( | 
|  | 506 | +            dataStream.getName(), | 
|  | 507 | +            metadataBuilder | 
|  | 508 | +        ); | 
|  | 509 | +        final IndexMetadata writeIndex = updatedMetadataBuilder.getSafe(dataStream.getWriteIndex()); | 
|  | 510 | +        final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndex); | 
|  | 511 | + | 
|  | 512 | +        final boolean someIndicesHadUptime = numberOfIndicesWithZeroUptime != numberOfBackingIndices; | 
|  | 513 | +        assertThat(forecastedWriteLoad.isPresent(), is(someIndicesHadUptime)); | 
|  | 514 | +        if (someIndicesHadUptime) { | 
|  | 515 | +            assertThat(forecastedWriteLoad.getAsDouble(), not(notANumber())); | 
|  | 516 | +        } | 
|  | 517 | +    } | 
|  | 518 | + | 
|  | 519 | +    private IndexWriteLoad indexWriteLoadWithMissingOrZeroUptime(int numShards) { | 
|  | 520 | +        return randomBoolean() ? IndexWriteLoad.builder(numShards).build() : randomIndexWriteLoad(numShards, () -> 0); | 
|  | 521 | +    } | 
|  | 522 | + | 
| 448 | 523 |     public enum ShardCountChange implements IntToIntFunction { | 
| 449 | 524 |         INCREASE(1, 15) { | 
| 450 | 525 |             @Override | 
|  | 
0 commit comments