From 3b7b72216c2b3f40ae393dba2ab02034ec64fa71 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 11 Nov 2025 11:06:21 +0100 Subject: [PATCH 1/4] Reuse grouped indices --- .../esql/action/CrossClusterAsyncQueryIT.java | 6 +-- .../action/CrossClusterAsyncQueryStopIT.java | 16 +++---- .../esql/action/CrossClusterEnrichIT.java | 2 +- .../esql/action/CrossClusterQueryIT.java | 16 +++---- ...CrossClusterQueryUnavailableRemotesIT.java | 12 ++--- .../CrossClusterQueryWithFiltersIT.java | 2 +- .../esql/plugin/RemoteIndexResolutionIT.java | 2 +- .../xpack/esql/action/EsqlExecutionInfo.java | 45 ++++++++++--------- .../xpack/esql/planner/PlannerUtils.java | 14 ------ .../xpack/esql/plugin/ComputeService.java | 18 +++++++- .../xpack/esql/session/EsqlCCSUtils.java | 6 +-- .../xpack/esql/session/EsqlCCSUtilsTests.java | 38 ++++++++-------- 12 files changed, 90 insertions(+), 87 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java index cb404c0c93006..7a8afba2c7656 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java @@ -168,17 +168,17 @@ public void testAsyncQueriesWithLimit0() throws IOException { EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("logs*")); assertClusterInfoSuccess(remoteCluster, 0); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); assertClusterInfoSuccess(remote2Cluster, 0); - assertThat(remote2Cluster.getIndexExpression(), equalTo("logs*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("logs*")); assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); assertClusterInfoSuccess(localCluster, 0); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertClusterMetadataInResponse(resp, responseExpectMeta, 3); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index e9885d155187a..6fe83b9bf17a5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -116,15 +116,15 @@ public void testStopQuery() throws Exception { assertThat(executionInfo.isPartial(), equalTo(true)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("logs-*")); assertClusterInfoSuccess(remoteCluster, remote1NumShards); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); - assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("blocking")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs-*")); assertClusterInfoSuccess(localCluster, localNumShards); assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); @@ -214,15 +214,15 @@ public void testStopQueryLocal() throws Exception { assertThat(executionInfo.isPartial(), equalTo(true)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("logs-*")); assertClusterInfoSuccess(remoteCluster, remote1NumShards); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); - assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("logs-*")); assertClusterInfoSuccess(remote2Cluster, remote2NumShards); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("blocking")); + assertThat(localCluster.getOriginalIndices(), equalTo("blocking")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); @@ -346,11 +346,11 @@ public void testStopQueryInlineStats() throws Exception { assertThat(executionInfo.isPartial(), equalTo(true)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); - assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("blocking")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs-*")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 2); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index 91fd8827ff765..0639793bc8191 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -546,7 +546,7 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf for (EsqlExecutionInfo.Cluster cluster : clusters) { assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); - assertThat(cluster.getIndexExpression(), equalTo("events")); + assertThat(cluster.getOriginalIndices(), equalTo("events")); assertThat(cluster.getTotalShards(), equalTo(1)); assertThat(cluster.getSuccessfulShards(), equalTo(1)); assertThat(cluster.getSkippedShards(), equalTo(0)); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index 1f7a116ee5b4f..55a1ef614da26 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -64,7 +64,7 @@ protected Map skipUnavailableForRemoteClusters() { } protected void assertClusterInfoSuccess(EsqlExecutionInfo.Cluster clusterInfo, int numShards, long overallTookMillis) { - assertThat(clusterInfo.getIndexExpression(), equalTo("logs-*")); + assertThat(clusterInfo.getOriginalIndices(), equalTo("logs-*")); assertThat(clusterInfo.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); super.assertClusterInfoSuccess(clusterInfo, numShards); } @@ -471,7 +471,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu for (ExpectedCluster expectedCluster : expected) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias()); String msg = cluster.getClusterAlias(); - assertThat(msg, cluster.getIndexExpression(), equalTo(expectedCluster.indexExpression())); + assertThat(msg, cluster.getOriginalIndices(), equalTo(expectedCluster.indexExpression())); assertThat(msg, cluster.getStatus(), equalTo(expectedCluster.status())); assertThat(msg, cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(msg, cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); @@ -542,11 +542,11 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("no_such_index*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("no_such_index*")); assertClusterInfoSuccess(remoteCluster, 0); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("logs-*,no_such_index*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs-*,no_such_index*")); assertClusterInfoSuccess(localCluster, localNumShards); } finally { clearSkipUnavailable(); @@ -588,7 +588,7 @@ public void testCCSExecutionOnSearchesWithLimit0() throws Exception { assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("*")); assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertClusterInfoSuccess(remoteCluster, 0); @@ -792,7 +792,7 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException { assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-2*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("logs-2*")); assertClusterInfoSkipped(remoteCluster); assertThat(remoteCluster.getFailures().getFirst().reason(), containsString("Accessing failing field")); } @@ -813,7 +813,7 @@ public void testRemoteFailureInlinestats() throws IOException { assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-2*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("logs-2*")); assertClusterInfoSkipped(remoteCluster); assertThat(remoteCluster.getFailures().getFirst().reason(), containsString("Accessing failing field")); } @@ -827,7 +827,7 @@ public void testRemoteFailureInlinestats() throws IOException { assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-2*")); + assertThat(remoteCluster.getOriginalIndices(), equalTo("logs-2*")); assertClusterInfoSkipped(remoteCluster); assertThat(remoteCluster.getFailures().getFirst().reason(), containsString("Accessing failing field")); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java index 913d88233d469..348ef10ebe948 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java @@ -31,13 +31,13 @@ protected boolean reuseClusters() { } protected void assertClusterInfoSkipped(EsqlExecutionInfo.Cluster clusterInfo, long overallTookMillis) { - assertThat(clusterInfo.getIndexExpression(), equalTo("logs-*")); + assertThat(clusterInfo.getOriginalIndices(), equalTo("logs-*")); assertThat(clusterInfo.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); super.assertClusterInfoSkipped(clusterInfo); } protected void assertClusterInfoSuccess(EsqlExecutionInfo.Cluster clusterInfo, int numShards, long overallTookMillis) { - assertThat(clusterInfo.getIndexExpression(), equalTo("logs-*")); + assertThat(clusterInfo.getOriginalIndices(), equalTo("logs-*")); assertThat(clusterInfo.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); super.assertClusterInfoSuccess(clusterInfo, numShards); } @@ -110,11 +110,11 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); assertClusterInfoSkipped(remote1Cluster); - assertThat(remote2Cluster.getIndexExpression(), equalTo("nomatch*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("nomatch*")); assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("nomatch*")); + assertThat(localCluster.getOriginalIndices(), equalTo("nomatch*")); // local cluster should never be marked as SKIPPED assertClusterInfoSuccess(localCluster, 0); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); @@ -228,11 +228,11 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrueInlineStats() EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); assertClusterInfoSkipped(remote1Cluster); - assertThat(remote2Cluster.getIndexExpression(), equalTo("nomatch*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("nomatch*")); assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("nomatch*")); + assertThat(localCluster.getOriginalIndices(), equalTo("nomatch*")); // local cluster should never be marked as SKIPPED assertClusterInfoSuccess(localCluster, 0); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index dddf7cbd9d5f2..adae6e6e8a410 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -51,7 +51,7 @@ protected boolean reuseClusters() { } protected void assertClusterMetadata(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression, Status status) { - assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); + assertThat(clusterMetatata.getOriginalIndices(), equalTo(indexExpression)); assertThat(clusterMetatata.getStatus(), equalTo(status)); assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/RemoteIndexResolutionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/RemoteIndexResolutionIT.java index 085f58d6c09b2..6ea044c92c7c2 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/RemoteIndexResolutionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/RemoteIndexResolutionIT.java @@ -212,7 +212,7 @@ private static List executionInfo(EsqlQueryResponse r .getClusters() .values() .stream() - .map(cluster -> new EsqlResponseExecutionInfo(cluster.getClusterAlias(), cluster.getIndexExpression(), cluster.getStatus())) + .map(cluster -> new EsqlResponseExecutionInfo(cluster.getClusterAlias(), cluster.getOriginalIndices(), cluster.getStatus())) .toList(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 3116978f67696..b86d0112fb8e0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -397,7 +397,10 @@ public static class Cluster implements ToXContentFragment, Writeable { public static final ParseField TOOK = new ParseField("took"); private final String clusterAlias; - private final String indexExpression; // original index expression from the user for this cluster + /** + * This cluster's indices as specified in the query. + */ + private final String originalIndices; private final boolean skipUnavailable; private final Cluster.Status status; private final Integer totalShards; @@ -423,8 +426,8 @@ public String toString() { } } - public Cluster(String clusterAlias, String indexExpression) { - this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null, null); + public Cluster(String clusterAlias, String originalIndices) { + this(clusterAlias, originalIndices, true, Cluster.Status.RUNNING, null, null, null, null, null, null); } /** @@ -432,28 +435,28 @@ public Cluster(String clusterAlias, String indexExpression) { * * @param clusterAlias clusterAlias as defined in the remote cluster settings or RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY * for the local cluster - * @param indexExpression the original (not resolved/concrete) indices expression provided for this cluster. + * @param originalIndices the original (not resolved/concrete) indices expression provided for this cluster. * @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings */ - public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable) { - this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null); + public Cluster(String clusterAlias, String originalIndices, boolean skipUnavailable) { + this(clusterAlias, originalIndices, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null); } /** * Create a Cluster with a new Status other than the default of RUNNING. * @param clusterAlias clusterAlias as defined in the remote cluster settings or RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY * for the local cluster - * @param indexExpression the original (not resolved/concrete) indices expression provided for this cluster. + * @param originalIndices the original (not resolved/concrete) indices expression provided for this cluster. * @param skipUnavailable whether cluster is marked as skip_unavailable in remote cluster settings * @param status current status of the search on this Cluster */ - public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable, Cluster.Status status) { - this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null, null); + public Cluster(String clusterAlias, String originalIndices, boolean skipUnavailable, Cluster.Status status) { + this(clusterAlias, originalIndices, skipUnavailable, status, null, null, null, null, null, null); } public Cluster( String clusterAlias, - String indexExpression, + String originalIndices, boolean skipUnavailable, Cluster.Status status, Integer totalShards, @@ -464,10 +467,10 @@ public Cluster( TimeValue took ) { assert clusterAlias != null : "clusterAlias cannot be null"; - assert indexExpression != null : "indexExpression of Cluster cannot be null"; + assert originalIndices != null : "indexExpression of Cluster cannot be null"; assert status != null : "status of Cluster cannot be null"; this.clusterAlias = clusterAlias; - this.indexExpression = indexExpression; + this.originalIndices = originalIndices; this.skipUnavailable = skipUnavailable; this.status = status; this.totalShards = totalShards; @@ -480,7 +483,7 @@ public Cluster( public Cluster(StreamInput in) throws IOException { this.clusterAlias = in.readString(); - this.indexExpression = in.readString(); + this.originalIndices = in.readString(); this.status = Cluster.Status.valueOf(in.readString().toUpperCase(Locale.ROOT)); this.totalShards = in.readOptionalInt(); this.successfulShards = in.readOptionalInt(); @@ -494,7 +497,7 @@ public Cluster(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(clusterAlias); - out.writeString(indexExpression); + out.writeString(originalIndices); out.writeString(status.toString()); out.writeOptionalInt(totalShards); out.writeOptionalInt(successfulShards); @@ -536,7 +539,7 @@ public Builder(Cluster copyFrom) { public Cluster build() { return new Cluster( original.getClusterAlias(), - original.getIndexExpression(), + original.getOriginalIndices(), original.isSkipUnavailable(), status != null ? status : original.getStatus(), totalShards != null ? totalShards : original.getTotalShards(), @@ -599,7 +602,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(name); { builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString()); - builder.field(INDICES_FIELD.getPreferredName(), indexExpression); + builder.field(INDICES_FIELD.getPreferredName(), originalIndices); if (took != null && status != Status.RUNNING) { builder.field(TOOK.getPreferredName(), took.millis()); } @@ -638,8 +641,8 @@ public String getClusterAlias() { return clusterAlias; } - public String getIndexExpression() { - return indexExpression; + public String getOriginalIndices() { + return originalIndices; } public boolean isSkipUnavailable() { @@ -684,7 +687,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Cluster cluster = (Cluster) o; return Objects.equals(clusterAlias, cluster.clusterAlias) - && Objects.equals(indexExpression, cluster.indexExpression) + && Objects.equals(originalIndices, cluster.originalIndices) && status == cluster.status && Objects.equals(totalShards, cluster.totalShards) && Objects.equals(successfulShards, cluster.successfulShards) @@ -695,7 +698,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(clusterAlias, indexExpression, status, totalShards, successfulShards, skippedShards, failedShards, took); + return Objects.hash(clusterAlias, originalIndices, status, totalShards, successfulShards, skippedShards, failedShards, took); } @Override @@ -717,7 +720,7 @@ public String toString() { + ", took=" + took + ", indexExpression='" - + indexExpression + + originalIndices + '\'' + ", skipUnavailable=" + skipUnavailable diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 2ea6c3c8f5ed3..5959da45ff616 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.planner; import org.elasticsearch.TransportVersion; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.aggregation.AggregatorMode; @@ -68,7 +67,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static java.util.Arrays.asList; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.EXTRACT_SPATIAL_BOUNDS; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.NONE; @@ -167,18 +165,6 @@ public static Set planConcreteIndices(PhysicalPlan plan) { return indices; } - /** - * Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters. - */ - public static String[] planOriginalIndices(PhysicalPlan plan) { - if (plan == null) { - return Strings.EMPTY_ARRAY; - } - var indices = new LinkedHashSet(); - forEachRelation(plan, relation -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(relation.indexPattern())))); - return indices.toArray(String[]::new); - } - public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) { return plan.anyMatch(e -> { if (e instanceof FragmentExec f) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ba49b683d9d1c..670f52df738f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -15,7 +15,9 @@ import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; @@ -368,8 +370,7 @@ public void executePlan( return; } } - Map clusterToOriginalIndices = transportService.getRemoteClusterService() - .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planOriginalIndices(physicalPlan)); + Map clusterToOriginalIndices = getOriginalIndices(execInfo); var localOriginalIndices = clusterToOriginalIndices.remove(LOCAL_CLUSTER); var localConcreteIndices = clusterToConcreteIndices.remove(LOCAL_CLUSTER); /* @@ -798,4 +799,17 @@ public String getDescription() { return "group [" + parentDescription.get() + "]"; } } + + /** + * Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters. + */ + private static Map getOriginalIndices(EsqlExecutionInfo executionInfo) { + return Maps.transformValues( + executionInfo.getClusters(), + cluster -> new OriginalIndices( + Strings.commaDelimitedListToStringArray(cluster.getOriginalIndices()), + SearchRequest.DEFAULT_INDICES_OPTIONS + ) + ); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index a9a5b354fd029..1b99db5edd0f9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -213,8 +213,8 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { - final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); - if (concreteIndexRequested(executionInfo.getCluster(c).getIndexExpression())) { + final String indexExpression = executionInfo.getCluster(c).getOriginalIndices(); + if (concreteIndexRequested(executionInfo.getCluster(c).getOriginalIndices())) { String error = Strings.format( "Unknown index [%s]", (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression) @@ -340,7 +340,7 @@ public static void initCrossClusterState( executionInfo.swapCluster(clusterAlias, (k, v) -> { var indexExpr = Strings.arrayToCommaDelimitedString(indices.indices()); if (v != null) { - indexExpr = v.getIndexExpression() + "," + indexExpr; + indexExpr = v.getOriginalIndices() + "," + indexExpr; } return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 4504e2fdec96b..37f2f199c56b3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -128,15 +128,15 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { assertNull(executionInfo.overallTook()); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getOriginalIndices(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); } @@ -179,15 +179,15 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { assertNull(executionInfo.overallTook()); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getOriginalIndices(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } } @@ -227,15 +227,15 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, Set.of(indexResolution)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getOriginalIndices(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } @@ -270,11 +270,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, Set.of(indexResolution)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getOriginalIndices(), equalTo("*")); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote1Cluster.getTotalShards(), equalTo(0)); @@ -283,7 +283,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertThat(remote1Cluster.getFailedShards(), equalTo(0)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } @@ -312,16 +312,16 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, Set.of(indexResolution)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getOriginalIndices(), equalTo("*")); // since remote1 is in the failures Map (passed to IndexResolution.valid), assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("mylogs1*,mylogs2*,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote2Cluster.getTotalShards(), equalTo(0)); @@ -353,7 +353,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, Set.of(indexResolution)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); @@ -402,16 +402,16 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, Set.of(indexResolution)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getOriginalIndices(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getOriginalIndices(), equalTo("*")); // skipped since remote1 is in the failures Map assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); + assertThat(remote2Cluster.getOriginalIndices(), equalTo("mylogs1*,mylogs2*,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); } } From 6d14dfc9f68e2914427579b515b666225c6d3602 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 12 Nov 2025 10:28:56 +0100 Subject: [PATCH 2/4] handle concrete indices --- .../referable/esql_concrete_indices.csv | 1 + .../resources/transport/upper_bounds/9.3.csv | 2 +- .../xpack/esql/action/EsqlExecutionInfo.java | 45 +++++++++++++++---- .../xpack/esql/planner/PlannerUtils.java | 13 ------ .../xpack/esql/plugin/ComputeService.java | 19 ++++---- .../xpack/esql/session/EsqlCCSUtils.java | 10 +++++ .../xpack/esql/session/EsqlSession.java | 1 + .../esql/action/EsqlQueryResponseTests.java | 3 ++ .../action/EsqlResponseListenerTests.java | 2 + 9 files changed, 65 insertions(+), 31 deletions(-) create mode 100644 server/src/main/resources/transport/definitions/referable/esql_concrete_indices.csv diff --git a/server/src/main/resources/transport/definitions/referable/esql_concrete_indices.csv b/server/src/main/resources/transport/definitions/referable/esql_concrete_indices.csv new file mode 100644 index 0000000000000..8d6255874d132 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/esql_concrete_indices.csv @@ -0,0 +1 @@ +9216000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 6b0edb76f268f..1f55311f30cb7 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -inference_api_eis_authorization_persistent_task,9215000 +esql_concrete_indices,9216000 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index b86d0112fb8e0..5c6eafb0cb111 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -64,6 +64,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial"); private static final TransportVersion ESQL_QUERY_PLANNING_DURATION = TransportVersion.fromName("esql_query_planning_duration"); + private static final TransportVersion CONCRETE_INDICES_VERSION = TransportVersion.fromName("esql_concrete_indices"); // Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query // The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search. @@ -398,9 +399,13 @@ public static class Cluster implements ToXContentFragment, Writeable { private final String clusterAlias; /** - * This cluster's indices as specified in the query. + * This cluster's indices as specified in the query. They may contain aliases, patterns, etc. */ private final String originalIndices; + /** + * This cluster's concrete/resolved indices. + */ + private final String concreteIndices; private final boolean skipUnavailable; private final Cluster.Status status; private final Integer totalShards; @@ -427,7 +432,7 @@ public String toString() { } public Cluster(String clusterAlias, String originalIndices) { - this(clusterAlias, originalIndices, true, Cluster.Status.RUNNING, null, null, null, null, null, null); + this(clusterAlias, originalIndices, null, true, Cluster.Status.RUNNING, null, null, null, null, null, null); } /** @@ -439,7 +444,7 @@ public Cluster(String clusterAlias, String originalIndices) { * @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings */ public Cluster(String clusterAlias, String originalIndices, boolean skipUnavailable) { - this(clusterAlias, originalIndices, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null); + this(clusterAlias, originalIndices, null, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null); } /** @@ -451,12 +456,13 @@ public Cluster(String clusterAlias, String originalIndices, boolean skipUnavaila * @param status current status of the search on this Cluster */ public Cluster(String clusterAlias, String originalIndices, boolean skipUnavailable, Cluster.Status status) { - this(clusterAlias, originalIndices, skipUnavailable, status, null, null, null, null, null, null); + this(clusterAlias, originalIndices, null, skipUnavailable, status, null, null, null, null, null, null); } public Cluster( String clusterAlias, String originalIndices, + String concreteIndices, boolean skipUnavailable, Cluster.Status status, Integer totalShards, @@ -471,6 +477,7 @@ public Cluster( assert status != null : "status of Cluster cannot be null"; this.clusterAlias = clusterAlias; this.originalIndices = originalIndices; + this.concreteIndices = concreteIndices; this.skipUnavailable = skipUnavailable; this.status = status; this.totalShards = totalShards; @@ -484,6 +491,7 @@ public Cluster( public Cluster(StreamInput in) throws IOException { this.clusterAlias = in.readString(); this.originalIndices = in.readString(); + this.concreteIndices = in.getTransportVersion().supports(CONCRETE_INDICES_VERSION) ? in.readString() : null; this.status = Cluster.Status.valueOf(in.readString().toUpperCase(Locale.ROOT)); this.totalShards = in.readOptionalInt(); this.successfulShards = in.readOptionalInt(); @@ -498,6 +506,9 @@ public Cluster(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(clusterAlias); out.writeString(originalIndices); + if (out.getTransportVersion().supports(CONCRETE_INDICES_VERSION)) { + out.writeString(concreteIndices); + } out.writeString(status.toString()); out.writeOptionalInt(totalShards); out.writeOptionalInt(successfulShards); @@ -508,6 +519,22 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(failures); } + public Cluster withConcreteIndices(String concreteIndices) { + return new Cluster( + clusterAlias, + originalIndices, + concreteIndices, + skipUnavailable, + status, + totalShards, + successfulShards, + skippedShards, + failedShards, + failures, + took + ); + } + /** * Since the Cluster object is immutable, use this Builder class to create * a new Cluster object using the "copyFrom" Cluster passed in and set only @@ -540,6 +567,7 @@ public Cluster build() { return new Cluster( original.getClusterAlias(), original.getOriginalIndices(), + original.getConcreteIndices(), original.isSkipUnavailable(), status != null ? status : original.getStatus(), totalShards != null ? totalShards : original.getTotalShards(), @@ -632,11 +660,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - @Override - public boolean isFragment() { - return ToXContentFragment.super.isFragment(); - } - public String getClusterAlias() { return clusterAlias; } @@ -645,6 +668,10 @@ public String getOriginalIndices() { return originalIndices; } + public String getConcreteIndices() { + return concreteIndices; + } + public boolean isSkipUnavailable() { return skipUnavailable; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 5959da45ff616..c825d92d254d8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -60,7 +60,6 @@ import org.elasticsearch.xpack.esql.stats.SearchStats; import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.function.Consumer; @@ -153,18 +152,6 @@ private static ReducedPlan getPhysicalPlanReduction(int estimatedRowSize, Physic return new ReducedPlan(EstimatesRowSize.estimateRowSize(estimatedRowSize, plan)); } - /** - * Returns a set of concrete indices after resolving the original indices specified in the FROM command. - */ - public static Set planConcreteIndices(PhysicalPlan plan) { - if (plan == null) { - return Set.of(); - } - var indices = new LinkedHashSet(); - forEachRelation(plan, relation -> indices.addAll(relation.concreteIndices())); - return indices; - } - public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) { return plan.anyMatch(e -> { if (e instanceof FragmentExec f) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 670f52df738f2..944666df27080 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -326,8 +326,10 @@ public void executePlan( listener.onFailure(new IllegalStateException("expected data node plan starts with an ExchangeSink; got " + dataNodePlan)); return; } - Map clusterToConcreteIndices = transportService.getRemoteClusterService() - .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new)); + + // Returns the concrete/resolved indices used in the FROM command of the query. + Map clusterToConcreteIndices = getIndices(execInfo, EsqlExecutionInfo.Cluster::getConcreteIndices); + QueryPragmas queryPragmas = configuration.pragmas(); Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask); if (dataNodePlan == null) { @@ -370,7 +372,8 @@ public void executePlan( return; } } - Map clusterToOriginalIndices = getOriginalIndices(execInfo); + // Gets the original indices specified in the FROM command of the query. We need the original query to resolve alias filters. + Map clusterToOriginalIndices = getIndices(execInfo, EsqlExecutionInfo.Cluster::getOriginalIndices); var localOriginalIndices = clusterToOriginalIndices.remove(LOCAL_CLUSTER); var localConcreteIndices = clusterToConcreteIndices.remove(LOCAL_CLUSTER); /* @@ -800,14 +803,14 @@ public String getDescription() { } } - /** - * Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters. - */ - private static Map getOriginalIndices(EsqlExecutionInfo executionInfo) { + private static Map getIndices( + EsqlExecutionInfo executionInfo, + Function getter + ) { return Maps.transformValues( executionInfo.getClusters(), cluster -> new OriginalIndices( - Strings.commaDelimitedListToStringArray(cluster.getOriginalIndices()), + Strings.commaDelimitedListToStringArray(getter.apply(cluster)), SearchRequest.DEFAULT_INDICES_OPTIONS ) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 1b99db5edd0f9..ba0ff2bd5ea17 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -41,6 +41,7 @@ import java.util.Objects; import java.util.Set; +import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toSet; @@ -165,6 +166,15 @@ static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExec .collect(joining(",")); } + static void updateExecutionInfoWithResolvedConcreteIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + indexResolution.resolvedIndices() + .stream() + .collect(groupingBy(RemoteClusterAware::parseClusterAlias, joining(","))) + .forEach((clusterAlias, indices) -> { + executionInfo.swapCluster(clusterAlias, (k, v) -> v.withConcreteIndices(indices)); + }); + } + static void updateExecutionInfoWithUnavailableClusters( EsqlExecutionInfo execInfo, Map> failures diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 4bde66c6a9842..f7420113c7852 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -854,6 +854,7 @@ private void preAnalyzeMainIndices( preAnalysis.useAggregateMetricDoubleWhenNotSupported(), preAnalysis.useDenseVectorWhenNotSupported(), listener.delegateFailureAndWrap((l, indexResolution) -> { + EsqlCCSUtils.updateExecutionInfoWithResolvedConcreteIndices(executionInfo, indexResolution.inner()); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures()); l.onResponse( result.withIndices(indexPattern, indexResolution.inner()) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index edac6b1c5b3eb..bf0bd4f4b85c8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -165,6 +165,7 @@ EsqlExecutionInfo createExecutionInfo() { (k, v) -> new EsqlExecutionInfo.Cluster( "", "logs-1", + "logs-1", false, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 10, @@ -180,6 +181,7 @@ EsqlExecutionInfo createExecutionInfo() { (k, v) -> new EsqlExecutionInfo.Cluster( "remote1", "remote1:logs-1", + "remote1:logs-1", true, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 12, @@ -564,6 +566,7 @@ private static EsqlExecutionInfo.Cluster parseCluster(String clusterAlias, XCont return new EsqlExecutionInfo.Cluster( clusterAlias, indexExpression, + indexExpression, true, EsqlExecutionInfo.Cluster.Status.valueOf(status.toUpperCase(Locale.ROOT)), totalShardsFinal, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java index c96af8b00ab99..cdd06ee5908ee 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java @@ -64,6 +64,7 @@ public void testLogPartialFailures() { (k, v) -> new EsqlExecutionInfo.Cluster( LOCAL_CLUSTER_ALIAS, "idx", + "idx", false, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 10, @@ -97,6 +98,7 @@ public void testLogPartialFailuresRemote() { (k, v) -> new EsqlExecutionInfo.Cluster( "remote_cluster", "idx", + "idx", false, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 10, From de654098cf1424389ec6bce209a15b3d70288cbc Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 12 Nov 2025 11:20:16 +0100 Subject: [PATCH 3/4] fix index grouping --- .../elasticsearch/xpack/esql/plugin/IndexResolutionIT.java | 7 ++++--- .../org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java | 6 +++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java index 6ba92a4cf47f2..bbd7d3296ac4d 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java @@ -232,9 +232,10 @@ public void testPartialResolution() { assertAcked(client().admin().indices().prepareCreate("index-2")); indexRandom(true, "index-2", 1); - try (var response = run(syncEsqlQueryRequest().query("FROM index-1,nonexisting-1"))) { - assertOk(response); // okay when present index is empty - } + // TODO + // try (var response = run(syncEsqlQueryRequest().query("FROM index-1,nonexisting-1"))) { + // assertOk(response); // okay when present index is empty + // } expectThrows( IndexNotFoundException.class, equalTo("no such index [nonexisting-1]"), // fails when present index is non-empty diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index ba0ff2bd5ea17..31e1ea8843d25 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -43,6 +43,7 @@ import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toSet; public class EsqlCCSUtils { @@ -169,7 +170,10 @@ static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExec static void updateExecutionInfoWithResolvedConcreteIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { indexResolution.resolvedIndices() .stream() - .collect(groupingBy(RemoteClusterAware::parseClusterAlias, joining(","))) + .map(RemoteClusterAware::splitIndexName) + .collect( + groupingBy(it -> it[0] == null ? RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY : it[0], mapping(it -> it[1], joining(","))) + ) .forEach((clusterAlias, indices) -> { executionInfo.swapCluster(clusterAlias, (k, v) -> v.withConcreteIndices(indices)); }); From 9660bb1ce0ee6e9add6a0bba31c28706d50f0069 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 13 Nov 2025 15:18:22 +0100 Subject: [PATCH 4/4] handle selection from empty --- .../esql/action/QueryExecutionMetadataIT.java | 2 +- .../xpack/esql/plugin/IndexResolutionIT.java | 25 ++++++++++++------- .../xpack/esql/plugin/ComputeService.java | 20 ++++++--------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/QueryExecutionMetadataIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/QueryExecutionMetadataIT.java index 6e0594e5c1f08..96acfa77b98b4 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/QueryExecutionMetadataIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/QueryExecutionMetadataIT.java @@ -29,7 +29,7 @@ protected Map skipUnavailableForRemoteClusters() { } protected void assertClusterInfoSuccess(EsqlExecutionInfo.Cluster clusterInfo, int numShards, long overallTookMillis) { - assertThat(clusterInfo.getIndexExpression(), equalTo("logs-*")); + assertThat(clusterInfo.getOriginalIndices(), equalTo("logs-*")); assertThat(clusterInfo.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); super.assertClusterInfoSuccess(clusterInfo, numShards); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java index bbd7d3296ac4d..d1f2aa83a08e5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/IndexResolutionIT.java @@ -229,28 +229,35 @@ public void testUnavailableIndex() { public void testPartialResolution() { assertAcked(client().admin().indices().prepareCreate("index-1")); - assertAcked(client().admin().indices().prepareCreate("index-2")); - indexRandom(true, "index-2", 1); + indexRandom(true, "index-1", 1); - // TODO - // try (var response = run(syncEsqlQueryRequest().query("FROM index-1,nonexisting-1"))) { - // assertOk(response); // okay when present index is empty - // } expectThrows( IndexNotFoundException.class, equalTo("no such index [nonexisting-1]"), // fails when present index is non-empty - () -> run(syncEsqlQueryRequest().query("FROM index-2,nonexisting-1")) + () -> run(syncEsqlQueryRequest().query("FROM index-1,nonexisting-1")) ); expectThrows( IndexNotFoundException.class, equalTo("no such index [nonexisting-1]"), // fails when present index is non-empty even if allow_partial=true - () -> run(syncEsqlQueryRequest().query("FROM index-2,nonexisting-1").allowPartialResults(true)) + () -> run(syncEsqlQueryRequest().query("FROM index-1,nonexisting-1").allowPartialResults(true)) ); expectThrows( IndexNotFoundException.class, equalTo("no such index [nonexisting-1]"), // only the first missing index is reported - () -> run(syncEsqlQueryRequest().query("FROM index-2,nonexisting-1,nonexisting-2")) + () -> run(syncEsqlQueryRequest().query("FROM index-1,nonexisting-1,nonexisting-2")) ); + + assertAcked(client().admin().indices().prepareCreate("index-2").setMapping("field", "type=keyword")); + expectThrows( + IndexNotFoundException.class, + equalTo("no such index [nonexisting-1]"), // fails when present index has no documents and non-empty mapping + () -> run(syncEsqlQueryRequest().query("FROM index-2,nonexisting-1")) + ); + + assertAcked(client().admin().indices().prepareCreate("index-3")); + try (var response = run(syncEsqlQueryRequest().query("FROM index-3,nonexisting-1"))) { + assertOk(response); // passes when the present index has no fields and no documents + } } public void testResolutionWithFilter() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 88282d04a909c..29ba7b508cf4c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -334,12 +334,6 @@ public void executePlan( QueryPragmas queryPragmas = configuration.pragmas(); Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask); if (dataNodePlan == null) { - if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0) == false) { - String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices; - assert false : error; - listener.onFailure(new IllegalStateException(error)); - return; - } var computeContext = new ComputeContext( newChildSession(sessionId), profileDescription(profileQualifier, "single"), @@ -365,13 +359,13 @@ public void executePlan( runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute()); return; } - } else { - if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) { - var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan; - assert false : error; - listener.onFailure(new IllegalStateException(error)); - return; - } + } + + if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) { + var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan; + assert false : error; + listener.onFailure(new IllegalStateException(error)); + return; } // Gets the original indices specified in the FROM command of the query. We need the original query to resolve alias filters. Map clusterToOriginalIndices = getIndices(execInfo, EsqlExecutionInfo.Cluster::getOriginalIndices);