Skip to content

Commit 59d98b2

Browse files
committed
more fixes
1 parent d99bd56 commit 59d98b2

File tree

8 files changed

+54
-40
lines changed

8 files changed

+54
-40
lines changed

.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,20 @@ public void testLookupJoinAcrossClusters() throws IOException {
7070
assertCCSExecutionInfoDetails(executionInfo);
7171
}
7272

73-
// populateLookupIndex(LOCAL_CLUSTER, "values_lookup2", 5);
74-
// populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup2", 5);
73+
populateLookupIndex(LOCAL_CLUSTER, "values_lookup2", 5);
74+
populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup2", 5);
7575
// FIXME: this currently does not work
76-
// try (
77-
// EsqlQueryResponse resp = runQuery(
78-
// "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key | LOOKUP JOIN values_lookup2 ON
79-
// lookup_tag",
80-
// randomBoolean()
81-
// )
82-
// ) {
83-
// List<List<Object>> values = getValuesList(resp);
84-
// assertThat(values, hasSize(20));
85-
//
86-
// }
76+
try (
77+
EsqlQueryResponse resp = runQuery(
78+
"FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key "
79+
+ "| LOOKUP JOIN values_lookup2 ON lookup_tag",
80+
randomBoolean()
81+
)
82+
) {
83+
List<List<Object>> values = getValuesList(resp);
84+
assertThat(values, hasSize(20));
85+
86+
}
8787
}
8888

8989
public void testLookupJoinMissingRemoteIndex() throws IOException {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S
8383

8484
private final JoinConfig config;
8585
private List<Attribute> lazyOutput;
86+
private transient boolean isRemote = false;
8687

8788
public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) {
8889
super(source, left, right);
@@ -257,7 +258,10 @@ public boolean equals(Object obj) {
257258
}
258259

259260
Join other = (Join) obj;
260-
return config.equals(other.config) && Objects.equals(left(), other.left()) && Objects.equals(right(), other.right());
261+
return config.equals(other.config)
262+
&& Objects.equals(left(), other.left())
263+
&& Objects.equals(right(), other.right())
264+
&& isRemote == other.isRemote;
261265
}
262266

263267
@Override
@@ -295,4 +299,13 @@ private static boolean comparableTypes(Attribute left, Attribute right) {
295299
}
296300
return leftType.noText() == rightType.noText();
297301
}
302+
303+
public boolean isRemote() {
304+
return isRemote;
305+
}
306+
307+
public Join setRemote(boolean remote) {
308+
isRemote = remote;
309+
return this;
310+
}
298311
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
*/
3333
public class LookupJoin extends Join implements SurrogateLogicalPlan, PostAnalysisVerificationAware, TelemetryAware {
3434

35-
private boolean isRemote = false;
36-
3735
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List<Attribute> joinFields) {
3836
this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList());
3937
}
@@ -60,7 +58,7 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig
6058
@Override
6159
public LogicalPlan surrogate() {
6260
// TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now
63-
return new Join(source(), left(), right(), config());
61+
return new Join(source(), left(), right(), config()).setRemote(isRemote());
6462
}
6563

6664
@Override
@@ -90,7 +88,7 @@ public String telemetryLabel() {
9088
@Override
9189
public void postAnalysisVerification(Failures failures) {
9290
super.postAnalysisVerification(failures);
93-
if (isRemote) {
91+
if (isRemote()) {
9492
checkRemoteJoin(failures);
9593
}
9694
}
@@ -120,12 +118,4 @@ private void checkRemoteJoin(Failures failures) {
120118
}
121119
}
122120

123-
public boolean isRemote() {
124-
return isRemote;
125-
}
126-
127-
public LookupJoin setRemote(boolean remote) {
128-
isRemote = remote;
129-
return this;
130-
}
131121
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
201201
return MapperUtils.mapUnary(unary, mappedChild);
202202
}
203203

204-
private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child) {
204+
private PhysicalPlan mapToFragmentExec(Join logical, PhysicalPlan child) {
205205
Holder<Boolean> hasFragment = new Holder<>(false);
206206
Holder<Boolean> forceLocal = new Holder<>(false);
207207

@@ -244,10 +244,17 @@ private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child)
244244
if (f instanceof LookupJoinExec lj) {
245245
return lj.right();
246246
}
247+
if (f instanceof MergeExec) {
248+
forceLocal.set(true);
249+
return f;
250+
}
247251
return f;
248252
});
249253

250254
if (forceLocal.get()) {
255+
if (logical.isRemote()) {
256+
throw new EsqlIllegalArgumentException("Remote joins are not supported in this context");
257+
}
251258
return null;
252259
}
253260

@@ -279,7 +286,7 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
279286
}
280287

281288
if (FRAGMENT_EXEC_HACK_ENABLED) {
282-
var leftPlan = mapToFragmentExec(bp, left);
289+
var leftPlan = mapToFragmentExec(join, left);
283290
if (leftPlan != null) {
284291
return leftPlan;
285292
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,12 +441,23 @@ private PreAnalysisResult receiveLookupIndexResolution(
441441
IndexResolution newIndexResolution
442442
) {
443443
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, newIndexResolution.unavailableClusters());
444-
if (newIndexResolution.isValid() == false
445-
|| executionInfo.getClusters().isEmpty()
446-
|| executionInfo.isCrossClusterSearch() == false) {
444+
if (newIndexResolution.isValid() == false) {
447445
// If the index resolution is invalid, or we're not dealing with CCS, don't bother with the rest of the analysis
448446
return result.addLookupIndexResolution(index, newIndexResolution);
449447
}
448+
if (executionInfo.getClusters().isEmpty() || executionInfo.isCrossClusterSearch() == false) {
449+
// Local only case, still do some checks
450+
if (newIndexResolution.get().indexNameWithModes().size() > 1) {
451+
throw new VerificationException("multiple resolutions for lookup index [" + index + "] in local cluster");
452+
}
453+
var indexMode = newIndexResolution.get().indexNameWithModes().entrySet().iterator().next().getValue();
454+
if (indexMode != IndexMode.LOOKUP) {
455+
throw new VerificationException(
456+
"invalid [" + index + "] resolution in lookup mode to an index in [" + indexMode + "] mode"
457+
);
458+
}
459+
return result.addLookupIndexResolution(index, newIndexResolution);
460+
}
450461
// Collect resolved clusters from the index resolution, verify that each cluster has a single resolution for the lookup index
451462
Map<String, String> clustersWithResolvedIndices = new HashMap<>(newIndexResolution.resolvedIndices().size());
452463
newIndexResolution.get().indexNameWithModes().forEach((indexName, indexMode) -> {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2898,6 +2898,7 @@ public void testVerifierOnMissingReferences() {
28982898
assertThat(e.getMessage(), containsString(" > 10[INTEGER]]] optimized incorrectly due to missing references [emp_no{f}#"));
28992899
}
29002900

2901+
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/118531")
29012902
public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception {
29022903
assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
29032904

@@ -7784,7 +7785,7 @@ public void testLookupJoinFieldLoadingDropAllFields() throws Exception {
77847785
}
77857786

77867787
private void assertLookupJoinFieldNames(String query, TestDataSource data, List<Set<String>> expectedFieldNames) {
7787-
assertLookupJoinFieldNames(query, data, expectedFieldNames, false);
7788+
assertLookupJoinFieldNames(query, data, expectedFieldNames, true);
77887789
}
77897790

77907791
private void assertLookupJoinFieldNames(

0 commit comments

Comments
 (0)