Skip to content

Commit 71dfb06

Browse files
authored
Enable _tier based coordinator rewrites for all indices (not just mounted indices) (#115797)
As part of #114990 we enabled using the `_tier` field as part of the coordinator rewrite in order to skip shards that do not match a `_tier` filter, but only for fully/partially mounted indices. This PR enhances the previous work by allowing a coordinator rewrite to skip shards that will not match the `_tier` query for all indices (irrespective of their lifecycle state i.e. hot and warm indices can now skip shards based on the `_tier` query) Note however, that hot/warm indices will not automatically take advantage of the `can_match` coordinator rewrite (like read only indices do) but only the search requests that surpass the `pre_filter_shard_size` threshold will. Relates to [#114910](#114910)
1 parent 64e4c38 commit 71dfb06

File tree

6 files changed

+258
-39
lines changed

6 files changed

+258
-39
lines changed

docs/changelog/115797.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 115797
2+
summary: Enable `_tier` based coordinator rewrites for all indices (not just mounted
3+
indices)
4+
area: Search
5+
type: enhancement
6+
issues: []

server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
3939

4040
public static final String TIER_FIELD_NAME = "_tier";
4141

42-
private static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) {
42+
static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) {
4343
@Override
4444
public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
4545
throw new UnsupportedOperationException("fetching field values is not supported on the coordinator node");
@@ -69,6 +69,7 @@ public Query existsQuery(SearchExecutionContext context) {
6969
}
7070
};
7171

72+
@Nullable
7273
private final DateFieldRangeInfo dateFieldRangeInfo;
7374
private final String tier;
7475

