Skip to content

Commit 6d14dfc

Browse files
committed
handle concrete indices
1 parent 74517fb commit 6d14dfc

File tree

9 files changed

+65
-31
lines changed

9 files changed

+65
-31
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9216000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
inference_api_eis_authorization_persistent_task,9215000
1+
esql_concrete_indices,9216000

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
6464
public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial");
6565

6666
private static final TransportVersion ESQL_QUERY_PLANNING_DURATION = TransportVersion.fromName("esql_query_planning_duration");
67+
private static final TransportVersion CONCRETE_INDICES_VERSION = TransportVersion.fromName("esql_concrete_indices");
6768

6869
// Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
6970
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
@@ -398,9 +399,13 @@ public static class Cluster implements ToXContentFragment, Writeable {
398399

399400
private final String clusterAlias;
400401
/**
401-
* This cluster's indices as specified in the query.
402+
* This cluster's indices as specified in the query. They may contain aliases, patterns, etc.
402403
*/
403404
private final String originalIndices;
405+
/**
406+
* This cluster's concrete/resolved indices.
407+
*/
408+
private final String concreteIndices;
404409
private final boolean skipUnavailable;
405410
private final Cluster.Status status;
406411
private final Integer totalShards;
@@ -427,7 +432,7 @@ public String toString() {
427432
}
428433

429434
public Cluster(String clusterAlias, String originalIndices) {
430-
this(clusterAlias, originalIndices, true, Cluster.Status.RUNNING, null, null, null, null, null, null);
435+
this(clusterAlias, originalIndices, null, true, Cluster.Status.RUNNING, null, null, null, null, null, null);
431436
}
432437

433438
/**
@@ -439,7 +444,7 @@ public Cluster(String clusterAlias, String originalIndices) {
439444
* @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings
440445
*/
441446
public Cluster(String clusterAlias, String originalIndices, boolean skipUnavailable) {
442-
this(clusterAlias, originalIndices, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null);
447+
this(clusterAlias, originalIndices, null, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null);
443448
}
444449

445450
/**
@@ -451,12 +456,13 @@ public Cluster(String clusterAlias, String originalIndices, boolean skipUnavaila
451456
* @param status current status of the search on this Cluster
452457
*/
453458
public Cluster(String clusterAlias, String originalIndices, boolean skipUnavailable, Cluster.Status status) {
454-
this(clusterAlias, originalIndices, skipUnavailable, status, null, null, null, null, null, null);
459+
this(clusterAlias, originalIndices, null, skipUnavailable, status, null, null, null, null, null, null);
455460
}
456461

457462
public Cluster(
458463
String clusterAlias,
459464
String originalIndices,
465+
String concreteIndices,
460466
boolean skipUnavailable,
461467
Cluster.Status status,
462468
Integer totalShards,
@@ -471,6 +477,7 @@ public Cluster(
471477
assert status != null : "status of Cluster cannot be null";
472478
this.clusterAlias = clusterAlias;
473479
this.originalIndices = originalIndices;
480+
this.concreteIndices = concreteIndices;
474481
this.skipUnavailable = skipUnavailable;
475482
this.status = status;
476483
this.totalShards = totalShards;
@@ -484,6 +491,7 @@ public Cluster(
484491
public Cluster(StreamInput in) throws IOException {
485492
this.clusterAlias = in.readString();
486493
this.originalIndices = in.readString();
494+
this.concreteIndices = in.getTransportVersion().supports(CONCRETE_INDICES_VERSION) ? in.readString() : null;
487495
this.status = Cluster.Status.valueOf(in.readString().toUpperCase(Locale.ROOT));
488496
this.totalShards = in.readOptionalInt();
489497
this.successfulShards = in.readOptionalInt();
@@ -498,6 +506,9 @@ public Cluster(StreamInput in) throws IOException {
498506
public void writeTo(StreamOutput out) throws IOException {
499507
out.writeString(clusterAlias);
500508
out.writeString(originalIndices);
509+
if (out.getTransportVersion().supports(CONCRETE_INDICES_VERSION)) {
510+
out.writeString(concreteIndices);
511+
}
501512
out.writeString(status.toString());
502513
out.writeOptionalInt(totalShards);
503514
out.writeOptionalInt(successfulShards);
@@ -508,6 +519,22 @@ public void writeTo(StreamOutput out) throws IOException {
508519
out.writeCollection(failures);
509520
}
510521

522+
public Cluster withConcreteIndices(String concreteIndices) {
523+
return new Cluster(
524+
clusterAlias,
525+
originalIndices,
526+
concreteIndices,
527+
skipUnavailable,
528+
status,
529+
totalShards,
530+
successfulShards,
531+
skippedShards,
532+
failedShards,
533+
failures,
534+
took
535+
);
536+
}
537+
511538
/**
512539
* Since the Cluster object is immutable, use this Builder class to create
513540
* a new Cluster object using the "copyFrom" Cluster passed in and set only
@@ -540,6 +567,7 @@ public Cluster build() {
540567
return new Cluster(
541568
original.getClusterAlias(),
542569
original.getOriginalIndices(),
570+
original.getConcreteIndices(),
543571
original.isSkipUnavailable(),
544572
status != null ? status : original.getStatus(),
545573
totalShards != null ? totalShards : original.getTotalShards(),
@@ -632,11 +660,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
632660
return builder;
633661
}
634662

635-
@Override
636-
public boolean isFragment() {
637-
return ToXContentFragment.super.isFragment();
638-
}
639-
640663
public String getClusterAlias() {
641664
return clusterAlias;
642665
}
@@ -645,6 +668,10 @@ public String getOriginalIndices() {
645668
return originalIndices;
646669
}
647670

671+
public String getConcreteIndices() {
672+
return concreteIndices;
673+
}
674+
648675
public boolean isSkipUnavailable() {
649676
return skipUnavailable;
650677
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.elasticsearch.xpack.esql.stats.SearchStats;
6161

6262
import java.util.ArrayList;
63-
import java.util.LinkedHashSet;
6463
import java.util.List;
6564
import java.util.Set;
6665
import java.util.function.Consumer;
@@ -153,18 +152,6 @@ private static ReducedPlan getPhysicalPlanReduction(int estimatedRowSize, Physic
153152
return new ReducedPlan(EstimatesRowSize.estimateRowSize(estimatedRowSize, plan));
154153
}
155154

156-
/**
157-
* Returns a set of concrete indices after resolving the original indices specified in the FROM command.
158-
*/
159-
public static Set<String> planConcreteIndices(PhysicalPlan plan) {
160-
if (plan == null) {
161-
return Set.of();
162-
}
163-
var indices = new LinkedHashSet<String>();
164-
forEachRelation(plan, relation -> indices.addAll(relation.concreteIndices()));
165-
return indices;
166-
}
167-
168155
public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) {
169156
return plan.anyMatch(e -> {
170157
if (e instanceof FragmentExec f) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,10 @@ public void executePlan(
326326
listener.onFailure(new IllegalStateException("expected data node plan starts with an ExchangeSink; got " + dataNodePlan));
327327
return;
328328
}
329-
Map<String, OriginalIndices> clusterToConcreteIndices = transportService.getRemoteClusterService()
330-
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
329+
330+
// Returns the concrete/resolved indices used in the FROM command of the query.
331+
Map<String, OriginalIndices> clusterToConcreteIndices = getIndices(execInfo, EsqlExecutionInfo.Cluster::getConcreteIndices);
332+
331333
QueryPragmas queryPragmas = configuration.pragmas();
332334
Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);
333335
if (dataNodePlan == null) {
@@ -370,7 +372,8 @@ public void executePlan(
370372
return;
371373
}
372374
}
373-
Map<String, OriginalIndices> clusterToOriginalIndices = getOriginalIndices(execInfo);
375+
// Gets the original indices specified in the FROM command of the query. We need the original query to resolve alias filters.
376+
Map<String, OriginalIndices> clusterToOriginalIndices = getIndices(execInfo, EsqlExecutionInfo.Cluster::getOriginalIndices);
374377
var localOriginalIndices = clusterToOriginalIndices.remove(LOCAL_CLUSTER);
375378
var localConcreteIndices = clusterToConcreteIndices.remove(LOCAL_CLUSTER);
376379
/*
@@ -800,14 +803,14 @@ public String getDescription() {
800803
}
801804
}
802805

803-
/**
804-
* Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters.
805-
*/
806-
private static Map<String, OriginalIndices> getOriginalIndices(EsqlExecutionInfo executionInfo) {
806+
private static Map<String, OriginalIndices> getIndices(
807+
EsqlExecutionInfo executionInfo,
808+
Function<EsqlExecutionInfo.Cluster, String> getter
809+
) {
807810
return Maps.transformValues(
808811
executionInfo.getClusters(),
809812
cluster -> new OriginalIndices(
810-
Strings.commaDelimitedListToStringArray(cluster.getOriginalIndices()),
813+
Strings.commaDelimitedListToStringArray(getter.apply(cluster)),
811814
SearchRequest.DEFAULT_INDICES_OPTIONS
812815
)
813816
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Objects;
4242
import java.util.Set;
4343

44+
import static java.util.stream.Collectors.groupingBy;
4445
import static java.util.stream.Collectors.joining;
4546
import static java.util.stream.Collectors.toSet;
4647

@@ -165,6 +166,15 @@ static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExec
165166
.collect(joining(","));
166167
}
167168

169+
static void updateExecutionInfoWithResolvedConcreteIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
170+
indexResolution.resolvedIndices()
171+
.stream()
172+
.collect(groupingBy(RemoteClusterAware::parseClusterAlias, joining(",")))
173+
.forEach((clusterAlias, indices) -> {
174+
executionInfo.swapCluster(clusterAlias, (k, v) -> v.withConcreteIndices(indices));
175+
});
176+
}
177+
168178
static void updateExecutionInfoWithUnavailableClusters(
169179
EsqlExecutionInfo execInfo,
170180
Map<String, List<FieldCapabilitiesFailure>> failures

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,7 @@ private void preAnalyzeMainIndices(
854854
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
855855
preAnalysis.useDenseVectorWhenNotSupported(),
856856
listener.delegateFailureAndWrap((l, indexResolution) -> {
857+
EsqlCCSUtils.updateExecutionInfoWithResolvedConcreteIndices(executionInfo, indexResolution.inner());
857858
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
858859
l.onResponse(
859860
result.withIndices(indexPattern, indexResolution.inner())

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ EsqlExecutionInfo createExecutionInfo() {
165165
(k, v) -> new EsqlExecutionInfo.Cluster(
166166
"",
167167
"logs-1",
168+
"logs-1",
168169
false,
169170
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
170171
10,
@@ -180,6 +181,7 @@ EsqlExecutionInfo createExecutionInfo() {
180181
(k, v) -> new EsqlExecutionInfo.Cluster(
181182
"remote1",
182183
"remote1:logs-1",
184+
"remote1:logs-1",
183185
true,
184186
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
185187
12,
@@ -564,6 +566,7 @@ private static EsqlExecutionInfo.Cluster parseCluster(String clusterAlias, XCont
564566
return new EsqlExecutionInfo.Cluster(
565567
clusterAlias,
566568
indexExpression,
569+
indexExpression,
567570
true,
568571
EsqlExecutionInfo.Cluster.Status.valueOf(status.toUpperCase(Locale.ROOT)),
569572
totalShardsFinal,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public void testLogPartialFailures() {
6464
(k, v) -> new EsqlExecutionInfo.Cluster(
6565
LOCAL_CLUSTER_ALIAS,
6666
"idx",
67+
"idx",
6768
false,
6869
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
6970
10,
@@ -97,6 +98,7 @@ public void testLogPartialFailuresRemote() {
9798
(k, v) -> new EsqlExecutionInfo.Cluster(
9899
"remote_cluster",
99100
"idx",
101+
"idx",
100102
false,
101103
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
102104
10,

0 commit comments

Comments
 (0)