6 changes: 6 additions & 0 deletions docs/changelog/133166.yaml
@@ -0,0 +1,6 @@
pr: 133166
summary: Improve Expanding Lookup Join performance by pushing a filter to the right
side of the lookup join
area: ES|QL
type: enhancement
issues: []
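For context, the queries this change targets look like the ones built by the heap-attack tests further down in this diff: a LOOKUP JOIN whose output is filtered afterwards. The sketch below is illustrative only (the index names, join key and filter field mirror those tests; the literal 10 is arbitrary). The range conjunct filter_key < 10 is Lucene-pushable and can now be applied on the lookup (right) side before rows are expanded, while ABS(filter_key) > -1 is not pushable and still runs after the join.

    String query = """
        FROM sensor_data
        | LOOKUP JOIN sensor_lookup ON id0
        | WHERE ABS(filter_key) > -1 AND filter_key < 10
        | STATS COUNT(location)
        | LIMIT 100
        """;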
@@ -368,6 +368,7 @@ static TransportVersion def(int id) {
public static final TransportVersion DATA_STREAM_WRITE_INDEX_ONLY_SETTINGS = def(9_142_0_00);
public static final TransportVersion SCRIPT_RESCORER = def(9_143_0_00);
public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_144_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_145_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -698,26 +698,42 @@ private Map<String, Object> fetchMvLongs() throws IOException {
public void testLookupExplosion() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 10000;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyFields() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 1000;
int joinFieldsCount = 990;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyMatchesManyFields() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
}

public void testLookupExplosionManyMatches() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
}

public void testLookupExplosionManyMatchesFiltered() throws IOException {
// This test will only work with the expanding join optimization
// that pushes the filter to the right side of the lookup.
// Without the optimization, it will fail with circuit_breaking_exception
int sensorDataCount = 10000;
int lookupEntries = 10000;
int reductionFactor = 1000; // reduce the number of matches by this factor
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
assertEquals(0, lookupEntries % reductionFactor);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));
}
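For scale: with sensorDataCount = 10,000, lookupEntries = 10,000 and reductionFactor = 1,000, an unfiltered join would materialize 10,000 × 10,000 = 100,000,000 joined rows. Pushing filter_key < 10 to the lookup side keeps only the 10 matching lookup entries per input row, i.e. 10,000 × 10 = 100,000 rows, which matches the asserted sensorDataCount * lookupEntries / reductionFactor and stays under the circuit breaker.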

public void testLookupExplosionNoFetch() throws IOException {
@@ -744,7 +760,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
}

private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
StringBuilder query = startQuery();
@@ -755,7 +772,14 @@ private Map<String, Object> lookupExplosi
}
query.append("id").append(i);
}
query.append(" | STATS COUNT(location)\"}");
if (lookupEntries != lookupEntriesToKeep) {
// add a filter to reduce the number of matches
// we add both a Lucene-pushable filter and a non-pushable filter
// to make sure the pushable filter is still applied even when non-pushable filters are present
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);
}
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
return responseAsMap(query(query.toString(), null));
} finally {
deleteIndex("sensor_data");
@@ -1038,7 +1062,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"location": { "type": "geo_point" }
"location": { "type": "geo_point" },
"filter_key": { "type": "integer" }
}
}""");
CreateIndexResponse response = createIndex(
@@ -1058,7 +1083,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
}
data.append(String.format(Locale.ROOT, """
"location": "POINT(%s)"}\n""", location.apply(sensor)));
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
if (i % docsPerBulk == docsPerBulk - 1) {
bulk("sensor_lookup", data.toString());
data.setLength(0);
@@ -159,7 +159,7 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil
return page;
}

private Query nextQuery() {
private Query nextQuery() throws IOException {
++queryPosition;
while (isFinished() == false) {
Query query = queryList.getQuery(queryPosition);

This file was deleted.

@@ -10,6 +10,8 @@
import org.apache.lucene.search.Query;
import org.elasticsearch.core.Nullable;

import java.io.IOException;

/**
* An interface to generate queries for the lookup and enrich operators.
* This interface is used to retrieve queries based on a position index.
@@ -20,7 +22,7 @@ public interface LookupEnrichQueryGenerator {
* Returns the query at the given position.
*/
@Nullable
Query getQuery(int position);
Query getQuery(int position) throws IOException;

/**
* Returns the number of queries in this generator
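Because getQuery is now declared to throw IOException, every caller has to propagate or handle the checked exception. Below is a minimal caller sketch, not part of the PR: the class and method names are made up for illustration, and the import of LookupEnrichQueryGenerator is omitted for brevity.

    import java.io.IOException;
    import org.apache.lucene.search.Query;

    final class QueryGeneratorCallerSketch {
        // Counts how many positions produce a non-null query; declares IOException
        // because LookupEnrichQueryGenerator.getQuery(int) can now throw it.
        static int countNonNullQueries(LookupEnrichQueryGenerator generator, int positionCount) throws IOException {
            int nonNull = 0;
            for (int position = 0; position < positionCount; position++) {
                Query query = generator.getQuery(position); // may return null for empty positions
                if (query != null) {
                    nonNull++;
                }
            }
            return nonNull;
        }
    }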
@@ -89,7 +89,7 @@ public int getPositionCount() {
public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage);

@Override
public final Query getQuery(int position) {
public final Query getQuery(int position) throws IOException {
final int valueCount = block.getValueCount(position);
if (onlySingleValueParams != null && valueCount != 1) {
if (valueCount > 1) {
@@ -125,7 +125,7 @@ public final Query getQuery(int position) {
* Returns the query at the given position.
*/
@Nullable
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
public abstract Query doGetQuery(int position, int firstValueIndex, int valueCount) throws IOException;

private Query wrapSingleValueQuery(Query query) {
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";
@@ -159,13 +159,8 @@ private Query wrapSingleValueQuery(Query query) {
* using only the {@link ElementType} of the {@link Block} to determine the
* query.
*/
public static QueryList rawTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
public static IntFunction<Object> createBlockValueReader(Block block) {
return switch (block.elementType()) {
case BOOLEAN -> {
BooleanBlock booleanBlock = (BooleanBlock) block;
yield booleanBlock::getBoolean;
@@ -196,7 +191,20 @@ public static QueryList rawTermQueryList(
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
}

/**
* Returns a list of term queries for the given field and the input block
* using only the {@link ElementType} of the {@link Block} to determine the
* query.
*/
public static QueryList rawTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, createBlockValueReader(block));
}

/**
@@ -297,7 +305,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -360,7 +368,7 @@ public DateNanosQueryList onlySingleValues(Warnings warnings, String multiValueW
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> dateFieldType.equalityQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
@@ -412,7 +420,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> shapeQuery.apply(firstValueIndex);
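The per-element-type value reader that used to live inside rawTermQueryList is now exposed as the public helper createBlockValueReader, so code outside QueryList can turn block values into plain Java objects without building a TermQueryList first. A hypothetical usage sketch follows; the wrapper class is illustrative and the QueryList import is omitted for brevity.

    import java.util.function.IntFunction;
    import org.elasticsearch.compute.data.Block;

    final class BlockValueReaderSketch {
        // Returns the first value stored in the block as a plain Java object.
        // Assumes the block is non-empty; value index 0 is the first value of the first position.
        static Object firstValue(Block block) {
            IntFunction<Object> reader = QueryList.createBlockValueReader(block);
            return reader.apply(0);
        }
    }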