@@ -85,7 +86,7 @@ public CoordinatorRewriteContext(
8586
XContentParserConfiguration parserConfig,
8687
Client client,
8788
LongSupplier nowInMillis,
88-
DateFieldRangeInfo dateFieldRangeInfo,
89+
@Nullable DateFieldRangeInfo dateFieldRangeInfo,
8990
String tier
9091
) {
9192
super(
@@ -116,9 +117,9 @@ public CoordinatorRewriteContext(
116117
*/
117118
@Nullable
118119
public MappedFieldType getFieldType(String fieldName) {
119-
if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
120+
if (dateFieldRangeInfo != null && DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
120121
return dateFieldRangeInfo.timestampFieldType();
121-
} else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
122+
} else if (dateFieldRangeInfo != null && IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
122123
return dateFieldRangeInfo.eventIngestedFieldType();
123124
} else if (TIER_FIELD_NAME.equals(fieldName)) {
124125
return TIER_FIELD_TYPE;
@@ -133,9 +134,9 @@ public MappedFieldType getFieldType(String fieldName) {
133134
*/
134135
@Nullable
135136
public IndexLongFieldRange getFieldRange(String fieldName) {
136-
if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
137+
if (dateFieldRangeInfo != null && DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
137138
return dateFieldRangeInfo.timestampRange();
138-
} else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
139+
} else if (dateFieldRangeInfo != null && IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
139140
return dateFieldRangeInfo.eventIngestedRange();
140141
} else {
141142
return null;

server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,35 +52,37 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
5252
return null;
5353
}
5454
DateFieldRangeInfo dateFieldRangeInfo = mappingSupplier.apply(index);
55-
// we've now added a coordinator rewrite based on the _tier field so the requirement
56-
// for the timestamps fields to be present is artificial (we could do a coordinator
57-
// rewrite only based on the _tier field) and we might decide to remove this artificial
58-
// limitation to enable coordinator rewrites based on _tier for hot and warm indices
59-
// (currently the _tier coordinator rewrite is only available for mounted and partially mounted
60-
// indices)
61-
if (dateFieldRangeInfo == null) {
62-
return null;
63-
}
64-
DateFieldMapper.DateFieldType timestampFieldType = dateFieldRangeInfo.timestampFieldType();
6555
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
6656
IndexLongFieldRange eventIngestedRange = indexMetadata.getEventIngestedRange();
57+
DateFieldMapper.DateFieldType timestampFieldType = null;
58+
if (dateFieldRangeInfo != null) {
59+
timestampFieldType = dateFieldRangeInfo.timestampFieldType();
6760

68-
if (timestampRange.containsAllShardRanges() == false) {
69-
// if @timestamp range is not present or not ready in cluster state, fallback to using time series range (if present)
70-
timestampRange = indexMetadata.getTimeSeriesTimestampRange(timestampFieldType);
71-
// if timestampRange in the time series is null AND the eventIngestedRange is not ready for use, return null (no coord rewrite)
72-
if (timestampRange == null && eventIngestedRange.containsAllShardRanges() == false) {
73-
return null;
61+
if (timestampRange.containsAllShardRanges() == false) {
62+
// if @timestamp range is not present or not ready in cluster state, fallback to using time series range (if present)
63+
timestampRange = indexMetadata.getTimeSeriesTimestampRange(timestampFieldType);
64+
// if timestampRange in the time series is null AND the eventIngestedRange is not ready for use, return null (no coord
65+
// rewrite)
66+
if (timestampRange == null && eventIngestedRange.containsAllShardRanges() == false) {
67+
return null;
68+
}
7469
}
7570
}
7671

77-
// the DateFieldRangeInfo from the mappingSupplier only has field types, but not ranges
78-
// so create a new object with ranges pulled from cluster state
7972
return new CoordinatorRewriteContext(
8073
parserConfig,
8174
client,
8275
nowInMillis,
83-
new DateFieldRangeInfo(timestampFieldType, timestampRange, dateFieldRangeInfo.eventIngestedFieldType(), eventIngestedRange),
76+
dateFieldRangeInfo == null
77+
? null
78+
// the DateFieldRangeInfo from the mappingSupplier only has field types, but not ranges
79+
// so create a new object with ranges pulled from cluster state
80+
: new DateFieldRangeInfo(
81+
timestampFieldType,
82+
timestampRange,
83+
dateFieldRangeInfo.eventIngestedFieldType(),
84+
eventIngestedRange
85+
),
8486
indexMetadata.getTierPreference().isEmpty() == false ? indexMetadata.getTierPreference().getFirst() : ""
8587
);
8688
}

server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
2525
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
2626
import org.elasticsearch.cluster.routing.GroupShardsIterator;
27+
import org.elasticsearch.cluster.routing.allocation.DataTier;
2728
import org.elasticsearch.common.Strings;
2829
import org.elasticsearch.common.UUIDs;
2930
import org.elasticsearch.common.settings.Settings;
3031
import org.elasticsearch.index.Index;
3132
import org.elasticsearch.index.IndexVersion;
3233
import org.elasticsearch.index.mapper.DateFieldMapper;
3334
import org.elasticsearch.index.query.BoolQueryBuilder;
35+
import org.elasticsearch.index.query.CoordinatorRewriteContext;
3436
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
3537
import org.elasticsearch.index.query.QueryBuilder;
38+
import org.elasticsearch.index.query.QueryBuilders;
3639
import org.elasticsearch.index.query.RangeQueryBuilder;
3740
import org.elasticsearch.index.query.TermQueryBuilder;
3841
import org.elasticsearch.index.shard.IndexLongFieldRange;
@@ -476,6 +479,98 @@ public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedUsingEventIngested
476479
doCanMatchFilteringOnCoordinatorThatCanBeSkipped(IndexMetadata.EVENT_INGESTED_FIELD_NAME);
477480
}
478481

482+
public void testCanMatchFilteringOnCoordinatorSkipsBasedOnTier() throws Exception {
483+
// we'll test that we're executing _tier coordinator rewrite for indices (data stream backing or regular) without any @timestamp
484+
// or event.ingested fields
485+
// for both data stream backing and regular indices we'll have one index in hot and one in warm. the warm indices will be skipped as
486+
// our queries will filter based on _tier: hot
487+
488+
Map<Index, Settings.Builder> indexNameToSettings = new HashMap<>();
489+
ClusterState state = ClusterState.EMPTY_STATE;
490+
491+
String dataStreamName = randomAlphaOfLengthBetween(10, 20);
492+
Index warmDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1), UUIDs.base64UUID());
493+
indexNameToSettings.put(
494+
warmDataStreamIndex,
495+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmDataStreamIndex.getUUID())
496+
.put(DataTier.TIER_PREFERENCE, "data_warm,data_hot")
497+
);
498+
Index hotDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 2), UUIDs.base64UUID());
499+
indexNameToSettings.put(
500+
hotDataStreamIndex,
501+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotDataStreamIndex.getUUID())
502+
.put(DataTier.TIER_PREFERENCE, "data_hot")
503+
);
504+
DataStream dataStream = DataStreamTestHelper.newInstance(dataStreamName, List.of(warmDataStreamIndex, hotDataStreamIndex));
505+
506+
Index warmRegularIndex = new Index("warm-index", UUIDs.base64UUID());
507+
indexNameToSettings.put(
508+
warmRegularIndex,
509+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmRegularIndex.getUUID())
510+
.put(DataTier.TIER_PREFERENCE, "data_warm,data_hot")
511+
);
512+
Index hotRegularIndex = new Index("hot-index", UUIDs.base64UUID());
513+
indexNameToSettings.put(
514+
hotRegularIndex,
515+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotRegularIndex.getUUID())
516+
.put(DataTier.TIER_PREFERENCE, "data_hot")
517+
);
518+
519+
List<Index> allIndices = new ArrayList<>(4);
520+
allIndices.addAll(dataStream.getIndices());
521+
allIndices.add(warmRegularIndex);
522+
allIndices.add(hotRegularIndex);
523+
524+
List<Index> hotIndices = List.of(hotRegularIndex, hotDataStreamIndex);
525+
List<Index> warmIndices = List.of(warmRegularIndex, warmDataStreamIndex);
526+
527+
for (Index index : allIndices) {
528+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName())
529+
.settings(indexNameToSettings.get(index))
530+
.numberOfShards(1)
531+
.numberOfReplicas(0);
532+
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadataBuilder);
533+
state = ClusterState.builder(state).metadata(metadataBuilder).build();
534+
}
535+
536+
ClusterState finalState = state;
537+
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider = new CoordinatorRewriteContextProvider(
538+
parserConfig(),
539+
mock(Client.class),
540+
System::currentTimeMillis,
541+
() -> finalState,
542+
(index) -> null
543+
);
544+
545+
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
546+
.filter(QueryBuilders.termQuery(CoordinatorRewriteContext.TIER_FIELD_NAME, "data_hot"));
547+
548+
assignShardsAndExecuteCanMatchPhase(
549+
List.of(dataStream),
550+
List.of(hotRegularIndex, warmRegularIndex),
551+
coordinatorRewriteContextProvider,
552+
boolQueryBuilder,
553+
List.of(),
554+
null,
555+
(updatedSearchShardIterators, requests) -> {
556+
var skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList();
557+
var nonSkippedShards = updatedSearchShardIterators.stream()
558+
.filter(searchShardIterator -> searchShardIterator.skip() == false)
559+
.toList();
560+
561+
boolean allSkippedShardAreFromWarmIndices = skippedShards.stream()
562+
.allMatch(shardIterator -> warmIndices.contains(shardIterator.shardId().getIndex()));
563+
assertThat(allSkippedShardAreFromWarmIndices, equalTo(true));
564+
boolean allNonSkippedShardAreHotIndices = nonSkippedShards.stream()
565+
.allMatch(shardIterator -> hotIndices.contains(shardIterator.shardId().getIndex()));
566+
assertThat(allNonSkippedShardAreHotIndices, equalTo(true));
567+
boolean allRequestMadeToHotIndices = requests.stream()
568+
.allMatch(request -> hotIndices.contains(request.shardId().getIndex()));
569+
assertThat(allRequestMadeToHotIndices, equalTo(true));
570+
}
571+
);
572+
}
573+
479574
public void doCanMatchFilteringOnCoordinatorThatCanBeSkipped(String timestampField) throws Exception {
480575
Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID());
481576
Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID());

