Skip to content

ESQL: Push filters on right hand-side of joins #132635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/132635.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 132635
summary: Push filters on right hand-side of joins
area: ES|QL
type: enhancement
issues:
- 130024
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ static TransportVersion def(int id) {
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_FILTER = def(9_138_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.index.mapper.GeoShapeQueryable;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.RangeFieldMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.internal.AliasFilter;

Expand All @@ -56,19 +57,33 @@ public abstract class QueryList {
protected final Block block;
@Nullable
protected final OnlySingleValueParams onlySingleValueParams;
@Nullable
protected final QueryBuilder filterQueryBuilder;

protected QueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
OnlySingleValueParams onlySingleValueParams
) {
this(field, searchExecutionContext, aliasFilter, block, onlySingleValueParams, null);
}

private QueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
OnlySingleValueParams onlySingleValueParams,
QueryBuilder filterQueryBuilder
) {
this.searchExecutionContext = searchExecutionContext;
this.aliasFilter = aliasFilter;
this.field = field;
this.block = block;
this.onlySingleValueParams = onlySingleValueParams;
this.filterQueryBuilder = filterQueryBuilder;
}

/**
Expand All @@ -87,6 +102,27 @@ int getPositionCount() {
*/
public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage);

public QueryList withFilterQueryBuilder(QueryBuilder filterQueryBuilder) {
return new QueryList(
this.field,
this.searchExecutionContext,
this.aliasFilter,
this.block,
this.onlySingleValueParams,
filterQueryBuilder
) {
@Override
public QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage) {
return QueryList.this.onlySingleValues(warnings, multiValueWarningMessage);
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return QueryList.this.doGetQuery(position, firstValueIndex, valueCount);
}
};
}

