Skip to content

Commit 73f5125

Browse files
authored
Run coordinating can_match in field-caps (#127734)
Currently, we don't run the coordinating can_match to skip unmatched shards in field-caps. Most of the time, this is fine, but the current field-caps fails when the target shards are unavailable, even if they don't match the index filter. This change integrates the coordinating can_match into field-caps to prevent failures in such cases.
1 parent f912323 commit 73f5125

File tree

8 files changed

+267
-6
lines changed

8 files changed

+267
-6
lines changed

docs/changelog/127734.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127734
2+
summary: Run coordinating `can_match` in field-caps
3+
area: ES|QL
4+
type: enhancement
5+
issues: []
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.fieldcaps;
11+
12+
import org.apache.lucene.document.LongPoint;
13+
import org.apache.lucene.index.DirectoryReader;
14+
import org.apache.lucene.index.PointValues;
15+
import org.elasticsearch.action.NoShardAvailableActionException;
16+
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.util.CollectionUtils;
21+
import org.elasticsearch.index.IndexSettings;
22+
import org.elasticsearch.index.engine.EngineConfig;
23+
import org.elasticsearch.index.engine.EngineFactory;
24+
import org.elasticsearch.index.engine.InternalEngine;
25+
import org.elasticsearch.index.engine.InternalEngineFactory;
26+
import org.elasticsearch.index.query.RangeQueryBuilder;
27+
import org.elasticsearch.index.shard.IndexLongFieldRange;
28+
import org.elasticsearch.index.shard.ShardLongFieldRange;
29+
import org.elasticsearch.plugins.EnginePlugin;
30+
import org.elasticsearch.plugins.Plugin;
31+
import org.elasticsearch.test.ESIntegTestCase;
32+
33+
import java.io.IOException;
34+
import java.io.UncheckedIOException;
35+
import java.util.Arrays;
36+
import java.util.Collection;
37+
import java.util.Optional;
38+
39+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
40+
import static org.hamcrest.Matchers.empty;
41+
import static org.hamcrest.Matchers.equalTo;
42+
import static org.hamcrest.Matchers.hasSize;
43+
import static org.hamcrest.Matchers.instanceOf;
44+
45+
public class FieldCapsWithFilterIT extends ESIntegTestCase {
46+
@Override
47+
protected boolean addMockInternalEngine() {
48+
return false;
49+
}
50+
51+
private static class EngineWithExposingTimestamp extends InternalEngine {
52+
EngineWithExposingTimestamp(EngineConfig engineConfig) {
53+
super(engineConfig);
54+
assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index";
55+
}
56+
57+
@Override
58+
public ShardLongFieldRange getRawFieldRange(String field) {
59+
try (Searcher searcher = acquireSearcher("test")) {
60+
final DirectoryReader directoryReader = searcher.getDirectoryReader();
61+
62+
final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field);
63+
final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field);
64+
if (minPackedValue == null || maxPackedValue == null) {
65+
assert minPackedValue == null && maxPackedValue == null
66+
: Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue);
67+
return ShardLongFieldRange.EMPTY;
68+
}
69+
70+
return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0));
71+
} catch (IOException e) {
72+
throw new UncheckedIOException(e);
73+
}
74+
}
75+
}
76+
77+
public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin {
78+
@Override
79+
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
80+
if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) {
81+
return Optional.of(EngineWithExposingTimestamp::new);
82+
} else {
83+
return Optional.of(new InternalEngineFactory());
84+
}
85+
}
86+
}
87+
88+
@Override
89+
protected Collection<Class<? extends Plugin>> nodePlugins() {
90+
return CollectionUtils.appendToCopy(super.nodePlugins(), ExposingTimestampEnginePlugin.class);
91+
}
92+
93+
void createIndexAndIndexDocs(String index, Settings.Builder indexSettings, long timestamp, boolean exposeTimestamp) throws Exception {
94+
Client client = client();
95+
assertAcked(
96+
client.admin()
97+
.indices()
98+
.prepareCreate(index)
99+
.setSettings(indexSettings)
100+
.setMapping("@timestamp", "type=date", "position", "type=long")
101+
);
102+
int numDocs = between(100, 500);
103+
for (int i = 0; i < numDocs; i++) {
104+
client.prepareIndex(index).setSource("position", i, "@timestamp", timestamp + i).get();
105+
}
106+
if (exposeTimestamp) {
107+
client.admin().indices().prepareClose(index).get();
108+
client.admin()
109+
.indices()
110+
.prepareUpdateSettings(index)
111+
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
112+
.get();
113+
client.admin().indices().prepareOpen(index).get();
114+
assertBusy(() -> {
115+
IndexLongFieldRange timestampRange = clusterService().state().metadata().getProject().index(index).getTimestampRange();
116+
assertTrue(Strings.toString(timestampRange), timestampRange.containsAllShardRanges());
117+
});
118+
} else {
119+
client.admin().indices().prepareRefresh(index).get();
120+
}
121+
}
122+
123+
public void testSkipUnmatchedShards() throws Exception {
124+
long oldTimestamp = randomLongBetween(10_000_000, 20_000_000);
125+
long newTimestamp = randomLongBetween(30_000_000, 50_000_000);
126+
String redNode = internalCluster().startDataOnlyNode();
127+
String blueNode = internalCluster().startDataOnlyNode();
128+
createIndexAndIndexDocs(
129+
"index_old",
130+
indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", redNode),
131+
oldTimestamp,
132+
true
133+
);
134+
internalCluster().stopNode(redNode);
135+
createIndexAndIndexDocs(
136+
"index_new",
137+
indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", blueNode),
138+
newTimestamp,
139+
false
140+
);
141+
// fails without index filter
142+
{
143+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
144+
request.indices("index_*");
145+
request.fields("*");
146+
request.setMergeResults(false);
147+
if (randomBoolean()) {
148+
request.indexFilter(new RangeQueryBuilder("@timestamp").from(oldTimestamp));
149+
}
150+
var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
151+
assertThat(response.getIndexResponses(), hasSize(1));
152+
assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new"));
153+
assertThat(response.getFailures(), hasSize(1));
154+
assertThat(response.getFailures().get(0).getIndices(), equalTo(new String[] { "index_old" }));
155+
assertThat(response.getFailures().get(0).getException(), instanceOf(NoShardAvailableActionException.class));
156+
}
157+
// skip unavailable shards with index filter
158+
{
159+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
160+
request.indices("index_*");
161+
request.fields("*");
162+
request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp));
163+
request.setMergeResults(false);
164+
var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
165+
assertThat(response.getIndexResponses(), hasSize(1));
166+
assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new"));
167+
assertThat(response.getFailures(), empty());
168+
}
169+
// skip both indices on the coordinator, one the data nodes
170+
{
171+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
172+
request.indices("index_*");
173+
request.fields("*");
174+
request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp * 2L));
175+
request.setMergeResults(false);
176+
var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
177+
assertThat(response.getIndexResponses(), empty());
178+
assertThat(response.getFailures(), empty());
179+
}
180+
}
181+
}