server/src/test/java/org/elasticsearch/index/query/QueryRewriteContextTests.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.Collections;
2323

24+
import static org.elasticsearch.index.query.CoordinatorRewriteContext.TIER_FIELD_TYPE;
2425
import static org.hamcrest.Matchers.is;
2526
import static org.hamcrest.Matchers.nullValue;
2627

@@ -86,13 +87,6 @@ public void testGetTierPreference() {
8687

8788
{
8889
// coordinator rewrite context
89-
IndexMetadata metadata = newIndexMeta(
90-
"index",
91-
Settings.builder()
92-
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
93-
.put(DataTier.TIER_PREFERENCE, "data_cold,data_warm,data_hot")
94-
.build()
95-
);
9690
CoordinatorRewriteContext coordinatorRewriteContext = new CoordinatorRewriteContext(
9791
parserConfig(),
9892
null,
@@ -103,15 +97,9 @@ public void testGetTierPreference() {
10397

10498
assertThat(coordinatorRewriteContext.getTierPreference(), is("data_frozen"));
10599
}
100+
106101
{
107102
// coordinator rewrite context empty tier
108-
IndexMetadata metadata = newIndexMeta(
109-
"index",
110-
Settings.builder()
111-
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
112-
.put(DataTier.TIER_PREFERENCE, "data_cold,data_warm,data_hot")
113-
.build()
114-
);
115103
CoordinatorRewriteContext coordinatorRewriteContext = new CoordinatorRewriteContext(
116104
parserConfig(),
117105
null,
@@ -122,6 +110,25 @@ public void testGetTierPreference() {
122110

123111
assertThat(coordinatorRewriteContext.getTierPreference(), is(nullValue()));
124112
}
113+
114+
{
115+
// null date field range info
116+
CoordinatorRewriteContext coordinatorRewriteContext = new CoordinatorRewriteContext(
117+
parserConfig(),
118+
null,
119+
System::currentTimeMillis,
120+
null,
121+
"data_frozen"
122+
);
123+
assertThat(coordinatorRewriteContext.getFieldRange(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
124+
assertThat(coordinatorRewriteContext.getFieldRange(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
125+
// tier field doesn't have a range
126+
assertThat(coordinatorRewriteContext.getFieldRange(CoordinatorRewriteContext.TIER_FIELD_NAME), is(nullValue()));
127+
assertThat(coordinatorRewriteContext.getFieldType(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
128+
assertThat(coordinatorRewriteContext.getFieldType(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
129+
// _tier field type should still work even without the data field info
130+
assertThat(coordinatorRewriteContext.getFieldType(CoordinatorRewriteContext.TIER_FIELD_NAME), is(TIER_FIELD_TYPE));
131+
}
125132
}
126133

127134
public static IndexMetadata newIndexMeta(String name, Settings indexSettings) {

0 commit comments

Comments
 (0)