Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128519.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128519
summary: Add support for LOOKUP JOIN on aliases
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_CUSTOM_SERVICE_ADDED_8_19 = def(8_841_0_39);
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES_ADDED_8_19 = def(8_841_0_40);
public static final TransportVersion DATA_STREAM_OPTIONS_API_REMOVE_INCLUDE_DEFAULTS_8_19 = def(8_841_0_41);
public static final TransportVersion JOIN_ON_ALIASES_8_19 = def(8_841_0_42);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
task.skipTest("esql/40_unsupported_types/unsupported", "TODO: support for subset of metric fields")
task.skipTest("esql/40_unsupported_types/unsupported with sort", "TODO: support for subset of metric fields")
task.skipTest("esql/63_enrich_int_range/Invalid age as double", "TODO: require disable allow_partial_results")
task.skipTest("esql/191_lookup_join_on_datastreams/data streams not supported in LOOKUP JOIN", "Added support for aliases in JOINs")
})


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.RangeFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.internal.AliasFilter;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
Expand All @@ -45,12 +47,20 @@
*/
public abstract class QueryList {
protected final SearchExecutionContext searchExecutionContext;
protected final AliasFilter aliasFilter;
protected final MappedFieldType field;
protected final Block block;
protected final boolean onlySingleValues;

protected QueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block, boolean onlySingleValues) {
protected QueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
boolean onlySingleValues
) {
this.searchExecutionContext = searchExecutionContext;
this.aliasFilter = aliasFilter;
this.field = field;
this.block = block;
this.onlySingleValues = onlySingleValues;
Expand Down Expand Up @@ -78,6 +88,17 @@ final Query getQuery(int position) {

Query query = doGetQuery(position, firstValueIndex, valueCount);

if (aliasFilter != null && aliasFilter != AliasFilter.EMPTY) {
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(query, BooleanClause.Occur.FILTER);
try {
builder.add(aliasFilter.getQueryBuilder().toQuery(searchExecutionContext), BooleanClause.Occur.FILTER);
query = builder.build();
} catch (IOException e) {
throw new UncheckedIOException("Error while building query for alias filter", e);
}
}

if (onlySingleValues) {
query = wrapSingleValueQuery(query);
}
Expand Down Expand Up @@ -121,7 +142,12 @@ private Query wrapSingleValueQuery(Query query) {
* using only the {@link ElementType} of the {@link Block} to determine the
* query.
*/
public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block) {
public static QueryList rawTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
case BOOLEAN -> {
BooleanBlock booleanBlock = (BooleanBlock) block;
Expand Down Expand Up @@ -153,17 +179,22 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
return new TermQueryList(field, searchExecutionContext, block, false, blockToJavaObject);
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, false, blockToJavaObject);
}

/**
* Returns a list of term queries for the given field and the input block of
* {@code ip} field values.
*/
public static QueryList ipTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, BytesRefBlock block) {
public static QueryList ipTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
BytesRefBlock block
) {
BytesRef scratch = new BytesRef();
byte[] ipBytes = new byte[InetAddressPoint.BYTES];
return new TermQueryList(field, searchExecutionContext, block, false, offset -> {
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, false, offset -> {
final var bytes = block.getBytesRef(offset, scratch);
if (ipBytes.length != bytes.length) {
// Lucene only support 16-byte IP addresses, even IPv4 is encoded in 16 bytes
Expand All @@ -178,10 +209,16 @@ public static QueryList ipTermQueryList(MappedFieldType field, SearchExecutionCo
* Returns a list of term queries for the given field and the input block of
* {@code date} field values.
*/
public static QueryList dateTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, LongBlock block) {
public static QueryList dateTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
LongBlock block
) {
return new TermQueryList(
field,
searchExecutionContext,
aliasFilter,
block,
false,
field instanceof RangeFieldMapper.RangeFieldType rangeFieldType
Expand All @@ -193,8 +230,14 @@ public static QueryList dateTermQueryList(MappedFieldType field, SearchExecution
/**
* Returns a list of geo_shape queries for the given field and the input block.
*/
public static QueryList geoShapeQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block) {
return new GeoShapeQueryList(field, searchExecutionContext, block, false);

public static QueryList geoShapeQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
return new GeoShapeQueryList(field, searchExecutionContext, aliasFilter, block, false);
}

private static class TermQueryList extends QueryList {
Expand All @@ -203,17 +246,18 @@ private static class TermQueryList extends QueryList {
private TermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
boolean onlySingleValues,
IntFunction<Object> blockValueReader
) {
super(field, searchExecutionContext, block, onlySingleValues);
super(field, searchExecutionContext, aliasFilter, block, onlySingleValues);
this.blockValueReader = blockValueReader;
}

@Override
public TermQueryList onlySingleValues() {
return new TermQueryList(field, searchExecutionContext, block, true, blockValueReader);
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, true, blockValueReader);
}

@Override
Expand Down Expand Up @@ -241,18 +285,19 @@ private static class GeoShapeQueryList extends QueryList {
private GeoShapeQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
boolean onlySingleValues
) {
super(field, searchExecutionContext, block, onlySingleValues);
super(field, searchExecutionContext, aliasFilter, block, onlySingleValues);

this.blockValueReader = blockToGeometry(block);
this.shapeQuery = shapeQuery();
}

@Override
public GeoShapeQueryList onlySingleValues() {
return new GeoShapeQueryList(field, searchExecutionContext, block, true);
return new GeoShapeQueryList(field, searchExecutionContext, aliasFilter, block, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testQueries() throws Exception {
var inputTerms = makeTermsBlock(List.of(List.of("b2"), List.of("c1", "a2"), List.of("z2"), List.of(), List.of("a3"), List.of()))
) {
MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid");
QueryList queryList = QueryList.rawTermQueryList(uidField, directoryData.searchExecutionContext, inputTerms);
QueryList queryList = QueryList.rawTermQueryList(uidField, directoryData.searchExecutionContext, AliasFilter.EMPTY, inputTerms);
assertThat(queryList.getPositionCount(), equalTo(6));
assertThat(queryList.getQuery(0), equalTo(new TermQuery(new Term("uid", new BytesRef("b2")))));
assertThat(queryList.getQuery(1), equalTo(new TermInSetQuery("uid", List.of(new BytesRef("c1"), new BytesRef("a2")))));
Expand Down Expand Up @@ -154,7 +155,12 @@ public void testRandomMatchQueries() throws Exception {
}).toList();

try (var directoryData = makeDirectoryWith(directoryTermsList); var inputTerms = makeTermsBlock(inputTermsList)) {
var queryList = QueryList.rawTermQueryList(directoryData.field, directoryData.searchExecutionContext, inputTerms);
var queryList = QueryList.rawTermQueryList(
directoryData.field,
directoryData.searchExecutionContext,
AliasFilter.EMPTY,
inputTerms
);
int maxPageSize = between(1, 256);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(
Expand Down Expand Up @@ -192,8 +198,12 @@ public void testQueries_OnlySingleValues() throws Exception {
List.of(List.of("b2"), List.of("c1", "a2"), List.of("z2"), List.of(), List.of("a3"), List.of("a3", "a2", "z2", "xx"))
)
) {
QueryList queryList = QueryList.rawTermQueryList(directoryData.field, directoryData.searchExecutionContext, inputTerms)
.onlySingleValues();
QueryList queryList = QueryList.rawTermQueryList(
directoryData.field,
directoryData.searchExecutionContext,
AliasFilter.EMPTY,
inputTerms
).onlySingleValues();
// pos -> terms -> docs
// -----------------------------
// 0 -> [b2] -> []
Expand Down
Loading