Merged
Changes from all commits (37 commits)
db5f8c5
WIP POC for expression join
julian-elastic Aug 21, 2025
d253a55
Change Parser to allow AND of expressions
julian-elastic Sep 3, 2025
b8f9ba6
Add more UTs and bugfixes
julian-elastic Sep 4, 2025
5cdb9c1
Address code review comments and bugfix
julian-elastic Sep 5, 2025
8703c28
Merge branch 'main' into expressionJoin_v4
julian-elastic Sep 5, 2025
0e02b5c
Fix compile error
julian-elastic Sep 5, 2025
f50d894
Address more code review comments
julian-elastic Sep 8, 2025
0ebaf9c
Address more code review comments
julian-elastic Sep 8, 2025
26358d6
Address code review comments part 3
julian-elastic Sep 9, 2025
188a870
Enhance UT coverage
julian-elastic Sep 9, 2025
537cdbf
Fix UTs
julian-elastic Sep 10, 2025
4716191
Grammar change
julian-elastic Sep 10, 2025
fb85f31
Address more feedback, refactoring
julian-elastic Sep 10, 2025
ea6f6b9
Remove matchFields from JoinConfig, HashJoinExec
alex-spies Sep 11, 2025
bc1a6ec
Revert JoinConfig back to Record
alex-spies Sep 11, 2025
31e8682
Stop using deprecated c'tor in test
alex-spies Sep 11, 2025
221d2d4
Fix some more tests
alex-spies Sep 11, 2025
f0ff7ce
Fix issue with matchFields refactor
julian-elastic Sep 11, 2025
8b75924
Merge branch 'main' into expressionJoin_v4
julian-elastic Sep 11, 2025
7354a2d
Address more code review comments
julian-elastic Sep 11, 2025
ed34ca4
Serialization changes for JoinConfig, comments
julian-elastic Sep 11, 2025
2046178
UT fixes
julian-elastic Sep 12, 2025
bef7514
UT fixes
julian-elastic Sep 12, 2025
64d105c
Change grammar, enforce capabilities
julian-elastic Sep 12, 2025
0e2fc54
Add test cases to HeapAttackIT.java
julian-elastic Sep 12, 2025
fe8c080
Parameterize LookupFromIndexOperatorTests
julian-elastic Sep 12, 2025
d849ff9
Add security tests
julian-elastic Sep 15, 2025
daa69e7
Bugfix
julian-elastic Sep 15, 2025
0de2092
Merge branch 'main' into expressionJoin_v4
julian-elastic Sep 16, 2025
d8076d2
Bugfix
julian-elastic Sep 16, 2025
b10c360
Address last code review comments
julian-elastic Sep 16, 2025
adb9290
Fix non-snapshot UT errors
julian-elastic Sep 16, 2025
99064c0
Merge branch 'main' into expressionJoin_v4
julian-elastic Sep 17, 2025
c3c27ff
Update docs/changelog/134098.yaml
julian-elastic Sep 17, 2025
142fcd9
Merge remote-tracking branch 'origin/expressionJoin_v4' into expressi…
julian-elastic Sep 17, 2025
9c4bf1f
More fixes for non-snapshot UT errors
julian-elastic Sep 17, 2025
bc48e62
Merge branch 'main' into expressionJoin_v4
julian-elastic Sep 17, 2025
5 changes: 5 additions & 0 deletions docs/changelog/134098.yaml
@@ -0,0 +1,5 @@
pr: 134098
summary: LOOKUP JOIN with expressions
area: ES|QL
type: enhancement
issues: []
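
For context, this change lets the ON clause of LOOKUP JOIN be a conjunction of equality expressions between differently named fields, instead of only a list of shared field names. A minimal sketch of the new query shape, mirroring the queries built in the HeapAttackIT changes below (index and field names are illustrative):

FROM sensor_data
| LOOKUP JOIN sensor_lookup ON id_left0 == id_right0 AND id_left1 == id_right1
| STATS COUNT(*)
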
@@ -5,17 +5,17 @@
| field from the left index | field from the lookup index |
| --- | --- |
| boolean | boolean |
| byte | half_float, float, double, scaled_float, byte, short, integer, long |
| byte | byte, short, integer, long, half_float, float, double, scaled_float |
| date | date |
| date_nanos | date_nanos |
| double | half_float, float, double, scaled_float, byte, short, integer, long |
| float | half_float, float, double, scaled_float, byte, short, integer, long |
| half_float | half_float, float, double, scaled_float, byte, short, integer, long |
| integer | half_float, float, double, scaled_float, byte, short, integer, long |
| integer | byte, short, integer, long, half_float, float, double, scaled_float |
| ip | ip |
| keyword | keyword |
| long | half_float, float, double, scaled_float, byte, short, integer, long |
| long | byte, short, integer, long, half_float, float, double, scaled_float |
| scaled_float | half_float, float, double, scaled_float, byte, short, integer, long |
| short | half_float, float, double, scaled_float, byte, short, integer, long |
| short | byte, short, integer, long, half_float, float, double, scaled_float |
| text | keyword |
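
As a concrete reading of the table: the left-hand and lookup fields only need compatible types, not identical ones, so a hypothetical query such as `FROM app_logs | LOOKUP JOIN hosts ON port` (names illustrative) can match a left-hand `short` field against a lookup field mapped as `long`.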

@@ -327,6 +327,7 @@ static TransportVersion def(int id) {
public static final TransportVersion TIMESERIES_DEFAULT_LIMIT = def(9_160_0_00);
public static final TransportVersion INFERENCE_API_OPENAI_HEADERS = def(9_161_0_00);
public static final TransportVersion NEW_SEMANTIC_QUERY_INTERCEPTORS = def(9_162_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_163_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -566,6 +566,7 @@ private Response query(String query, String filterPath) throws IOException {
.setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(6).millis())).build())
.setWarningsHandler(WarningsHandler.PERMISSIVE)
);
logger.info("Running query:" + query);
return runQuery(() -> client().performRequest(request));
}

@@ -735,28 +736,48 @@ private Map<String, Object> fetchMvLongs() throws IOException {
public void testLookupExplosion() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 10000;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, false);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyFields() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 1000;
int joinFieldsCount = 990;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, false);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionExpression() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 10000;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, true);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyFieldsExpression() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 1000;
int joinFieldsCount = 399;// only join on 399 columns due to max expression size of 400
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, true);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyMatchesManyFields() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries, false));
}

public void testLookupExplosionManyMatches() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, false));
}

public void testLookupExplosionManyMatchesExpression() throws IOException {
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, true));
}

public void testLookupExplosionManyMatchesFiltered() throws IOException {
@@ -768,9 +789,21 @@ public void testLookupExplosionManyMatchesFiltered() throws IOException {
int reductionFactor = 1000; // reduce the number of matches by this factor
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
assertTrue(0 == lookupEntries % reductionFactor);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, false);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));
}

public void testLookupExplosionManyMatchesFilteredExpression() throws IOException {
// This test will only work with the expanding join optimization
// that pushes the filter to the right side of the lookup.
// Without the optimization, it will fail with circuit_breaking_exception
int sensorDataCount = 10000;
int lookupEntries = 10000;
int reductionFactor = 1000; // reduce the number of matches by this factor
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
assertTrue(0 == lookupEntries % reductionFactor);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, true);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));
}

public void testLookupExplosionNoFetch() throws IOException {
@@ -797,17 +830,33 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
}

private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
throws IOException {
private Map<String, Object> lookupExplosion(
int sensorDataCount,
int lookupEntries,
int joinFieldsCount,
int lookupEntriesToKeep,
boolean expressionBasedJoin
) throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount, expressionBasedJoin);
StringBuilder query = startQuery();
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON ");
for (int i = 0; i < joinFieldsCount; i++) {
if (i != 0) {
query.append(",");
if (expressionBasedJoin) {
for (int i = 0; i < joinFieldsCount; i++) {
if (i != 0) {
query.append(" AND ");
}
query.append("id_left").append(i);
query.append("==");
query.append("id_right").append(i);
}
} else {
for (int i = 0; i < joinFieldsCount; i++) {
if (i != 0) {
query.append(",");
}
query.append("id").append(i);
}
query.append("id").append(i);
}
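// The branches above produce ON clauses of the form "id_left0==id_right0 AND id_left1==id_right1 ..."
// for the expression-based join, or the comma-separated field list "id0,id1,..." otherwise.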
if (lookupEntries != lookupEntriesToKeep) {
// add a filter to reduce the number of matches
@@ -826,7 +875,7 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri

private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries, 1);
lookupExplosionData(sensorDataCount, lookupEntries, 1, false);
StringBuilder query = startQuery();
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}");
return responseAsMap(query(query.toString(), null));
@@ -836,14 +885,15 @@ private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int look
}
}

private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount) throws IOException {
initSensorData(sensorDataCount, 1, joinFieldCount);
initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount);
private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount, boolean expressionBasedJoin)
throws IOException {
initSensorData(sensorDataCount, 1, joinFieldCount, expressionBasedJoin);
initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount, expressionBasedJoin);
}

private Map<String, Object> lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException {
try {
initSensorData(sensorDataCount, 1, 1);
initSensorData(sensorDataCount, 1, 1, false);
initSensorLookupString(lookupEntries, 1, i -> {
int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes());
StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes()));
@@ -876,7 +926,7 @@ public void testEnrichExplosionManyMatches() throws IOException {

private Map<String, Object> enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException {
try {
initSensorData(sensorDataCount, 1, 1);
initSensorData(sensorDataCount, 1, 1, false);
initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484");
try {
StringBuilder query = startQuery();
@@ -1050,7 +1100,7 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx
initIndex("mv_longs", bulk.toString());
}

private void initSensorData(int docCount, int sensorCount, int joinFieldCount) throws IOException {
private void initSensorData(int docCount, int sensorCount, int joinFieldCount, boolean expressionBasedJoin) throws IOException {
logger.info("loading sensor data");
// We cannot go over 1000 fields, due to failed on parsing mappings on index creation
// [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
@@ -1061,8 +1111,9 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
"properties": {
"@timestamp": { "type": "date" },
""");
String suffix = expressionBasedJoin ? "_left" : "";
for (int i = 0; i < joinFieldCount; i++) {
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"value": { "type": "double" }
@@ -1083,7 +1134,7 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
{"create":{}}
{"timestamp":"%s",""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate)));
for (int j = 0; j < joinFieldCount; j++) {
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, i % sensorCount));
data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, i % sensorCount));
}
data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1));
if (i % docsPerBulk == docsPerBulk - 1) {
@@ -1094,8 +1145,13 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
initIndex("sensor_data", data.toString());
}

private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location, int joinFieldsCount)
throws IOException {
private void initSensorLookup(
int lookupEntries,
int sensorCount,
IntFunction<String> location,
int joinFieldsCount,
boolean expressionBasedJoin
) throws IOException {
logger.info("loading sensor lookup");
// cannot go over 1000 fields, due to failed on parsing mappings on index creation
// [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
@@ -1105,8 +1161,9 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
{
"properties": {
""");
String suffix = expressionBasedJoin ? "_right" : "";
for (int i = 0; i < joinFieldsCount; i++) {
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"location": { "type": "geo_point" },
@@ -1127,7 +1184,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
{"create":{}}
{"""));
for (int j = 0; j < joinFieldsCount; j++) {
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, sensor));
}
data.append(String.format(Locale.ROOT, """
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
@@ -1165,7 +1222,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
}

private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
initSensorLookup(lookupEntries, sensorCount, location, 1);
initSensorLookup(lookupEntries, sensorCount, location, 1, false);
logger.info("loading sensor enrich");

Request create = new Request("PUT", "/_enrich/policy/sensor");
@@ -125,7 +125,7 @@ public final Query getQuery(int position) {
* Returns the query at the given position.
*/
@Nullable
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
public abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);

private Query wrapSingleValueQuery(Query query) {
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";
@@ -155,17 +155,11 @@ private Query wrapSingleValueQuery(Query query) {
}

/**
* Returns a list of term queries for the given field and the input block
* using only the {@link ElementType} of the {@link Block} to determine the
* query.
* Returns a function that reads values from the given block. The function
* takes the offset of the value to read and returns the value as an {@link Object}.
*/
public static QueryList rawTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
public static IntFunction<Object> createBlockValueReader(Block block) {
return switch (block.elementType()) {
case BOOLEAN -> {
BooleanBlock booleanBlock = (BooleanBlock) block;
yield booleanBlock::getBoolean;
@@ -196,7 +190,20 @@ public static QueryList rawTermQueryList(
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
}
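// Hypothetical usage sketch: pair the returned reader with a position's value offsets, e.g.
//   IntFunction<Object> reader = QueryList.createBlockValueReader(block);
//   Object firstValue = reader.apply(block.getFirstValueIndex(position));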

/**
* Returns a list of term queries for the given field and the input block
* using only the {@link ElementType} of the {@link Block} to determine the
* query.
*/
public static QueryList rawTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, createBlockValueReader(block));
}

/**
@@ -297,7 +304,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -360,7 +367,7 @@ public DateNanosQueryList onlySingleValues(Warnings warnings, String multiValueW
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> dateFieldType.equalityQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -412,7 +419,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> shapeQuery.apply(firstValueIndex);
@@ -453,5 +460,5 @@ private IntFunction<Query> shapeQuery() {
}
}

protected record OnlySingleValueParams(Warnings warnings, String multiValueWarningMessage) {}
public record OnlySingleValueParams(Warnings warnings, String multiValueWarningMessage) {}
}
@@ -97,7 +97,7 @@ protected ByteSizeValue enoughMemoryForSimple() {
* asserting both that this throws a {@link CircuitBreakingException} and releases
* all pages.
*/
public final void testSimpleCircuitBreaking() {
public void testSimpleCircuitBreaking() {
/*
* Build the input before building `simple` to handle the rare
* cases where `simple` need some state from the input - mostly