diff --git a/docs/changelog/127734.yaml b/docs/changelog/127734.yaml new file mode 100644 index 0000000000000..d33b201744c46 --- /dev/null +++ b/docs/changelog/127734.yaml @@ -0,0 +1,5 @@ +pr: 127734 +summary: Run coordinating `can_match` in field-caps +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/fieldcaps/FieldCapsWithFilterIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/fieldcaps/FieldCapsWithFilterIT.java new file mode 100644 index 0000000000000..d53020702ba8b --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/fieldcaps/FieldCapsWithFilterIT.java @@ -0,0 +1,181 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.fieldcaps; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.PointValues; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardLongFieldRange; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; + +public class FieldCapsWithFilterIT extends ESIntegTestCase { + @Override + protected boolean addMockInternalEngine() { + return false; + } + + private static class EngineWithExposingTimestamp extends InternalEngine { + EngineWithExposingTimestamp(EngineConfig engineConfig) { + super(engineConfig); + assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index"; + } + + @Override + public ShardLongFieldRange getRawFieldRange(String field) { + try (Searcher searcher = acquireSearcher("test")) { + final DirectoryReader directoryReader = searcher.getDirectoryReader(); + + final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field); + final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field); + if (minPackedValue == null || maxPackedValue == null) { + assert minPackedValue == null && maxPackedValue == null + : Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue); + return ShardLongFieldRange.EMPTY; + } + + return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin { + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) { + return Optional.of(EngineWithExposingTimestamp::new); + } else { + return Optional.of(new InternalEngineFactory()); + } + } + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), ExposingTimestampEnginePlugin.class); + } + + void createIndexAndIndexDocs(String index, Settings.Builder indexSettings, long timestamp, boolean exposeTimestamp) throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setSettings(indexSettings) + .setMapping("@timestamp", "type=date", "position", "type=long") + ); + int numDocs = between(100, 500); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(index).setSource("position", i, "@timestamp", timestamp + i).get(); + } + if (exposeTimestamp) { + client.admin().indices().prepareClose(index).get(); + client.admin() + .indices() + .prepareUpdateSettings(index) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build()) + .get(); + client.admin().indices().prepareOpen(index).get(); + assertBusy(() -> { + IndexLongFieldRange timestampRange = clusterService().state().metadata().getProject().index(index).getTimestampRange(); + assertTrue(Strings.toString(timestampRange), timestampRange.containsAllShardRanges()); + }); + } else { + client.admin().indices().prepareRefresh(index).get(); + } + } + + public void testSkipUnmatchedShards() throws Exception { + long oldTimestamp = randomLongBetween(10_000_000, 20_000_000); + long newTimestamp = randomLongBetween(30_000_000, 50_000_000); + String redNode = internalCluster().startDataOnlyNode(); + String blueNode = internalCluster().startDataOnlyNode(); + createIndexAndIndexDocs( + "index_old", + indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", redNode), + oldTimestamp, + true + ); + internalCluster().stopNode(redNode); + createIndexAndIndexDocs( + "index_new", + indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", blueNode), + newTimestamp, + false + ); + // fails without index filter + { + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("index_*"); + request.fields("*"); + request.setMergeResults(false); + if (randomBoolean()) { + request.indexFilter(new RangeQueryBuilder("@timestamp").from(oldTimestamp)); + } + var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request)); + assertThat(response.getIndexResponses(), hasSize(1)); + assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new")); + assertThat(response.getFailures(), hasSize(1)); + assertThat(response.getFailures().get(0).getIndices(), equalTo(new String[] { "index_old" })); + assertThat(response.getFailures().get(0).getException(), instanceOf(NoShardAvailableActionException.class)); + } + // skip unavailable shards with index filter + { + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("index_*"); + request.fields("*"); + request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp)); + request.setMergeResults(false); + var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request)); + assertThat(response.getIndexResponses(), hasSize(1)); + assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new")); + assertThat(response.getFailures(), empty()); + } + // skip both indices on the coordinator, one the data nodes + { + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("index_*"); + request.fields("*"); + request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp * 2L)); + request.setMergeResults(false); + var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request)); + assertThat(response.getIndexResponses(), empty()); + assertThat(response.getFailures(), empty()); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java index 6450b964f1f5b..cdafab819e091 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java @@ -906,6 +906,10 @@ static void unblockOnRewrite() { @Override protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException { + // skip rewriting on the coordinator + if (queryRewriteContext.convertToCoordinatorRewriteContext() != null) { + return this; + } try { blockingLatch.await(); } catch (InterruptedException e) { diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index ee2da3a38dd33..87ac3d9ebe992 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -244,6 +244,7 @@ static TransportVersion def(int id) { public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0); public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00); public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00); + public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java index 88eb2ef4fb13d..efbde6264e91c 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -37,6 +38,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind public static final String NAME = "field_caps_request"; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed(); + private String clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + private String[] indices = Strings.EMPTY_ARRAY; private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; private String[] fields = Strings.EMPTY_ARRAY; @@ -67,6 +70,11 @@ public FieldCapabilitiesRequest(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) { includeEmptyFields = in.readBoolean(); } + if (in.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) { + clusterAlias = in.readOptionalString(); + } else { + clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + } } public FieldCapabilitiesRequest() {} @@ -90,6 +98,14 @@ public void setMergeResults(boolean mergeResults) { this.mergeResults = mergeResults; } + void clusterAlias(String clusterAlias) { + this.clusterAlias = clusterAlias; + } + + String clusterAlias() { + return clusterAlias; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -108,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) { out.writeBoolean(includeEmptyFields); } + if (out.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) { + out.writeOptionalString(clusterAlias); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java index 93095e872858a..c56fd985c9e2b 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -26,8 +26,14 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -72,6 +78,7 @@ final class RequestDispatcher { ClusterService clusterService, TransportService transportService, ProjectResolver projectResolver, + CoordinatorRewriteContextProvider coordinatorRewriteContextProvider, Task parentTask, FieldCapabilitiesRequest fieldCapsRequest, OriginalIndices originalIndices, @@ -105,8 +112,14 @@ final class RequestDispatcher { onIndexFailure.accept(index, e); continue; } - final IndexSelector indexResult = new IndexSelector(shardIts); - if (indexResult.nodeToShards.isEmpty()) { + final IndexSelector indexResult = new IndexSelector( + fieldCapsRequest.clusterAlias(), + shardIts, + fieldCapsRequest.indexFilter(), + nowInMillis, + coordinatorRewriteContextProvider + ); + if (indexResult.nodeToShards.isEmpty() && indexResult.unmatchedShardIds.isEmpty()) { onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy")); } else { this.indexSelectors.put(index, indexResult); @@ -255,10 +268,34 @@ private static class IndexSelector { private final Set unmatchedShardIds = new HashSet<>(); private final Map failures = new HashMap<>(); - IndexSelector(List shardIts) { + IndexSelector( + String clusterAlias, + List shardIts, + QueryBuilder indexFilter, + long nowInMillis, + CoordinatorRewriteContextProvider coordinatorRewriteContextProvider + ) { for (ShardIterator shardIt : shardIts) { - for (ShardRouting shard : shardIt) { - nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); + boolean canMatch = true; + final ShardId shardId = shardIt.shardId(); + if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) { + var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex()); + if (coordinatorRewriteContext != null) { + var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias); + shardRequest.source(new SearchSourceBuilder().query(indexFilter)); + try { + canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext); + } catch (Exception e) { + // treat as if shard is still a potential match + } + } + } + if (canMatch) { + for (ShardRouting shard : shardIt) { + nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); + } + } else { + unmatchedShardIds.add(shardId); } } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 1868cd649f0ee..d64da10e0b2f7 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -250,6 +250,7 @@ private void doExecuteForked( clusterService, transportService, projectResolver, + indicesService.getCoordinatorRewriteContextProvider(() -> nowInMillis), task, request, localIndices, @@ -273,7 +274,7 @@ private void doExecuteForked( singleThreadedExecutor, RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE ); - FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis); + FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis); ActionListener remoteListener = ActionListener.wrap(response -> { for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) { String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()); @@ -383,11 +384,13 @@ private static void mergeIndexResponses( } private static FieldCapabilitiesRequest prepareRemoteRequest( + String clusterAlias, FieldCapabilitiesRequest request, OriginalIndices originalIndices, long nowInMillis ) { FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); + remoteRequest.clusterAlias(clusterAlias); remoteRequest.setMergeResults(false); // we need to merge on this node remoteRequest.indicesOptions(originalIndices.indicesOptions()); remoteRequest.indices(originalIndices.indices()); diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java index f3b9f32ef2930..ddbdd849e97bf 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.ShardId; @@ -135,6 +136,7 @@ public void testHappyCluster() throws Exception { mockClusterService(clusterState), transportService, TestProjectResolvers.singleProject(projectId), + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -206,6 +208,7 @@ public void testRetryThenOk() throws Exception { mockClusterService(clusterState), transportService, TestProjectResolvers.singleProject(projectId), + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -328,6 +331,7 @@ public void testRetryButFails() throws Exception { mockClusterService(clusterState), transportService, TestProjectResolvers.singleProject(projectId), + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -452,6 +456,7 @@ public void testSuccessWithAnyMatch() throws Exception { mockClusterService(clusterState), transportService, TestProjectResolvers.singleProject(projectId), + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -550,6 +555,7 @@ public void testStopAfterAllShardsUnmatched() throws Exception { mockClusterService(clusterState), transportService, TestProjectResolvers.singleProject(projectId), + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -642,6 +648,7 @@ public void testFailWithSameException() throws Exception { mockClusterService(clusterState), transportService, TestProjectResolvers.singleProject(projectId), + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -1032,4 +1039,8 @@ static ClusterService mockClusterService(ClusterState clusterState) { when(clusterService.operationRouting()).thenReturn(operationRouting); return clusterService; } + + static CoordinatorRewriteContextProvider coordinatorRewriteContextProvider() { + return mock(CoordinatorRewriteContextProvider.class); + } }