Skip to content

Commit cd2433d

Browse files
authored
Validate missing shards after the coordinator rewrite (#116382)
The coordinate rewrite can skip searching shards when the query filters on `@timestamp`, event.ingested or the _tier field. We currently check for missing shards across all the indices that are the query is running against however, some shards/indices might not play a role in the query at all after the coordinator rewrite. This moves the check for missing shards **after** we've run the coordinator rewrite so we validate only the shards that will be searched by the query.
1 parent 9af7af1 commit cd2433d

File tree

4 files changed

+247
-40
lines changed

4 files changed

+247
-40
lines changed

docs/changelog/116382.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116382
2+
summary: Validate missing shards after the coordinator rewrite
3+
area: Search
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ private static boolean assertSearchCoordinationThread() {
131131
@Override
132132
public void run() {
133133
assert assertSearchCoordinationThread();
134-
checkNoMissingShards();
135134
runCoordinatorRewritePhase();
136135
}
137136

@@ -175,7 +174,10 @@ private void runCoordinatorRewritePhase() {
175174
if (matchedShardLevelRequests.isEmpty()) {
176175
finishPhase();
177176
} else {
178-
new Round(new GroupShardsIterator<>(matchedShardLevelRequests)).run();
177+
GroupShardsIterator<SearchShardIterator> matchingShards = new GroupShardsIterator<>(matchedShardLevelRequests);
178+
// verify missing shards only for the shards that we hit for the query
179+
checkNoMissingShards(matchingShards);
180+
new Round(matchingShards).run();
179181
}
180182
}
181183

@@ -185,9 +187,9 @@ private void consumeResult(boolean canMatch, ShardSearchRequest request) {
185187
results.consumeResult(result, () -> {});
186188
}
187189

188-
private void checkNoMissingShards() {
190+
private void checkNoMissingShards(GroupShardsIterator<SearchShardIterator> shards) {
189191
assert assertSearchCoordinationThread();
190-
doCheckNoMissingShards(getName(), request, shardsIts);
192+
doCheckNoMissingShards(getName(), request, shards);
191193
}
192194

193195
private Map<SendingTarget, List<SearchShardIterator>> groupByNode(GroupShardsIterator<SearchShardIterator> shards) {

server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

Lines changed: 230 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.Strings;
2929
import org.elasticsearch.common.UUIDs;
3030
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.core.Tuple;
3132
import org.elasticsearch.index.Index;
3233
import org.elasticsearch.index.IndexVersion;
3334
import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -68,6 +69,7 @@
6869
import java.util.Set;
6970
import java.util.concurrent.ConcurrentHashMap;
7071
import java.util.concurrent.CountDownLatch;
72+
import java.util.concurrent.TimeUnit;
7173
import java.util.concurrent.atomic.AtomicInteger;
7274
import java.util.concurrent.atomic.AtomicReference;
7375
import java.util.function.BiConsumer;
@@ -77,6 +79,7 @@
7779
import static org.elasticsearch.core.Types.forciblyCast;
7880
import static org.hamcrest.Matchers.equalTo;
7981
import static org.hamcrest.Matchers.greaterThan;
82+
import static org.hamcrest.Matchers.instanceOf;
8083
import static org.hamcrest.Matchers.lessThanOrEqualTo;
8184
import static org.mockito.Mockito.mock;
8285

@@ -1087,6 +1090,137 @@ public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedTsdb() throws Exce
10871090
);
10881091
}
10891092

1093+
public void testCanMatchFilteringOnCoordinatorWithMissingShards() throws Exception {
1094+
// we'll test that we're executing _tier coordinator rewrite for indices (data stream backing or regular) without any @timestamp
1095+
// or event.ingested fields
1096+
// for both data stream backing and regular indices we'll have one index in hot and one UNASSIGNED (targeting warm though).
1097+
// the warm indices will be skipped as our queries will filter based on _tier: hot and the can match phase will not report error the
1098+
// missing index even if allow_partial_search_results is false (because the warm index would've not been part of the search anyway)
1099+
1100+
Map<Index, Settings.Builder> indexNameToSettings = new HashMap<>();
1101+
ClusterState state = ClusterState.EMPTY_STATE;
1102+
1103+
String dataStreamName = randomAlphaOfLengthBetween(10, 20);
1104+
Index warmDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1), UUIDs.base64UUID());
1105+
indexNameToSettings.put(
1106+
warmDataStreamIndex,
1107+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmDataStreamIndex.getUUID())
1108+
.put(DataTier.TIER_PREFERENCE, "data_warm,data_hot")
1109+
);
1110+
Index hotDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 2), UUIDs.base64UUID());
1111+
indexNameToSettings.put(
1112+
hotDataStreamIndex,
1113+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotDataStreamIndex.getUUID())
1114+
.put(DataTier.TIER_PREFERENCE, "data_hot")
1115+
);
1116+
DataStream dataStream = DataStreamTestHelper.newInstance(dataStreamName, List.of(warmDataStreamIndex, hotDataStreamIndex));
1117+
1118+
Index warmRegularIndex = new Index("warm-index", UUIDs.base64UUID());
1119+
indexNameToSettings.put(
1120+
warmRegularIndex,
1121+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmRegularIndex.getUUID())
1122+
.put(DataTier.TIER_PREFERENCE, "data_warm,data_hot")
1123+
);
1124+
Index hotRegularIndex = new Index("hot-index", UUIDs.base64UUID());
1125+
indexNameToSettings.put(
1126+
hotRegularIndex,
1127+
settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotRegularIndex.getUUID())
1128+
.put(DataTier.TIER_PREFERENCE, "data_hot")
1129+
);
1130+
1131+
List<Index> allIndices = new ArrayList<>(4);
1132+
allIndices.addAll(dataStream.getIndices());
1133+
allIndices.add(warmRegularIndex);
1134+
allIndices.add(hotRegularIndex);
1135+
1136+
List<Index> hotIndices = List.of(hotRegularIndex, hotDataStreamIndex);
1137+
List<Index> warmIndices = List.of(warmRegularIndex, warmDataStreamIndex);
1138+
1139+
for (Index index : allIndices) {
1140+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName())
1141+
.settings(indexNameToSettings.get(index))
1142+
.numberOfShards(1)
1143+
.numberOfReplicas(0);
1144+
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadataBuilder);
1145+
state = ClusterState.builder(state).metadata(metadataBuilder).build();
1146+
}
1147+
1148+
ClusterState finalState = state;
1149+
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider = new CoordinatorRewriteContextProvider(
1150+
parserConfig(),
1151+
mock(Client.class),
1152+
System::currentTimeMillis,
1153+
() -> finalState,
1154+
(index) -> null
1155+
);
1156+
1157+
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
1158+
.filter(QueryBuilders.termQuery(CoordinatorRewriteContext.TIER_FIELD_NAME, "data_hot"));
1159+
1160+
{
1161+
// test that a search doesn't fail if the query filters out the unassigned shards
1162+
// via _tier (coordinator rewrite will eliminate the shards that don't match)
1163+
assignShardsAndExecuteCanMatchPhase(
1164+
List.of(dataStream),
1165+
List.of(hotRegularIndex, warmRegularIndex),
1166+
coordinatorRewriteContextProvider,
1167+
boolQueryBuilder,
1168+
List.of(),
1169+
null,
1170+
warmIndices,
1171+
false,
1172+
(updatedSearchShardIterators, requests) -> {
1173+
var skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList();
1174+
var nonSkippedShards = updatedSearchShardIterators.stream()
1175+
.filter(searchShardIterator -> searchShardIterator.skip() == false)
1176+
.toList();
1177+
1178+
boolean allSkippedShardAreFromWarmIndices = skippedShards.stream()
1179+
.allMatch(shardIterator -> warmIndices.contains(shardIterator.shardId().getIndex()));
1180+
assertThat(allSkippedShardAreFromWarmIndices, equalTo(true));
1181+
boolean allNonSkippedShardAreHotIndices = nonSkippedShards.stream()
1182+
.allMatch(shardIterator -> hotIndices.contains(shardIterator.shardId().getIndex()));
1183+
assertThat(allNonSkippedShardAreHotIndices, equalTo(true));
1184+
boolean allRequestMadeToHotIndices = requests.stream()
1185+
.allMatch(request -> hotIndices.contains(request.shardId().getIndex()));
1186+
assertThat(allRequestMadeToHotIndices, equalTo(true));
1187+
}
1188+
);
1189+
}
1190+
1191+
{
1192+
// test that a search does fail if the query does NOT filter ALL the
1193+
// unassigned shards
1194+
CountDownLatch latch = new CountDownLatch(1);
1195+
Tuple<CanMatchPreFilterSearchPhase, List<ShardSearchRequest>> canMatchPhaseAndRequests = getCanMatchPhaseAndRequests(
1196+
List.of(dataStream),
1197+
List.of(hotRegularIndex, warmRegularIndex),
1198+
coordinatorRewriteContextProvider,
1199+
boolQueryBuilder,
1200+
List.of(),
1201+
null,
1202+
List.of(hotRegularIndex, warmRegularIndex, warmDataStreamIndex),
1203+
false,
1204+
new ActionListener<>() {
1205+
@Override
1206+
public void onResponse(GroupShardsIterator<SearchShardIterator> searchShardIterators) {
1207+
fail(null, "unexpected success with result [%s] while expecting to handle failure with [%s]", searchShardIterators);
1208+
latch.countDown();
1209+
}
1210+
1211+
@Override
1212+
public void onFailure(Exception e) {
1213+
assertThat(e, instanceOf(SearchPhaseExecutionException.class));
1214+
latch.countDown();
1215+
}
1216+
}
1217+
);
1218+
1219+
canMatchPhaseAndRequests.v1().start();
1220+
latch.await(10, TimeUnit.SECONDS);
1221+
}
1222+
}
1223+
10901224
private void assertAllShardsAreQueried(List<SearchShardIterator> updatedSearchShardIterators, List<ShardSearchRequest> requests) {
10911225
int skippedShards = (int) updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).count();
10921226

