Skip to content

Commit 173b661

Browse files
authored
Run coordinating can_match in field-caps (#127734) (#128170)
Currently, we don't run the coordinating can_match to skip unmatched shards in field-caps. Most of the time, this is fine, but the current field-caps fails when the target shards are unavailable, even if they don't match the index filter. This change integrates the coordinating can_match into field-caps to prevent failures in such cases. Relates #127734
1 parent dd0b7cc commit 173b661

File tree

8 files changed

+267
-6
lines changed

8 files changed

+267
-6
lines changed

docs/changelog/127734.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127734
2+
summary: Run coordinating `can_match` in field-caps
3+
area: ES|QL
4+
type: enhancement
5+
issues: []
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.fieldcaps;
11+
12+
import org.apache.lucene.document.LongPoint;
13+
import org.apache.lucene.index.DirectoryReader;
14+
import org.apache.lucene.index.PointValues;
15+
import org.elasticsearch.action.NoShardAvailableActionException;
16+
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.util.CollectionUtils;
21+
import org.elasticsearch.index.IndexSettings;
22+
import org.elasticsearch.index.engine.EngineConfig;
23+
import org.elasticsearch.index.engine.EngineFactory;
24+
import org.elasticsearch.index.engine.InternalEngine;
25+
import org.elasticsearch.index.engine.InternalEngineFactory;
26+
import org.elasticsearch.index.query.RangeQueryBuilder;
27+
import org.elasticsearch.index.shard.IndexLongFieldRange;
28+
import org.elasticsearch.index.shard.ShardLongFieldRange;
29+
import org.elasticsearch.plugins.EnginePlugin;
30+
import org.elasticsearch.plugins.Plugin;
31+
import org.elasticsearch.test.ESIntegTestCase;
32+
33+
import java.io.IOException;
34+
import java.io.UncheckedIOException;
35+
import java.util.Arrays;
36+
import java.util.Collection;
37+
import java.util.Optional;
38+
39+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
40+
import static org.hamcrest.Matchers.empty;
41+
import static org.hamcrest.Matchers.equalTo;
42+
import static org.hamcrest.Matchers.hasSize;
43+
import static org.hamcrest.Matchers.instanceOf;
44+
45+
public class FieldCapsWithFilterIT extends ESIntegTestCase {
46+
@Override
47+
protected boolean addMockInternalEngine() {
48+
return false;
49+
}
50+
51+
private static class EngineWithExposingTimestamp extends InternalEngine {
52+
EngineWithExposingTimestamp(EngineConfig engineConfig) {
53+
super(engineConfig);
54+
assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index";
55+
}
56+
57+
@Override
58+
public ShardLongFieldRange getRawFieldRange(String field) {
59+
try (Searcher searcher = acquireSearcher("test")) {
60+
final DirectoryReader directoryReader = searcher.getDirectoryReader();
61+
62+
final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field);
63+
final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field);
64+
if (minPackedValue == null || maxPackedValue == null) {
65+
assert minPackedValue == null && maxPackedValue == null
66+
: Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue);
67+
return ShardLongFieldRange.EMPTY;
68+
}
69+
70+
return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0));
71+
} catch (IOException e) {
72+
throw new UncheckedIOException(e);
73+
}
74+
}
75+
}
76+
77+
public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin {
78+
@Override
79+
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
80+
if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) {
81+
return Optional.of(EngineWithExposingTimestamp::new);
82+
} else {
83+
return Optional.of(new InternalEngineFactory());
84+
}
85+
}
86+
}
87+
88+
@Override
89+
protected Collection<Class<? extends Plugin>> nodePlugins() {
90+
return CollectionUtils.appendToCopy(super.nodePlugins(), ExposingTimestampEnginePlugin.class);
91+
}
92+
93+
void createIndexAndIndexDocs(String index, Settings.Builder indexSettings, long timestamp, boolean exposeTimestamp) throws Exception {
94+
Client client = client();
95+
assertAcked(
96+
client.admin()
97+
.indices()
98+
.prepareCreate(index)
99+
.setSettings(indexSettings)
100+
.setMapping("@timestamp", "type=date", "position", "type=long")
101+
);
102+
int numDocs = between(100, 500);
103+
for (int i = 0; i < numDocs; i++) {
104+
client.prepareIndex(index).setSource("position", i, "@timestamp", timestamp + i).get();
105+
}
106+
if (exposeTimestamp) {
107+
client.admin().indices().prepareClose(index).get();
108+
client.admin()
109+
.indices()
110+
.prepareUpdateSettings(index)
111+
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
112+
.get();
113+
client.admin().indices().prepareOpen(index).get();
114+
assertBusy(() -> {
115+
IndexLongFieldRange timestampRange = clusterService().state().metadata().index(index).getTimestampRange();
116+
assertTrue(Strings.toString(timestampRange), timestampRange.containsAllShardRanges());
117+
});
118+
} else {
119+
client.admin().indices().prepareRefresh(index).get();
120+
}
121+
}
122+
123+
public void testSkipUnmatchedShards() throws Exception {
124+
long oldTimestamp = randomLongBetween(10_000_000, 20_000_000);
125+
long newTimestamp = randomLongBetween(30_000_000, 50_000_000);
126+
String redNode = internalCluster().startDataOnlyNode();
127+
String blueNode = internalCluster().startDataOnlyNode();
128+
createIndexAndIndexDocs(
129+
"index_old",
130+
indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", redNode),
131+
oldTimestamp,
132+
true
133+
);
134+
internalCluster().stopNode(redNode);
135+
createIndexAndIndexDocs(
136+
"index_new",
137+
indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", blueNode),
138+
newTimestamp,
139+
false
140+
);
141+
// fails without index filter
142+
{
143+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
144+
request.indices("index_*");
145+
request.fields("*");
146+
request.setMergeResults(false);
147+
if (randomBoolean()) {
148+
request.indexFilter(new RangeQueryBuilder("@timestamp").from(oldTimestamp));
149+
}
150+
var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
151+
assertThat(response.getIndexResponses(), hasSize(1));
152+
assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new"));
153+
assertThat(response.getFailures(), hasSize(1));
154+
assertThat(response.getFailures().get(0).getIndices(), equalTo(new String[] { "index_old" }));
155+
assertThat(response.getFailures().get(0).getException(), instanceOf(NoShardAvailableActionException.class));
156+
}
157+
// skip unavailable shards with index filter
158+
{
159+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
160+
request.indices("index_*");
161+
request.fields("*");
162+
request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp));
163+
request.setMergeResults(false);
164+
var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
165+
assertThat(response.getIndexResponses(), hasSize(1));
166+
assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new"));
167+
assertThat(response.getFailures(), empty());
168+
}
169+
// skip both indices on the coordinator, one the data nodes
170+
{
171+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
172+
request.indices("index_*");
173+
request.fields("*");
174+
request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp * 2L));
175+
request.setMergeResults(false);
176+
var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
177+
assertThat(response.getIndexResponses(), empty());
178+
assertThat(response.getFailures(), empty());
179+
}
180+
}
181+
}

