Skip to content

Commit a1c97d2

Browse files
committed
more fixes
1 parent 8aa7e8e commit a1c97d2

File tree

7 files changed

+65
-48
lines changed

7 files changed

+65
-48
lines changed

.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
303303
onlyRemotes = true;
304304
}
305305
final String remoteIndices;
306-
if (canUseRemoteIndicesOnly() && randomBoolean()) {
306+
if (onlyRemotes) {
307307
remoteIndices = Arrays.stream(localIndices)
308308
.map(index -> unquoteAndRequoteAsRemote(index.trim(), true))
309309
.collect(Collectors.joining(","));
@@ -319,7 +319,7 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
319319
String[] parts = commands[0].split("\\s+");
320320
assert parts.length >= 2 : commands[0];
321321
String[] indices = parts[1].split(",");
322-
if (canUseRemoteIndicesOnly() && randomBoolean()) {
322+
if (onlyRemotes) {
323323
parts[1] = Arrays.stream(indices)
324324
.map(index -> unquoteAndRequoteAsRemote(index.trim(), true))
325325
.collect(Collectors.joining(","));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.plan.logical.join;
99

10-
import org.elasticsearch.index.IndexMode;
1110
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
1211
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
1312
import org.elasticsearch.xpack.esql.common.Failures;
@@ -16,7 +15,6 @@
1615
import org.elasticsearch.xpack.esql.core.tree.Source;
1716
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1817
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
19-
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
2018
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2119
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
2220
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
@@ -92,42 +90,15 @@ public String telemetryLabel() {
9290
public void postAnalysisVerification(Failures failures) {
9391
super.postAnalysisVerification(failures);
9492
if (isRemote) {
95-
checkRemoteJoin(this, failures);
93+
checkRemoteJoin(failures);
9694
}
97-
// TODO: this is probably not necessary anymore as we check it in analysis stage?
98-
// right().forEachDown(EsRelation.class, esr -> {
99-
// var indexNameWithModes = esr.indexNameWithModes();
100-
// if (indexNameWithModes.size() != 1) {
101-
// failures.add(
102-
// fail(
103-
// esr,
104-
// "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] indices",
105-
// esr.indexPattern(),
106-
// indexNameWithModes.size()
107-
// )
108-
// );
109-
// return;
110-
// }
111-
// var indexAndMode = indexNameWithModes.entrySet().iterator().next();
112-
// if (indexAndMode.getValue() != IndexMode.LOOKUP) {
113-
// failures.add(
114-
// fail(
115-
// esr,
116-
// "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] in [{}] mode",
117-
// esr.indexPattern(),
118-
// indexAndMode.getKey(),
119-
// indexAndMode.getValue()
120-
// )
121-
// );
122-
// }
123-
// });
12495
}
12596

126-
private static void checkRemoteJoin(LogicalPlan plan, Failures failures) {
97+
private void checkRemoteJoin(Failures failures) {
12798
boolean[] agg = { false };
12899
boolean[] enrichCoord = { false };
129100

130-
plan.forEachUp(UnaryPlan.class, u -> {
101+
this.forEachUp(UnaryPlan.class, u -> {
131102
if (u instanceof Aggregate) {
132103
agg[0] = true;
133104
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -723,23 +723,32 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
723723
throw new IllegalArgumentException("can't plan [" + join + "]");
724724
}
725725

726-
var maybeEntry = localSourceExec.indexNameWithModes()
727-
.entrySet()
728-
.stream()
729-
.filter(e -> RemoteClusterAware.parseClusterAlias(e.getKey()).equals(clusterAlias))
730-
.findFirst();
731-
732-
if (maybeEntry.isEmpty()) {
733-
throw new IllegalArgumentException(
734-
"can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias)
726+
Map.Entry<String, IndexMode> entry;
727+
if (localSourceExec.indexNameWithModes().size() == 1) {
728+
entry = localSourceExec.indexNameWithModes().entrySet().iterator().next();
729+
} else {
730+
var maybeEntry = localSourceExec.indexNameWithModes()
731+
.entrySet()
732+
.stream()
733+
.filter(e -> RemoteClusterAware.parseClusterAlias(e.getKey()).equals(clusterAlias))
734+
.findFirst();
735+
entry = maybeEntry.orElseThrow(
736+
() -> new IllegalArgumentException(
737+
"can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias)
738+
)
735739
);
736740
}
737-
var entry = maybeEntry.get();
738741

739742
if (entry.getValue() != IndexMode.LOOKUP) {
740743
throw new IllegalArgumentException("can't plan [" + join + "], found index with mode [" + entry.getValue() + "]");
741744
}
742-
String indexName = RemoteClusterAware.splitIndexName(entry.getKey())[1];
745+
String[] indexSplit = RemoteClusterAware.splitIndexName(entry.getKey());
746+
if (indexSplit[0] != null && clusterAlias.equals(indexSplit[0]) == false) {
747+
throw new IllegalArgumentException(
748+
"can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias)
749+
);
750+
}
751+
String indexName = indexSplit[1];
743752
if (join.leftFields().size() != join.rightFields().size()) {
744753
throw new IllegalArgumentException("can't plan [" + join + "]: mismatching left and right field count");
745754
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,13 @@ private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child)
207207
// Once we reached FragmentExec, we stuff our Enrich under it
208208
if (f instanceof FragmentExec) {
209209
hasFragment.set(true);
210-
return new FragmentExec(logical);
210+
// FIXME: hack to remove duplicate limits. This is probably not the right way to do it.
211+
return new FragmentExec(logical.transformUp(Limit.class, l -> {
212+
if (l.duplicated()) {
213+
return l.child();
214+
}
215+
return l;
216+
}));
211217
}
212218
if (f instanceof EnrichExec enrichExec) {
213219
// It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
@@ -221,6 +227,9 @@ private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child)
221227
return unaryExec.child();
222228
}
223229
}
230+
if (f instanceof LookupJoinExec lj) {
231+
return lj.right();
232+
}
224233
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
225234
return f;
226235
});

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,8 +440,10 @@ private PreAnalysisResult receiveLookupIndexResolution(
440440
IndexResolution newIndexResolution
441441
) {
442442
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, newIndexResolution.unavailableClusters());
443-
if (newIndexResolution.isValid() == false || executionInfo.getClusters().isEmpty()) {
444-
// If the index resolution is invalid, don't bother with the rest of the analysis
443+
if (newIndexResolution.isValid() == false
444+
|| executionInfo.getClusters().isEmpty()
445+
|| executionInfo.isCrossClusterSearch() == false) {
446+
// If the index resolution is invalid, or we're not dealing with CCS, don't bother with the rest of the analysis
445447
return result.addLookupIndexResolution(index, newIndexResolution);
446448
}
447449
// Collect resolved clusters from the index resolution, verify that each cluster has a single resolution for the lookup index
@@ -483,6 +485,24 @@ private PreAnalysisResult receiveLookupIndexResolution(
483485
}
484486
});
485487

488+
// If all indices resolve to the same name, we can use that for BWC
489+
var indexNames = clustersWithResolvedIndices.values()
490+
.stream()
491+
.map(n -> RemoteClusterAware.splitIndexName(n)[1])
492+
.collect(Collectors.toSet());
493+
if (indexNames.size() == 1) {
494+
String indexName = indexNames.iterator().next();
495+
var newIndex = new EsIndex(index, newIndexResolution.get().mapping(), Map.of(indexName, IndexMode.LOOKUP));
496+
newIndexResolution = IndexResolution.valid(
497+
newIndex,
498+
newIndex.concreteIndices(),
499+
newIndexResolution.getUnavailableShards(),
500+
newIndexResolution.unavailableClusters()
501+
);
502+
} else {
503+
// TODO: validate remotes to be able to handle multiple indices in LOOKUP JOIN
504+
}
505+
486506
return result.addLookupIndexResolution(index, newIndexResolution);
487507

488508
}

0 commit comments

Comments
 (0)