Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;

import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V4;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V5;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.ASYNC;

public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase {
Expand Down Expand Up @@ -96,7 +96,7 @@ protected boolean supportsInferenceTestService() {

/**
 * Reports whether index-mode lookup is supported, by asking {@code hasCapabilities}
 * about the LOOKUP JOIN capability name.
 *
 * @return the result of the {@code hasCapabilities} check
 * @throws IOException declared by the signature; presumably raised by the capability check (confirm)
 */
@Override
protected boolean supportsIndexModeLookup() throws IOException {
// NOTE(review): the two return lines below look like the before/after sides of a diff hunk
// (V4 replaced by V5); presumably only the V5 line remains in the real file (confirm).
return hasCapabilities(List.of(JOIN_LOOKUP_V4.capabilityName()));
return hasCapabilities(List.of(JOIN_LOOKUP_V5.capabilityName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V4;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V5;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
Expand Down Expand Up @@ -124,7 +124,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V4.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V5.capabilityName()));
}

private TestFeatureService remoteFeaturesService() throws IOException {
Expand Down Expand Up @@ -283,8 +283,8 @@ protected boolean supportsInferenceTestService() {

/**
 * Always reports index-mode lookup as unsupported for this suite.
 * The capability check is left commented out below; the accompanying comments give the reason.
 *
 * @return {@code false}, unconditionally
 * @throws IOException declared by the signature; the current body does not throw it
 */
@Override
protected boolean supportsIndexModeLookup() throws IOException {
// CCS does not yet support JOIN_LOOKUP_V4 and clusters falsely report they have this capability
// return hasCapabilities(List.of(JOIN_LOOKUP_V4.capabilityName()));
// CCS does not yet support JOIN_LOOKUP_V5 and clusters falsely report they have this capability
// return hasCapabilities(List.of(JOIN_LOOKUP_V5.capabilityName()));
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

// TODO: this sometimes returns null instead of the looked-up value (likely related to the execution order)
basicOnTheDataNode
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
Expand All @@ -22,7 +22,7 @@ emp_no:integer | language_code:integer | language_name:keyword
;

basicRow
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW language_code = 1
| LOOKUP JOIN languages_lookup ON language_code
Expand All @@ -33,7 +33,7 @@ language_code:integer | language_name:keyword
;

basicOnTheCoordinator
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| SORT emp_no
Expand All @@ -50,7 +50,7 @@ emp_no:integer | language_code:integer | language_name:keyword
;

subsequentEvalOnTheDataNode
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
Expand All @@ -68,7 +68,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x
;

subsequentEvalOnTheCoordinator
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| SORT emp_no
Expand All @@ -85,8 +85,25 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x
10003 | 4 | german | 8
;

// Sorts employees by emp_no, derives language_code as (emp_no % 10) + 1, then joins against
// languages_lookup on that key. The first three rows are expected to carry the matching
// language names.
sortEvalBeforeLookup
required_capability: join_lookup_v5

FROM employees
| SORT emp_no
| EVAL language_code = (emp_no % 10) + 1
| LOOKUP JOIN languages_lookup ON language_code
| KEEP emp_no, language_code, language_name
| LIMIT 3
;

emp_no:integer | language_code:integer | language_name:keyword
10001 | 2 | French
10002 | 3 | Spanish
10003 | 4 | German
;

lookupIPFromRow
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", right = "right"
| LOOKUP JOIN clientips_lookup ON client_ip
Expand All @@ -97,7 +114,7 @@ left | 172.21.0.5 | right | Development
;

lookupIPFromRowWithShadowing
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| LOOKUP JOIN clientips_lookup ON client_ip
Expand All @@ -108,7 +125,7 @@ left | 172.21.0.5 | right | Development
;

lookupIPFromRowWithShadowingKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| EVAL client_ip = client_ip::keyword
Expand All @@ -121,7 +138,7 @@ left | 172.21.0.5 | right | Development
;

lookupIPFromRowWithShadowingKeepReordered
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| EVAL client_ip = client_ip::keyword
Expand All @@ -134,7 +151,7 @@ right | Development | 172.21.0.5
;

lookupIPFromIndex
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -153,7 +170,7 @@ ignoreOrder:true
;

lookupIPFromIndexKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -173,7 +190,7 @@ ignoreOrder:true
;

lookupIPFromIndexStats
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -189,7 +206,7 @@ count:long | env:keyword
;

lookupIPFromIndexStatsKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -206,7 +223,7 @@ count:long | env:keyword
;

lookupMessageFromRow
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", message = "Connected to 10.1.0.1", right = "right"
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -217,7 +234,7 @@ left | Connected to 10.1.0.1 | right | Success
;

lookupMessageFromRowWithShadowing
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right"
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -228,7 +245,7 @@ left | Connected to 10.1.0.1 | right | Success
;

lookupMessageFromRowWithShadowingKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right"
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -240,7 +257,7 @@ left | Connected to 10.1.0.1 | right | Success
;

lookupMessageFromIndex
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -258,7 +275,7 @@ ignoreOrder:true
;

lookupMessageFromIndexKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -277,7 +294,7 @@ ignoreOrder:true
;

lookupMessageFromIndexKeepReordered
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -296,7 +313,7 @@ Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
;

lookupMessageFromIndexStats
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -311,7 +328,7 @@ count:long | type:keyword
;

lookupMessageFromIndexStatsKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ public enum Cap {
/**
* LOOKUP JOIN
*/
JOIN_LOOKUP_V4(Build.current().isSnapshot()),
JOIN_LOOKUP_V5(Build.current().isSnapshot()),

/**
* Fix for https://github.com/elastic/elasticsearch/issues/117054
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;

public final class PushDownAndCombineLimits extends OptimizerRules.OptimizerRule<Limit> {

Expand Down Expand Up @@ -63,8 +62,10 @@ public LogicalPlan rule(Limit limit) {
}
}
} else if (limit.child() instanceof Join join) {
if (join.config().type() == JoinTypes.LEFT && join.right() instanceof LocalRelation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this was previously restricted to LocalRelation (even if that's what was expected before). Anyway, it should be good this way now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That came from when we only had LOOKUP (aka 🐔 ) and INLINESTATS to consider. Both of these ended up with a LocalRelation on the right, and we didn't want to implement this in all generality because the semantics were not 100% clear.

// This is a hash join from something like a lookup.
if (join.config().type() == JoinTypes.LEFT) {
// NOTE! This is only correct because our LEFT JOINs preserve the number of rows from the left hand side.
// This deviates from SQL semantics. In SQL, multiple matches on the right hand side lead to multiple rows in the output.
// For us, multiple matches on the right hand side are collected into multi-values.
return join.replaceChildren(limit.replaceChild(join.left()), join.right());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public final void test() throws Throwable {
);
assumeFalse(
"lookup join disabled for csv tests",
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V4.capabilityName())
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V5.capabilityName())
);
assumeFalse(
"can't use TERM function in csv tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,7 @@ public void testLookupMatchTypeWrong() {
}

public void testLookupJoinUnknownIndex() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled());

String errorMessage = "Unknown index [foobar]";
IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage);
Expand Down Expand Up @@ -2168,7 +2168,7 @@ public void testLookupJoinUnknownIndex() {
}

public void testLookupJoinUnknownField() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled());

String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name";
String errorMessage = "1:45: Unknown column [last_name] in right side of join";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1964,7 +1964,7 @@ public void testSortByAggregate() {
}

public void testLookupJoinDataTypeMismatch() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled());

query("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.xpack.esql.core.expression.predicate.logical.Or;
import org.elasticsearch.xpack.esql.core.expression.predicate.nulls.IsNotNull;
import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.util.Holder;
Expand Down Expand Up @@ -112,7 +113,9 @@
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
Expand All @@ -138,6 +141,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TWO;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptySource;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.fieldAttribute;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getFieldAttribute;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.localSource;
Expand Down Expand Up @@ -1291,6 +1295,26 @@ public void testCombineLimits() {
);
}

/**
 * Checks that {@code PushDownAndCombineLimits} moves a {@code Limit} sitting on top of a LEFT join
 * onto the join's left child, leaving the right child untouched. The join node is randomly a
 * {@code Join}, {@code LookupJoin}, or {@code InlineJoin}.
 */
public void testPushdownLimitsPastLeftJoin() {
var leftChild = emptySource();
var rightChild = new LocalRelation(Source.EMPTY, List.of(fieldAttribute()), LocalSupplier.EMPTY);
// Guard: the two children must not compare equal.
assertNotEquals(leftChild, rightChild);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotta be very careful here, you never know.


var joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), List.of());
// Pick one of the three join node types at random; all share the same LEFT join config.
var join = switch (randomIntBetween(0, 2)) {
case 0 -> new Join(EMPTY, leftChild, rightChild, joinConfig);
case 1 -> new LookupJoin(EMPTY, leftChild, rightChild, joinConfig);
case 2 -> new InlineJoin(EMPTY, leftChild, rightChild, joinConfig);
default -> throw new IllegalArgumentException();
};

var limit = new Limit(EMPTY, L(10), join);

// Apply the rule directly to the limit node.
var optimizedPlan = new PushDownAndCombineLimits().rule(limit);

// Expected plan: the same join, with the limit wrapped around its left child only.
assertEquals(join.replaceChildren(limit.replaceChild(join.left()), join.right()), optimizedPlan);
}

public void testMultipleCombineLimits() {
var numberOfLimits = randomIntBetween(3, 10);
var minimum = randomIntBetween(10, 99);
Expand Down