server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,10 @@ static void unblockOnRewrite() {
928928

929929
@Override
930930
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
931+
// skip rewriting on the coordinator
932+
if (queryRewriteContext.convertToCoordinatorRewriteContext() != null) {
933+
return this;
934+
}
931935
try {
932936
blockingLatch.await();
933937
} catch (InterruptedException e) {

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ static TransportVersion def(int id) {
220220
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_29);
221221
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_8_19 = def(8_841_0_30);
222222
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED_8_19 = def(8_841_0_31);
223+
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(8_841_0_32);
223224

224225
/*
225226
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.tasks.CancellableTask;
2323
import org.elasticsearch.tasks.Task;
2424
import org.elasticsearch.tasks.TaskId;
25+
import org.elasticsearch.transport.RemoteClusterAware;
2526
import org.elasticsearch.xcontent.ToXContentObject;
2627
import org.elasticsearch.xcontent.XContentBuilder;
2728

@@ -37,6 +38,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
3738
public static final String NAME = "field_caps_request";
3839
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
3940

41+
private String clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
42+
4043
private String[] indices = Strings.EMPTY_ARRAY;
4144
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
4245
private String[] fields = Strings.EMPTY_ARRAY;
@@ -67,6 +70,11 @@ public FieldCapabilitiesRequest(StreamInput in) throws IOException {
6770
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
6871
includeEmptyFields = in.readBoolean();
6972
}
73+
if (in.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
74+
clusterAlias = in.readOptionalString();
75+
} else {
76+
clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
77+
}
7078
}
7179

7280
public FieldCapabilitiesRequest() {}
@@ -90,6 +98,14 @@ public void setMergeResults(boolean mergeResults) {
9098
this.mergeResults = mergeResults;
9199
}
92100

101+
void clusterAlias(String clusterAlias) {
102+
this.clusterAlias = clusterAlias;
103+
}
104+
105+
String clusterAlias() {
106+
return clusterAlias;
107+
}
108+
93109
@Override
94110
public void writeTo(StreamOutput out) throws IOException {
95111
super.writeTo(out);
@@ -108,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
108124
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
109125
out.writeBoolean(includeEmptyFields);
110126
}
127+
if (out.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
128+
out.writeOptionalString(clusterAlias);
129+
}
111130
}
112131

113132
@Override

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@
2424
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2525
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2626
import org.elasticsearch.common.util.concurrent.RunOnce;
27+
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
2728
import org.elasticsearch.index.query.MatchAllQueryBuilder;
29+
import org.elasticsearch.index.query.QueryBuilder;
2830
import org.elasticsearch.index.shard.ShardId;
31+
import org.elasticsearch.search.SearchService;
32+
import org.elasticsearch.search.builder.SearchSourceBuilder;
33+
import org.elasticsearch.search.internal.AliasFilter;
34+
import org.elasticsearch.search.internal.ShardSearchRequest;
2935
import org.elasticsearch.tasks.Task;
3036
import org.elasticsearch.transport.TransportRequestOptions;
3137
import org.elasticsearch.transport.TransportService;
@@ -69,6 +75,7 @@ final class RequestDispatcher {
6975
RequestDispatcher(
7076
ClusterService clusterService,
7177
TransportService transportService,
78+
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
7279
Task parentTask,
7380
FieldCapabilitiesRequest fieldCapsRequest,
7481
OriginalIndices originalIndices,
@@ -99,8 +106,14 @@ final class RequestDispatcher {
99106
onIndexFailure.accept(index, e);
100107
continue;
101108
}
102-
final IndexSelector indexResult = new IndexSelector(shardIts);
103-
if (indexResult.nodeToShards.isEmpty()) {
109+
final IndexSelector indexResult = new IndexSelector(
110+
fieldCapsRequest.clusterAlias(),
111+
shardIts,
112+
fieldCapsRequest.indexFilter(),
113+
nowInMillis,
114+
coordinatorRewriteContextProvider
115+
);
116+
if (indexResult.nodeToShards.isEmpty() && indexResult.unmatchedShardIds.isEmpty()) {
104117
onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy"));
105118
} else {
106119
this.indexSelectors.put(index, indexResult);
@@ -249,10 +262,34 @@ private static class IndexSelector {
249262
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
250263
private final Map<ShardId, Exception> failures = new HashMap<>();
251264

252-
IndexSelector(List<ShardIterator> shardIts) {
265+
IndexSelector(
266+
String clusterAlias,
267+
List<ShardIterator> shardIts,
268+
QueryBuilder indexFilter,
269+
long nowInMillis,
270+
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
271+
) {
253272
for (ShardIterator shardIt : shardIts) {
254-
for (ShardRouting shard : shardIt) {
255-
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
273+
boolean canMatch = true;
274+
final ShardId shardId = shardIt.shardId();
275+
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
276+
var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
277+
if (coordinatorRewriteContext != null) {
278+
var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias);
279+
shardRequest.source(new SearchSourceBuilder().query(indexFilter));
280+
try {
281+
canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext);
282+
} catch (Exception e) {
283+
// treat as if shard is still a potential match
284+
}
285+
}
286+
}
287+
if (canMatch) {
288+
for (ShardRouting shard : shardIt) {
289+
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
290+
}
291+
} else {
292+
unmatchedShardIds.add(shardId);
256293
}
257294
}
258295
}

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ private void doExecuteForked(
240240
final RequestDispatcher requestDispatcher = new RequestDispatcher(
241241
clusterService,
242242
transportService,
243+
indicesService.getCoordinatorRewriteContextProvider(() -> nowInMillis),
243244
task,
244245
request,
245246
localIndices,
@@ -263,7 +264,7 @@ private void doExecuteForked(
263264
searchCoordinationExecutor,
264265
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
265266
);
266-
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
267+
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
267268
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
268269
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
269270
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
@@ -350,11 +351,13 @@ private void mergeIndexResponses(
350351
}
351352

352353
private static FieldCapabilitiesRequest prepareRemoteRequest(
354+
String clusterAlias,
353355
FieldCapabilitiesRequest request,
354356
OriginalIndices originalIndices,
355357
long nowInMillis
356358
) {
357359
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
360+
remoteRequest.clusterAlias(clusterAlias);
358361
remoteRequest.setMergeResults(false); // we need to merge on this node
359362
remoteRequest.indicesOptions(originalIndices.indicesOptions());
360363
remoteRequest.indices(originalIndices.indices());

0 commit comments

Comments
 (0)