@@ -1111,6 +1245,69 @@ private void assignShardsAndExecuteCanMatchPhase(
11111245
SuggestBuilder suggest,
11121246
BiConsumer<List<SearchShardIterator>, List<ShardSearchRequest>> canMatchResultsConsumer
11131247
) throws Exception {
1248+
assignShardsAndExecuteCanMatchPhase(
1249+
dataStreams,
1250+
regularIndices,
1251+
contextProvider,
1252+
query,
1253+
aggregations,
1254+
suggest,
1255+
List.of(),
1256+
true,
1257+
canMatchResultsConsumer
1258+
);
1259+
}
1260+
1261+
private void assignShardsAndExecuteCanMatchPhase(
1262+
List<DataStream> dataStreams,
1263+
List<Index> regularIndices,
1264+
CoordinatorRewriteContextProvider contextProvider,
1265+
QueryBuilder query,
1266+
List<AggregationBuilder> aggregations,
1267+
SuggestBuilder suggest,
1268+
List<Index> unassignedIndices,
1269+
boolean allowPartialResults,
1270+
BiConsumer<List<SearchShardIterator>, List<ShardSearchRequest>> canMatchResultsConsumer
1271+
) throws Exception {
1272+
AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
1273+
CountDownLatch latch = new CountDownLatch(1);
1274+
Tuple<CanMatchPreFilterSearchPhase, List<ShardSearchRequest>> canMatchAndShardRequests = getCanMatchPhaseAndRequests(
1275+
dataStreams,
1276+
regularIndices,
1277+
contextProvider,
1278+
query,
1279+
aggregations,
1280+
suggest,
1281+
unassignedIndices,
1282+
allowPartialResults,
1283+
ActionTestUtils.assertNoFailureListener(iter -> {
1284+
result.set(iter);
1285+
latch.countDown();
1286+
})
1287+
);
1288+
1289+
canMatchAndShardRequests.v1().start();
1290+
latch.await();
1291+
1292+
List<SearchShardIterator> updatedSearchShardIterators = new ArrayList<>();
1293+
for (SearchShardIterator updatedSearchShardIterator : result.get()) {
1294+
updatedSearchShardIterators.add(updatedSearchShardIterator);
1295+
}
1296+
1297+
canMatchResultsConsumer.accept(updatedSearchShardIterators, canMatchAndShardRequests.v2());
1298+
}
1299+
1300+
private Tuple<CanMatchPreFilterSearchPhase, List<ShardSearchRequest>> getCanMatchPhaseAndRequests(
1301+
List<DataStream> dataStreams,
1302+
List<Index> regularIndices,
1303+
CoordinatorRewriteContextProvider contextProvider,
1304+
QueryBuilder query,
1305+
List<AggregationBuilder> aggregations,
1306+
SuggestBuilder suggest,
1307+
List<Index> unassignedIndices,
1308+
boolean allowPartialResults,
1309+
ActionListener<GroupShardsIterator<SearchShardIterator>> canMatchActionListener
1310+
) {
11141311
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
11151312
DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node_1");
11161313
DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node_2");
@@ -1136,23 +1333,31 @@ private void assignShardsAndExecuteCanMatchPhase(
11361333
// and none is assigned, the phase is considered as failed meaning that the next phase won't be executed
11371334
boolean withAssignedPrimaries = randomBoolean() || atLeastOnePrimaryAssigned == false;
11381335
int numShards = randomIntBetween(1, 6);
1139-
originalShardIters.addAll(
1140-
getShardsIter(dataStreamIndex, originalIndices, numShards, false, withAssignedPrimaries ? primaryNode : null, null)
1141-
);
1142-
atLeastOnePrimaryAssigned |= withAssignedPrimaries;
1336+
if (unassignedIndices.contains(dataStreamIndex)) {
1337+
originalShardIters.addAll(getShardsIter(dataStreamIndex, originalIndices, numShards, false, null, null));
1338+
} else {
1339+
originalShardIters.addAll(
1340+
getShardsIter(dataStreamIndex, originalIndices, numShards, false, withAssignedPrimaries ? primaryNode : null, null)
1341+
);
1342+
atLeastOnePrimaryAssigned |= withAssignedPrimaries;
1343+
}
11431344
}
11441345
}
11451346

