Skip to content

Commit 935bf0a

Browse files
committed
Cleanups
1 parent cdd05a1 commit 935bf0a

File tree

7 files changed

+48
-11
lines changed

7 files changed

+48
-11
lines changed

docs/changelog/129013.yaml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
pr: 129013
2-
summary: "WIP: Remote lookup join implementation"
2+
summary: "Add remote index support to LOOKUP JOIN"
33
area: ES|QL
4-
type: enhancement
5-
issues: []
4+
type: feature
5+
issues: [ ]
6+
highlight:
7+
title: Add remote index support to LOOKUP JOIN
8+
body: |-
9+
Queries containing LOOKUP JOIN can now be performed on cross-cluster indices, for example:
10+
```
11+
FROM logs-*, remote:logs-* | LOOKUP JOIN clients on ip | SORT timestamp | LIMIT 100
12+
```

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List<Attri
3737
this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), isRemote);
3838
}
3939

40+
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List<Attribute> joinFields) {
41+
this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), false);
42+
}
43+
4044
public LookupJoin(
4145
Source source,
4246
LogicalPlan left,
@@ -50,6 +54,10 @@ public LookupJoin(
5054
this(source, left, right, new JoinConfig(type, joinFields, leftFields, rightFields), isRemote);
5155
}
5256

57+
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig) {
58+
this(source, left, right, joinConfig, false);
59+
}
60+
5361
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig, boolean isRemote) {
5462
super(source, left, right, joinConfig, isRemote);
5563
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,9 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
736736
throw new IllegalArgumentException("can't plan [" + join + "]");
737737
}
738738

739+
// After enabling remote joins, we can have one of the two situations here:
740+
// 1. We've just got one entry - this should be the one relevant to the join, and it should be for this cluster
741+
// 2. We have got multiple entries - this means each cluster has its own entry, and we should extract the one relevant to this cluster
739742
Map.Entry<String, IndexMode> entry;
740743
if (localSourceExec.indexNameWithModes().size() == 1) {
741744
entry = localSourceExec.indexNameWithModes().entrySet().iterator().next();
@@ -756,6 +759,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
756759
throw new IllegalArgumentException("can't plan [" + join + "], found index with mode [" + entry.getValue() + "]");
757760
}
758761
String[] indexSplit = RemoteClusterAware.splitIndexName(entry.getKey());
762+
// No prefix is ok, prefix with this cluster is ok, something else is not
759763
if (indexSplit[0] != null && clusterAlias.equals(indexSplit[0]) == false) {
760764
throw new IllegalArgumentException(
761765
"can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias)

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ private void preAnalyzeLookupIndex(
436436
if (executionInfo.getClusters().isEmpty()) {
437437
patternWithRemotes = localPattern;
438438
} else {
439+
// convert index -> cluster1:index,cluster2:index, etc. for each running cluster
439440
patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING)
440441
.map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern))
441442
.collect(Collectors.joining(","));
@@ -462,6 +463,10 @@ private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo execution
462463
}
463464
}
464465

466+
/**
467+
* Receive and process lookup index resolutions from resolveAsMergedMapping.
468+
* This processes the lookup index data for a single index, then updates and returns the {@link PreAnalysisResult}.
469+
*/
465470
private PreAnalysisResult receiveLookupIndexResolution(
466471
PreAnalysisResult result,
467472
String index,
@@ -474,9 +479,9 @@ private PreAnalysisResult receiveLookupIndexResolution(
474479
return result.addLookupIndexResolution(index, newIndexResolution);
475480
}
476481
if (executionInfo.getClusters().isEmpty() || executionInfo.isCrossClusterSearch() == false) {
477-
// Local only case, still do some checks
482+
// Local only case, still do some checks, since we moved analysis checks here
478483
if (newIndexResolution.get().indexNameWithModes().size() > 1) {
479-
throw new VerificationException("multiple resolutions for lookup index [" + index + "] in local cluster");
484+
throw new VerificationException("multiple resolutions for lookup index [" + index + "]");
480485
}
481486
var indexMode = newIndexResolution.get().indexNameWithModes().entrySet().iterator().next().getValue();
482487
if (indexMode != IndexMode.LOOKUP) {
@@ -495,7 +500,12 @@ private PreAnalysisResult receiveLookupIndexResolution(
495500
skipClusterOrError(
496501
clusterAlias,
497502
executionInfo,
498-
"invalid [" + indexName + "] resolution in lookup mode to an index in [" + indexMode + "] mode"
503+
"invalid ["
504+
+ indexName
505+
+ "] resolution in lookup mode to an index in ["
506+
+ indexMode
507+
+ "] mode "
508+
+ EsqlCCSUtils.inClusterName(clusterAlias)
499509
);
500510
}
501511
// Each cluster should have only one resolution for the lookup index
@@ -526,6 +536,7 @@ private PreAnalysisResult receiveLookupIndexResolution(
526536
});
527537

528538
// If all indices resolve to the same name, we can use that for BWC
539+
// Older clusters can only handle one name in LOOKUP JOIN
529540
var indexNames = clustersWithResolvedIndices.values()
530541
.stream()
531542
.map(n -> RemoteClusterAware.splitIndexName(n)[1])
@@ -548,6 +559,12 @@ private PreAnalysisResult receiveLookupIndexResolution(
548559

549560
}
550561

562+
/**
563+
* Older clusters can only handle one name in LOOKUP JOIN - verify that all the remotes involved
564+
* are recent enough to be able to handle multiple indices.
565+
* This is only checked if there are actually multiple indices, which happens when remotes have different
566+
* concrete indices aliased to the same index name.
567+
*/
551568
private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
552569
Stream<EsqlExecutionInfo.Cluster> clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING);
553570
clusters.forEach(cluster -> {
@@ -559,9 +576,11 @@ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
559576
skipClusterOrError(
560577
clusterAlias,
561578
executionInfo,
562-
"remote cluster version ["
579+
"remote cluster ["
580+
+ clusterAlias
581+
+ "] has version ["
563582
+ connection.getTransportVersion()
564-
+ "] does not support multiple indices in LOOKUP JOIN, skipping"
583+
+ "] that does not support multiple indices in LOOKUP JOIN, skipping"
565584
);
566585
}
567586
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1244,7 +1244,7 @@ public void testPushdownLimitsPastLeftJoin() {
12441244
var joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), List.of());
12451245
var join = switch (randomIntBetween(0, 2)) {
12461246
case 0 -> new Join(EMPTY, leftChild, rightChild, joinConfig);
1247-
case 1 -> new LookupJoin(EMPTY, leftChild, rightChild, joinConfig, false);
1247+
case 1 -> new LookupJoin(EMPTY, leftChild, rightChild, joinConfig);
12481248
case 2 -> new InlineJoin(EMPTY, leftChild, rightChild, joinConfig);
12491249
default -> throw new IllegalArgumentException();
12501250
};

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2898,7 +2898,6 @@ public void testVerifierOnMissingReferences() {
28982898
assertThat(e.getMessage(), containsString(" > 10[INTEGER]]] optimized incorrectly due to missing references [emp_no{f}#"));
28992899
}
29002900

2901-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/118531")
29022901
public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception {
29032902
assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
29042903

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private static LogicalPlan createInstance(Class<? extends LogicalPlan> clazz, Lo
164164
return new Sample(source, null, child);
165165
}
166166
case "LookupJoin" -> {
167-
return new LookupJoin(source, child, child, List.of(), false);
167+
return new LookupJoin(source, child, child, List.of());
168168
}
169169
case "Limit" -> {
170170
return new Limit(source, null, child);

0 commit comments

Comments
 (0)