|
24 | 24 | import org.elasticsearch.cluster.node.DiscoveryNode; |
25 | 25 | import org.elasticsearch.cluster.node.DiscoveryNodeUtils; |
26 | 26 | import org.elasticsearch.cluster.routing.GroupShardsIterator; |
| 27 | +import org.elasticsearch.cluster.routing.allocation.DataTier; |
27 | 28 | import org.elasticsearch.common.Strings; |
28 | 29 | import org.elasticsearch.common.UUIDs; |
29 | 30 | import org.elasticsearch.common.settings.Settings; |
30 | 31 | import org.elasticsearch.index.Index; |
31 | 32 | import org.elasticsearch.index.IndexVersion; |
32 | 33 | import org.elasticsearch.index.mapper.DateFieldMapper; |
33 | 34 | import org.elasticsearch.index.query.BoolQueryBuilder; |
| 35 | +import org.elasticsearch.index.query.CoordinatorRewriteContext; |
34 | 36 | import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; |
35 | 37 | import org.elasticsearch.index.query.QueryBuilder; |
| 38 | +import org.elasticsearch.index.query.QueryBuilders; |
36 | 39 | import org.elasticsearch.index.query.RangeQueryBuilder; |
37 | 40 | import org.elasticsearch.index.query.TermQueryBuilder; |
38 | 41 | import org.elasticsearch.index.shard.IndexLongFieldRange; |
@@ -476,6 +479,98 @@ public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedUsingEventIngested |
476 | 479 | doCanMatchFilteringOnCoordinatorThatCanBeSkipped(IndexMetadata.EVENT_INGESTED_FIELD_NAME); |
477 | 480 | } |
478 | 481 |
|
| 482 | + public void testCanMatchFilteringOnCoordinatorSkipsBasedOnTier() throws Exception { |
| 483 | + // we'll test that we're executing _tier coordinator rewrite for indices (data stream backing or regular) without any @timestamp |
| 484 | + // or event.ingested fields |
| 485 | + // for both data stream backing and regular indices we'll have one index in hot and one in warm. the warm indices will be skipped as |
| 486 | + // our queries will filter based on _tier: hot |
| 487 | + |
| 488 | + Map<Index, Settings.Builder> indexNameToSettings = new HashMap<>(); |
| 489 | + ClusterState state = ClusterState.EMPTY_STATE; |
| 490 | + |
| 491 | + String dataStreamName = randomAlphaOfLengthBetween(10, 20); |
| 492 | + Index warmDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1), UUIDs.base64UUID()); |
| 493 | + indexNameToSettings.put( |
| 494 | + warmDataStreamIndex, |
| 495 | + settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmDataStreamIndex.getUUID()) |
| 496 | + .put(DataTier.TIER_PREFERENCE, "data_warm,data_hot") |
| 497 | + ); |
| 498 | + Index hotDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 2), UUIDs.base64UUID()); |
| 499 | + indexNameToSettings.put( |
| 500 | + hotDataStreamIndex, |
| 501 | + settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotDataStreamIndex.getUUID()) |
| 502 | + .put(DataTier.TIER_PREFERENCE, "data_hot") |
| 503 | + ); |
| 504 | + DataStream dataStream = DataStreamTestHelper.newInstance(dataStreamName, List.of(warmDataStreamIndex, hotDataStreamIndex)); |
| 505 | + |
| 506 | + Index warmRegularIndex = new Index("warm-index", UUIDs.base64UUID()); |
| 507 | + indexNameToSettings.put( |
| 508 | + warmRegularIndex, |
| 509 | + settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmRegularIndex.getUUID()) |
| 510 | + .put(DataTier.TIER_PREFERENCE, "data_warm,data_hot") |
| 511 | + ); |
| 512 | + Index hotRegularIndex = new Index("hot-index", UUIDs.base64UUID()); |
| 513 | + indexNameToSettings.put( |
| 514 | + hotRegularIndex, |
| 515 | + settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotRegularIndex.getUUID()) |
| 516 | + .put(DataTier.TIER_PREFERENCE, "data_hot") |
| 517 | + ); |
| 518 | + |
| 519 | + List<Index> allIndices = new ArrayList<>(4); |
| 520 | + allIndices.addAll(dataStream.getIndices()); |
| 521 | + allIndices.add(warmRegularIndex); |
| 522 | + allIndices.add(hotRegularIndex); |
| 523 | + |
| 524 | + List<Index> hotIndices = List.of(hotRegularIndex, hotDataStreamIndex); |
| 525 | + List<Index> warmIndices = List.of(warmRegularIndex, warmDataStreamIndex); |
| 526 | + |
| 527 | + for (Index index : allIndices) { |
| 528 | + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName()) |
| 529 | + .settings(indexNameToSettings.get(index)) |
| 530 | + .numberOfShards(1) |
| 531 | + .numberOfReplicas(0); |
| 532 | + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadataBuilder); |
| 533 | + state = ClusterState.builder(state).metadata(metadataBuilder).build(); |
| 534 | + } |
| 535 | + |
| 536 | + ClusterState finalState = state; |
| 537 | + CoordinatorRewriteContextProvider coordinatorRewriteContextProvider = new CoordinatorRewriteContextProvider( |
| 538 | + parserConfig(), |
| 539 | + mock(Client.class), |
| 540 | + System::currentTimeMillis, |
| 541 | + () -> finalState, |
| 542 | + (index) -> null |
| 543 | + ); |
| 544 | + |
| 545 | + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery() |
| 546 | + .filter(QueryBuilders.termQuery(CoordinatorRewriteContext.TIER_FIELD_NAME, "data_hot")); |
| 547 | + |
| 548 | + assignShardsAndExecuteCanMatchPhase( |
| 549 | + List.of(dataStream), |
| 550 | + List.of(hotRegularIndex, warmRegularIndex), |
| 551 | + coordinatorRewriteContextProvider, |
| 552 | + boolQueryBuilder, |
| 553 | + List.of(), |
| 554 | + null, |
| 555 | + (updatedSearchShardIterators, requests) -> { |
| 556 | + var skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList(); |
| 557 | + var nonSkippedShards = updatedSearchShardIterators.stream() |
| 558 | + .filter(searchShardIterator -> searchShardIterator.skip() == false) |
| 559 | + .toList(); |
| 560 | + |
| 561 | + boolean allSkippedShardAreFromWarmIndices = skippedShards.stream() |
| 562 | + .allMatch(shardIterator -> warmIndices.contains(shardIterator.shardId().getIndex())); |
| 563 | + assertThat(allSkippedShardAreFromWarmIndices, equalTo(true)); |
| 564 | + boolean allNonSkippedShardAreHotIndices = nonSkippedShards.stream() |
| 565 | + .allMatch(shardIterator -> hotIndices.contains(shardIterator.shardId().getIndex())); |
| 566 | + assertThat(allNonSkippedShardAreHotIndices, equalTo(true)); |
| 567 | + boolean allRequestMadeToHotIndices = requests.stream() |
| 568 | + .allMatch(request -> hotIndices.contains(request.shardId().getIndex())); |
| 569 | + assertThat(allRequestMadeToHotIndices, equalTo(true)); |
| 570 | + } |
| 571 | + ); |
| 572 | + } |
| 573 | + |
479 | 574 | public void doCanMatchFilteringOnCoordinatorThatCanBeSkipped(String timestampField) throws Exception { |
480 | 575 | Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID()); |
481 | 576 | Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID()); |
|
0 commit comments