server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,10 @@ static void unblockOnRewrite() {
906906

907907
@Override
908908
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
909+
// skip rewriting on the coordinator
910+
if (queryRewriteContext.convertToCoordinatorRewriteContext() != null) {
911+
return this;
912+
}
909913
try {
910914
blockingLatch.await();
911915
} catch (InterruptedException e) {

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ static TransportVersion def(int id) {
246246
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0);
247247
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
248248
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);
249+
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
249250

250251
/*
251252
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.tasks.CancellableTask;
2323
import org.elasticsearch.tasks.Task;
2424
import org.elasticsearch.tasks.TaskId;
25+
import org.elasticsearch.transport.RemoteClusterAware;
2526
import org.elasticsearch.xcontent.ToXContentObject;
2627
import org.elasticsearch.xcontent.XContentBuilder;
2728

@@ -37,6 +38,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
3738
public static final String NAME = "field_caps_request";
3839
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
3940

41+
private String clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
42+
4043
private String[] indices = Strings.EMPTY_ARRAY;
4144
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
4245
private String[] fields = Strings.EMPTY_ARRAY;
@@ -67,6 +70,11 @@ public FieldCapabilitiesRequest(StreamInput in) throws IOException {
6770
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
6871
includeEmptyFields = in.readBoolean();
6972
}
73+
if (in.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
74+
clusterAlias = in.readOptionalString();
75+
} else {
76+
clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
77+
}
7078
}
7179

7280
public FieldCapabilitiesRequest() {}
@@ -90,6 +98,14 @@ public void setMergeResults(boolean mergeResults) {
9098
this.mergeResults = mergeResults;
9199
}
92100

101+
void clusterAlias(String clusterAlias) {
102+
this.clusterAlias = clusterAlias;
103+
}
104+
105+
String clusterAlias() {
106+
return clusterAlias;
107+
}
108+
93109
@Override
94110
public void writeTo(StreamOutput out) throws IOException {
95111
super.writeTo(out);
@@ -108,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
108124
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
109125
out.writeBoolean(includeEmptyFields);
110126
}
127+
if (out.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
128+
out.writeOptionalString(clusterAlias);
129+
}
111130
}
112131

113132
@Override

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,14 @@
2626
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2727
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2828
import org.elasticsearch.common.util.concurrent.RunOnce;
29+
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
2930
import org.elasticsearch.index.query.MatchAllQueryBuilder;
31+
import org.elasticsearch.index.query.QueryBuilder;
3032
import org.elasticsearch.index.shard.ShardId;
33+
import org.elasticsearch.search.SearchService;
34+
import org.elasticsearch.search.builder.SearchSourceBuilder;
35+
import org.elasticsearch.search.internal.AliasFilter;
36+
import org.elasticsearch.search.internal.ShardSearchRequest;
3137
import org.elasticsearch.tasks.Task;
3238
import org.elasticsearch.transport.TransportRequestOptions;
3339
import org.elasticsearch.transport.TransportService;
@@ -72,6 +78,7 @@ final class RequestDispatcher {
7278
ClusterService clusterService,
7379
TransportService transportService,
7480
ProjectResolver projectResolver,
81+
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
7582
Task parentTask,
7683
FieldCapabilitiesRequest fieldCapsRequest,
7784
OriginalIndices originalIndices,
@@ -105,8 +112,14 @@ final class RequestDispatcher {
105112
onIndexFailure.accept(index, e);
106113
continue;
107114
}
108-
final IndexSelector indexResult = new IndexSelector(shardIts);
109-
if (indexResult.nodeToShards.isEmpty()) {
115+
final IndexSelector indexResult = new IndexSelector(
116+
fieldCapsRequest.clusterAlias(),
117+
shardIts,
118+
fieldCapsRequest.indexFilter(),
119+
nowInMillis,
120+
coordinatorRewriteContextProvider
121+
);
122+
if (indexResult.nodeToShards.isEmpty() && indexResult.unmatchedShardIds.isEmpty()) {
110123
onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy"));
111124
} else {
112125
this.indexSelectors.put(index, indexResult);
@@ -255,10 +268,34 @@ private static class IndexSelector {
255268
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
256269
private final Map<ShardId, Exception> failures = new HashMap<>();
257270

258-
IndexSelector(List<ShardIterator> shardIts) {
271+
IndexSelector(
272+
String clusterAlias,
273+
List<ShardIterator> shardIts,
274+
QueryBuilder indexFilter,
275+
long nowInMillis,
276+
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
277+
) {
259278
for (ShardIterator shardIt : shardIts) {
260-
for (ShardRouting shard : shardIt) {
261-
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
279+
boolean canMatch = true;
280+
final ShardId shardId = shardIt.shardId();
281+
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
282+
var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
283+
if (coordinatorRewriteContext != null) {
284+
var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias);
285+
shardRequest.source(new SearchSourceBuilder().query(indexFilter));
286+
try {
287+
canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext);
288+
} catch (Exception e) {
289+
// treat as if shard is still a potential match
290+
}
291+
}
292+
}
293+
if (canMatch) {
294+
for (ShardRouting shard : shardIt) {
295+
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
296+
}
297+
} else {
298+
unmatchedShardIds.add(shardId);
262299
}
263300
}
264301
}

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ private void doExecuteForked(
250250
clusterService,
251251
transportService,
252252
projectResolver,
253+
indicesService.getCoordinatorRewriteContextProvider(() -> nowInMillis),
253254
task,
254255
request,
255256
localIndices,
@@ -273,7 +274,7 @@ private void doExecuteForked(
273274
singleThreadedExecutor,
274275
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
275276
);
276-
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
277+
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
277278
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
278279
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
279280
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
@@ -383,11 +384,13 @@ private static void mergeIndexResponses(
383384
}
384385

385386
private static FieldCapabilitiesRequest prepareRemoteRequest(
387+
String clusterAlias,
386388
FieldCapabilitiesRequest request,
387389
OriginalIndices originalIndices,
388390
long nowInMillis
389391
) {
390392
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
393+
remoteRequest.clusterAlias(clusterAlias);
391394
remoteRequest.setMergeResults(false); // we need to merge on this node
392395
remoteRequest.indicesOptions(originalIndices.indicesOptions());
393396
remoteRequest.indices(originalIndices.indices());

0 commit comments

Comments
 (0)