Skip to content

Commit 201813b

Browse files
committed
Review feedback
1 parent 4440b1b commit 201813b

File tree

7 files changed

+35
-20
lines changed

7 files changed

+35
-20
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,9 @@ static TransportVersion def(int id) {
326326
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
327327
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
328328
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
329-
public static final TransportVersion LOOKUP_JOIN_MANY_INDICES = def(9_113_0_00);
330329

331330
public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_113_0_00);
331+
public static final TransportVersion LOOKUP_JOIN_CCS = def(9_114_0_00);
332332
/*
333333
* STOP! READ THIS FIRST! No, really,
334334
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/PipelineBreaker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,11 @@
77

88
package org.elasticsearch.xpack.esql.plan.logical;
99

10+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
11+
12+
/**
13+
* A {@link LogicalPlan} that cannot be run only on the data nodes, resp. requires to be at least partially run on the coordinator.
14+
* When mapping to a physical plan, the first pipeline breaker will give rise to a {@link FragmentExec}
15+
* that contains the {@link LogicalPlan} that data nodes will execute.
16+
*/
1017
public interface PipelineBreaker {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.xpack.esql.plan.QueryPlan;
4040
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
4141
import org.elasticsearch.xpack.esql.plan.logical.Filter;
42+
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
4243
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4344
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
4445
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -49,7 +50,6 @@
4950
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
5051
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
5152
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
52-
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
5353
import org.elasticsearch.xpack.esql.session.Configuration;
5454
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
5555
import org.elasticsearch.xpack.esql.stats.SearchStats;
@@ -117,7 +117,7 @@ public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
117117
}
118118
final FragmentExec fragment = (FragmentExec) fragments.getFirst();
119119

120-
final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker);
120+
final var pipelineBreakers = fragment.fragment().collectFirstChildren(p -> p instanceof PipelineBreaker);
121121
if (pipelineBreakers.isEmpty()) {
122122
return null;
123123
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
137137
return MapperUtils.mapUnary(unary, mappedChild);
138138
}
139139
// in case of a fragment, push to it any current streaming operator
140-
if (isPipelineBreaker(unary) == false) {
140+
if (unary instanceof PipelineBreaker == false) {
141141
return new FragmentExec(unary);
142142
}
143143
}
@@ -253,10 +253,6 @@ private PhysicalPlan mapFork(Fork fork) {
253253
return new MergeExec(fork.source(), fork.children().stream().map(child -> map(child)).toList(), fork.output());
254254
}
255255

256-
public static boolean isPipelineBreaker(LogicalPlan p) {
257-
return p instanceof PipelineBreaker;
258-
}
259-
260256
private PhysicalPlan addExchangeForFragment(LogicalPlan logical, PhysicalPlan child) {
261257
// in case of fragment, preserve the streaming operator (order-by, limit or topN) for local replanning
262258
// no need to do it for an aggregate since it gets split

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -481,12 +481,20 @@ private PreAnalysisResult receiveLookupIndexResolution(
481481
if (executionInfo.getClusters().isEmpty() || executionInfo.isCrossClusterSearch() == false) {
482482
// Local only case, still do some checks, since we moved analysis checks here
483483
if (newIndexResolution.get().indexNameWithModes().size() > 1) {
484-
throw new VerificationException("multiple resolutions for lookup index [" + index + "]");
484+
throw new VerificationException(
485+
"Lookup Join requires a single lookup mode index; [" + index + "] resolves to multiple indices"
486+
);
485487
}
486-
var indexMode = newIndexResolution.get().indexNameWithModes().entrySet().iterator().next().getValue();
487-
if (indexMode != IndexMode.LOOKUP) {
488+
var indexModeEntry = newIndexResolution.get().indexNameWithModes().entrySet().iterator().next();
489+
if (indexModeEntry.getValue() != IndexMode.LOOKUP) {
488490
throw new VerificationException(
489-
"invalid [" + index + "] resolution in lookup mode to an index in [" + indexMode + "] mode"
491+
"Lookup Join requires a single lookup mode index; ["
492+
+ index
493+
+ "] resolves to ["
494+
+ indexModeEntry.getKey()
495+
+ "] in ["
496+
+ indexModeEntry.getValue()
497+
+ "] mode"
490498
);
491499
}
492500
return result.addLookupIndexResolution(index, newIndexResolution);
@@ -500,20 +508,24 @@ private PreAnalysisResult receiveLookupIndexResolution(
500508
skipClusterOrError(
501509
clusterAlias,
502510
executionInfo,
503-
"invalid ["
511+
"Lookup Join requires a single lookup mode index; ["
512+
+ index
513+
+ "] resolves to ["
504514
+ indexName
505-
+ "] resolution in lookup mode to an index in ["
515+
+ "] in ["
506516
+ indexMode
507-
+ "] mode "
508-
+ EsqlCCSUtils.inClusterName(clusterAlias)
517+
+ "] mode"
509518
);
510519
}
511520
// Each cluster should have only one resolution for the lookup index
512521
if (clustersWithResolvedIndices.containsKey(clusterAlias)) {
513522
skipClusterOrError(
514523
clusterAlias,
515524
executionInfo,
516-
"multiple resolutions for lookup index [" + index + "] " + EsqlCCSUtils.inClusterName(clusterAlias)
525+
"Lookup Join requires a single lookup mode index; ["
526+
+ index
527+
+ "] resolves to multiple indices "
528+
+ EsqlCCSUtils.inClusterName(clusterAlias)
517529
);
518530
} else {
519531
clustersWithResolvedIndices.put(clusterAlias, indexName);
@@ -572,7 +584,7 @@ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
572584
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
573585
// No need to check local, obviously
574586
var connection = remoteClusterService.getConnection(clusterAlias);
575-
if (connection != null && connection.getTransportVersion().before(TransportVersions.LOOKUP_JOIN_MANY_INDICES)) {
587+
if (connection != null && connection.getTransportVersion().before(TransportVersions.LOOKUP_JOIN_CCS)) {
576588
skipClusterOrError(
577589
clusterAlias,
578590
executionInfo,

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ fails with non-lookup index:
160160
catch: "bad_request"
161161

162162
- match: { error.type: "verification_exception" }
163-
- contains: { error.reason: "invalid [test] resolution in lookup mode to an index in [standard] mode" }
163+
- contains: { error.reason: "Lookup Join requires a single lookup mode index; [test] resolves to [test] in [standard] mode" }
164164

165165
---
166166
pattern-multiple:

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ fails when alias or pattern resolves to multiple:
201201
catch: "bad_request"
202202

203203
- match: { error.type: "verification_exception" }
204-
- contains: { error.reason: "multiple resolutions for lookup index [test-lookup-alias-pattern-multiple]" }
204+
- contains: { error.reason: "Lookup Join requires a single lookup mode index; [test-lookup-alias-pattern-multiple] resolves to multiple indices" }
205205

206206
---
207207
alias-pattern-single:

0 commit comments

Comments
 (0)