final Query getQuery(int position) {
final int valueCount = block.getValueCount(position);
if (onlySingleValueParams != null && valueCount != 1) {
Expand All @@ -100,20 +136,42 @@ final Query getQuery(int position) {
final int firstValueIndex = block.getFirstValueIndex(position);

Query query = doGetQuery(position, firstValueIndex, valueCount);
if (query == null) {
return null;
}

BooleanQuery.Builder builder = new BooleanQuery.Builder();
boolean builderHasClauses = false;

if (aliasFilter != null && aliasFilter != AliasFilter.EMPTY) {
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(query, BooleanClause.Occur.FILTER);
try {
builder.add(aliasFilter.getQueryBuilder().toQuery(searchExecutionContext), BooleanClause.Occur.FILTER);
query = builder.build();
builderHasClauses = true;
} catch (IOException e) {
throw new UncheckedIOException("Error while building query for alias filter", e);
}
}

if (onlySingleValueParams != null) {
query = wrapSingleValueQuery(query);
Query singleValueQuery = wrapSingleValueQuery();
if (singleValueQuery != null) {
builder.add(singleValueQuery, BooleanClause.Occur.FILTER);
builderHasClauses = true;
}
}

if (filterQueryBuilder != null) {
try {
builder.add(filterQueryBuilder.toQuery(searchExecutionContext), BooleanClause.Occur.FILTER);
builderHasClauses = true;
} catch (IOException e) {
throw new UncheckedIOException("Error while building filter query", e);
}
}

if (builderHasClauses) {
builder.add(query, BooleanClause.Occur.FILTER);
return builder.build();
}

return query;
Expand All @@ -125,7 +183,7 @@ final Query getQuery(int position) {
@Nullable
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);

private Query wrapSingleValueQuery(Query query) {
private Query wrapSingleValueQuery() {
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";

SingleValueMatchQuery singleValueQuery = new SingleValueMatchQuery(
Expand All @@ -138,18 +196,11 @@ private Query wrapSingleValueQuery(Query query) {
Query rewrite;
try {
rewrite = singleValueQuery.rewrite(searchExecutionContext.searcher());
if (rewrite instanceof MatchAllDocsQuery) {
// nothing to filter
return query;
}
} catch (IOException e) {
throw new UncheckedIOException("Error while rewriting SingleValueQuery", e);
}

BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(query, BooleanClause.Occur.FILTER);
builder.add(rewrite, BooleanClause.Occur.FILTER);
return builder.build();
return rewrite instanceof MatchAllDocsQuery ? /* nothing to filter */ null : rewrite;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws
"lookup",
new FieldAttribute.FieldName("key"),
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
null,
Source.EMPTY
);
DriverContext driverContext = driverContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -197,6 +198,7 @@ protected abstract QueryList queryList(
AliasFilter aliasFilter,
Block inputBlock,
@Nullable DataType inputDataType,
// @Nullable QueryBuilder filterQueryBuilder,
Warnings warnings
);

Expand Down Expand Up @@ -342,6 +344,9 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
request.inputDataType,
warnings
);
if (request.filterQueryBuilder != null) {
queryList = queryList.withFilterQueryBuilder(request.filterQueryBuilder);
}
var queryOperator = new EnrichQuerySourceOperator(
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
Expand Down Expand Up @@ -511,6 +516,7 @@ abstract static class Request {
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final QueryBuilder filterQueryBuilder;
final Source source;

Request(
Expand All @@ -520,6 +526,7 @@ abstract static class Request {
DataType inputDataType,
Page inputPage,
List<NamedExpression> extractFields,
QueryBuilder filterQueryBuilder,
Source source
) {
this.sessionId = sessionId;
Expand All @@ -528,8 +535,21 @@ abstract static class Request {
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.extractFields = extractFields;
this.filterQueryBuilder = filterQueryBuilder;
this.source = source;
}

Request(
String sessionId,
String index,
String indexPattern,
DataType inputDataType,
Page inputPage,
List<NamedExpression> extractFields,
Source source
) {
this(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, null, source);
}
}

abstract static class TransportRequest extends AbstractTransportRequest implements IndicesRequest {
Expand All @@ -543,6 +563,8 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
@Nullable // may be missing: either no filter to apply, or remote node is older than ESQL_LOOKUP_JOIN_FILTER
final QueryBuilder filterQueryBuilder;
final Source source;
// TODO: Remove this workaround once we have Block RefCount
final Page toRelease;
Expand All @@ -556,6 +578,7 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
Page inputPage,
Page toRelease,
List<NamedExpression> extractFields,
QueryBuilder filterQueryBuilder,
Source source
) {
this.sessionId = sessionId;
Expand All @@ -565,9 +588,23 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
this.inputPage = inputPage;
this.toRelease = toRelease;
this.extractFields = extractFields;
this.filterQueryBuilder = filterQueryBuilder;
this.source = source;
}

TransportRequest(
String sessionId,
ShardId shardId,
String indexPattern,
DataType inputDataType,
Page inputPage,
Page toRelease,
List<NamedExpression> extractFields,
Source source
) {
this(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, null, source);
}

@Override
public final String[] indices() {
return new String[] { indexPattern };
Expand Down Expand Up @@ -625,6 +662,8 @@ public final String toString() {
+ inputDataType
+ " ,extract_fields="
+ extractFields
+ ", filterQueryBuilder="
+ filterQueryBuilder
+ " ,positions="
+ inputPage.getPositionCount()
+ extraDescription()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
Expand Down Expand Up @@ -48,6 +49,7 @@ public record Factory(
String lookupIndex,
FieldAttribute.FieldName matchField,
List<NamedExpression> loadFields,
QueryBuilder filterQueryBuilder,
Source source
) implements OperatorFactory {
@Override
Expand All @@ -62,6 +64,8 @@ public String describe() {
+ loadFields
+ " inputChannel="
+ inputChannel
+ " filterQueryBuilder="
+ filterQueryBuilder
+ "]";
}

Expand All @@ -79,6 +83,7 @@ public Operator get(DriverContext driverContext) {
lookupIndex,
matchField.string(),
loadFields,
filterQueryBuilder,
source
);
}
Expand All @@ -93,6 +98,7 @@ public Operator get(DriverContext driverContext) {
private final String lookupIndex;
private final String matchField;
private final List<NamedExpression> loadFields;
private final QueryBuilder filterQueryBuilder;
private final Source source;
private long totalTerms = 0L;
/**
Expand All @@ -116,6 +122,7 @@ public LookupFromIndexOperator(
String lookupIndex,
String matchField,
List<NamedExpression> loadFields,
QueryBuilder filterQueryBuilder,
Source source
) {
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
Expand All @@ -128,6 +135,7 @@ public LookupFromIndexOperator(
this.lookupIndex = lookupIndex;
this.matchField = matchField;
this.loadFields = loadFields;
this.filterQueryBuilder = filterQueryBuilder;
this.source = source;
}

Expand All @@ -143,6 +151,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
matchField,
new Page(inputBlock),
loadFields,
filterQueryBuilder,
source
);
lookupService.lookupAsync(
Expand Down Expand Up @@ -200,6 +209,8 @@ public String toString() {
+ loadFields
+ " inputChannel="
+ inputChannel
+ " filterQueryBuilder="
+ filterQueryBuilder
+ "]";
}

Expand Down
Loading