11461347
for (Index regularIndex : regularIndices) {
1147-
originalShardIters.addAll(
1148-
getShardsIter(regularIndex, originalIndices, randomIntBetween(1, 6), randomBoolean(), primaryNode, replicaNode)
1149-
);
1348+
if (unassignedIndices.contains(regularIndex)) {
1349+
originalShardIters.addAll(getShardsIter(regularIndex, originalIndices, randomIntBetween(1, 6), false, null, null));
1350+
} else {
1351+
originalShardIters.addAll(
1352+
getShardsIter(regularIndex, originalIndices, randomIntBetween(1, 6), randomBoolean(), primaryNode, replicaNode)
1353+
);
1354+
}
11501355
}
11511356
GroupShardsIterator<SearchShardIterator> shardsIter = GroupShardsIterator.sortAndCreate(originalShardIters);
11521357

11531358
final SearchRequest searchRequest = new SearchRequest();
11541359
searchRequest.indices(indices);
1155-
searchRequest.allowPartialSearchResults(true);
1360+
searchRequest.allowPartialSearchResults(allowPartialResults);
11561361

11571362
final AliasFilter aliasFilter;
11581363
if (aggregations.isEmpty() == false || randomBoolean()) {
@@ -1212,35 +1417,24 @@ public void sendCanMatch(
12121417
);
12131418

12141419
AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
1215-
CountDownLatch latch = new CountDownLatch(1);
1216-
CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(
1217-
logger,
1218-
searchTransportService,
1219-
(clusterAlias, node) -> lookup.get(node),
1220-
aliasFilters,
1221-
Collections.emptyMap(),
1222-
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
1223-
searchRequest,
1224-
shardsIter,
1225-
timeProvider,
1226-
null,
1227-
true,
1228-
contextProvider,
1229-
ActionTestUtils.assertNoFailureListener(iter -> {
1230-
result.set(iter);
1231-
latch.countDown();
1232-
})
1420+
return new Tuple<>(
1421+
new CanMatchPreFilterSearchPhase(
1422+
logger,
1423+
searchTransportService,
1424+
(clusterAlias, node) -> lookup.get(node),
1425+
aliasFilters,
1426+
Collections.emptyMap(),
1427+
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
1428+
searchRequest,
1429+
shardsIter,
1430+
timeProvider,
1431+
null,
1432+
true,
1433+
contextProvider,
1434+
canMatchActionListener
1435+
),
1436+
requests
12331437
);
1234-
1235-
canMatchPhase.start();
1236-
latch.await();
1237-
1238-
List<SearchShardIterator> updatedSearchShardIterators = new ArrayList<>();
1239-
for (SearchShardIterator updatedSearchShardIterator : result.get()) {
1240-
updatedSearchShardIterators.add(updatedSearchShardIterator);
1241-
}
1242-
1243-
canMatchResultsConsumer.accept(updatedSearchShardIterators, requests);
12441438
}
12451439

12461440
static class StaticCoordinatorRewriteContextProviderBuilder {

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,9 @@ public void testCanMatchSkipsPartiallyMountedIndicesWhenFrozenNodesUnavailable()
10291029
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("_tier", "data_content");
10301030
List<String> indicesToSearch = List.of(regularIndex, partiallyMountedIndex);
10311031
SearchRequest request = new SearchRequest().indices(indicesToSearch.toArray(new String[0]))
1032+
// we randomise the partial search results because if shards that do NOT match the query are unavailable
1033+
// the search is not partial
1034+
.allowPartialSearchResults(randomBoolean())
10321035
.source(new SearchSourceBuilder().query(termQueryBuilder));
10331036

10341037
assertResponse(client().search(request), searchResponse -> {
@@ -1045,6 +1048,7 @@ public void testCanMatchSkipsPartiallyMountedIndicesWhenFrozenNodesUnavailable()
10451048
TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery("_tier", "data_hot", "data_content");
10461049
List<String> indicesToSearch = List.of(regularIndex, partiallyMountedIndex);
10471050
SearchRequest request = new SearchRequest().indices(indicesToSearch.toArray(new String[0]))
1051+
.allowPartialSearchResults(randomBoolean())
10481052
.source(new SearchSourceBuilder().query(termsQueryBuilder));
10491053

10501054
assertResponse(client().search(request), searchResponse -> {
@@ -1061,6 +1065,7 @@ public void testCanMatchSkipsPartiallyMountedIndicesWhenFrozenNodesUnavailable()
10611065
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_tier", "data_frozen"));
10621066
List<String> indicesToSearch = List.of(regularIndex, partiallyMountedIndex);
10631067
SearchRequest request = new SearchRequest().indices(indicesToSearch.toArray(new String[0]))
1068+
.allowPartialSearchResults(randomBoolean())
10641069
.source(new SearchSourceBuilder().query(boolQueryBuilder));
10651070

10661071
assertResponse(client().search(request), searchResponse -> {
@@ -1078,6 +1083,7 @@ public void testCanMatchSkipsPartiallyMountedIndicesWhenFrozenNodesUnavailable()
10781083
.mustNot(randomFrom(QueryBuilders.wildcardQuery("_tier", "dat*ozen"), QueryBuilders.prefixQuery("_tier", "data_fro")));
10791084
List<String> indicesToSearch = List.of(regularIndex, partiallyMountedIndex);
10801085
SearchRequest request = new SearchRequest().indices(indicesToSearch.toArray(new String[0]))
1086+
.allowPartialSearchResults(randomBoolean())
10811087
.source(new SearchSourceBuilder().query(boolQueryBuilder));
10821088

10831089
assertResponse(client().search(request), searchResponse -> {

0 commit comments

Comments
 (0)