Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/115091.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 115091
summary: Added stricter range type checks and runtime warnings for ENRICH
area: ES|QL
type: bug
issues:
- 107357
- 116799
31 changes: 28 additions & 3 deletions docs/reference/esql/esql-enrich-data.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,33 @@ include::{es-ref-dir}/ingest/apis/enrich/execute-enrich-policy.asciidoc[tag=upda

include::../ingest/enrich.asciidoc[tag=update-enrich-policy]

==== Limitations
==== Enrich Policy Types and Limitations
The {esql} `ENRICH` command supports all three enrich policy types:

`geo_match`::
Matches enrich data to incoming documents based on a <<query-dsl-geo-shape-query,`geo_shape` query>>.
For an example, see <<geo-match-enrich-policy-type>>.

`match`::
Matches enrich data to incoming documents based on a <<query-dsl-term-query,`term` query>>.
For an example, see <<match-enrich-policy-type>>.

`range`::
Matches a number, date, or IP address in incoming documents to a range in the
enrich index based on a <<query-dsl-term-query,`term` query>>. For an example,
see <<range-enrich-policy-type>>.

// tag::limitations[]
The {esql} `ENRICH` command only supports enrich policies of type `match`.
Furthermore, `ENRICH` only supports enriching on a column of type `keyword`.
While all three enrich policy types are supported, there are some limitations to be aware of:

* The `geo_match` enrich policy type only supports the `intersects` spatial relation.
* The `match_field` used in the `ENRICH` command must be of a type compatible with the policy.
For example, if the enrich policy is of type `geo_match`, the `match_field` in the `ENRICH`
command must be of type `geo_point` or `geo_shape`.
Likewise, a `range` enrich policy requires a `match_field` of type `integer`, `long`, `date`, or `ip`,
depending on the type of the range field in the original enrich index.
* For `range` policies, this type constraint is relaxed when the `match_field` is of type `KEYWORD`.
In that case the field values are parsed during query execution, row by row.
If any value fails to parse, the output values for that row are set to `null`,
an appropriate warning is produced, and the query continues to execute.
// end::limitations[]
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS = def(8_793_00_0);
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0);
public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,6 @@ tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
task.skipTest("esql/80_text/reverse text", "The output type changed from TEXT to KEYWORD.")
task.skipTest("esql/80_text/values function", "The output type changed from TEXT to KEYWORD.")
task.skipTest("privileges/11_builtin/Test get builtin privileges" ,"unnecessary to test compatibility")
task.skipTest("esql/61_enrich_ip/Invalid IP strings", "We switched from exceptions to null+warnings for ENRICH runtime errors")
})

Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,13 @@ public DataType noText() {
return isString(this) ? KEYWORD : this;
}

/**
 * Whether this data type represents a date value, i.e. millisecond-resolution
 * {@code DATETIME} or nanosecond-resolution {@code DATE_NANOS}.
 */
public boolean isDate() {
    return this == DATETIME || this == DATE_NANOS;
}

/**
* Named parameters with default values. It's just easier to do this with
* a builder in java....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand All @@ -37,17 +38,25 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
private int queryPosition = -1;
private final IndexReader indexReader;
private final IndexSearcher searcher;
private final Warnings warnings;
private final int maxPageSize;

// using smaller pages enables quick cancellation and reduces sorting costs
public static final int DEFAULT_MAX_PAGE_SIZE = 256;

public EnrichQuerySourceOperator(BlockFactory blockFactory, int maxPageSize, QueryList queryList, IndexReader indexReader) {
public EnrichQuerySourceOperator(
BlockFactory blockFactory,
int maxPageSize,
QueryList queryList,
IndexReader indexReader,
Warnings warnings
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.queryList = queryList;
this.indexReader = indexReader;
this.searcher = new IndexSearcher(indexReader);
this.warnings = warnings;
}

@Override
Expand All @@ -72,12 +81,18 @@ public Page getOutput() {
}
int totalMatches = 0;
do {
Query query = nextQuery();
if (query == null) {
assert isFinished();
break;
Query query;
try {
query = nextQuery();
if (query == null) {
assert isFinished();
break;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
} catch (Exception e) {
warnings.registerException(e);
continue;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
final var weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
if (weight == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -120,7 +122,8 @@ public void testQueries() throws Exception {
// 3 -> [] -> []
// 4 -> [a1] -> [3]
// 5 -> [] -> []
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader, warnings);
Page p0 = queryOperator.getOutput();
assertNotNull(p0);
assertThat(p0.getPositionCount(), equalTo(6));
Expand Down Expand Up @@ -187,7 +190,8 @@ public void testRandomMatchQueries() throws Exception {
MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid");
var queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms);
int maxPageSize = between(1, 256);
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader, warnings);
Map<Integer, Set<Integer>> actualPositions = new HashMap<>();
while (queryOperator.isFinished() == false) {
Page page = queryOperator.getOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void testLookupIndex() throws IOException {
DataType.KEYWORD,
"lookup",
"data",
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG)))
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
Source.EMPTY
);
DriverContext driverContext = driverContext();
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ public enum Cap {
*/
RANGEQUERY_FOR_DATETIME,

/**
* Enforce strict type checking on ENRICH range types, and warnings for KEYWORD parsing at runtime. Done in #115091.
*/
ENRICH_STRICT_RANGE_TYPES,

/**
* Fix for non-unique attribute names in ROW and logical plans.
* https://github.com/elastic/elasticsearch/issues/110541
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OutputOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
import org.elasticsearch.compute.operator.lookup.QueryList;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand Down Expand Up @@ -166,6 +168,10 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
);
}

/**
 * Returns the {@link ThreadContext} of the transport service's thread pool,
 * e.g. so callers can collect response headers (warnings) produced by lookups.
 */
public ThreadContext getThreadContext() {
    var threadPool = transportService.getThreadPool();
    return threadPool.getThreadContext();
}

/**
* Convert a request as sent to {@link #lookupAsync} into a transport request after
* preflight checks have been performed.
Expand Down Expand Up @@ -330,11 +336,18 @@ private void doLookup(T request, CancellableTask task, ActionListener<Page> list
releasables.add(mergePositionsOperator);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
DriverContext.WarningsMode.COLLECT,
request.source.source().getLineNumber(),
request.source.source().getColumnNumber(),
request.source.text()
);
var queryOperator = new EnrichQuerySourceOperator(
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
searchExecutionContext.getIndexReader()
searchExecutionContext.getIndexReader(),
warnings
);
releasables.add(queryOperator);
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
Expand Down Expand Up @@ -450,13 +463,22 @@ abstract static class Request {
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;

Request(String sessionId, String index, DataType inputDataType, Page inputPage, List<NamedExpression> extractFields) {
/**
 * Base state shared by lookup requests.
 *
 * @param sessionId     identifier of the ESQL session issuing the lookup
 * @param index         name of the enrich/lookup index to query
 * @param inputDataType data type of the input match values
 * @param inputPage     page holding the input block of match values
 * @param extractFields fields to extract from matching documents
 * @param source        query source location, used to attribute runtime warnings
 */
Request(
String sessionId,
String index,
DataType inputDataType,
Page inputPage,
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.index = index;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.extractFields = extractFields;
this.source = source;
}
}

Expand All @@ -470,6 +492,7 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;
// TODO: Remove this workaround once we have Block RefCount
final Page toRelease;
final RefCounted refs = AbstractRefCounted.of(this::releasePage);
Expand All @@ -480,14 +503,16 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
DataType inputDataType,
Page inputPage,
Page toRelease,
List<NamedExpression> extractFields
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.shardId = shardId;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.toRelease = toRelease;
this.extractFields = extractFields;
this.source = source;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import org.elasticsearch.compute.operator.AsyncOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;

import java.io.IOException;
Expand All @@ -35,6 +37,8 @@ public final class EnrichLookupOperator extends AsyncOperator {
private final String matchType;
private final String matchField;
private final List<NamedExpression> enrichFields;
private final ResponseHeadersCollector responseHeadersCollector;
private final Source source;
private long totalTerms = 0L;

public record Factory(
Expand All @@ -47,7 +51,8 @@ public record Factory(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) implements OperatorFactory {
@Override
public String describe() {
Expand Down Expand Up @@ -75,7 +80,8 @@ public Operator get(DriverContext driverContext) {
enrichIndex,
matchType,
matchField,
enrichFields
enrichFields,
source
);
}
}
Expand All @@ -91,7 +97,8 @@ public EnrichLookupOperator(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) {
super(driverContext, maxOutstandingRequests);
this.sessionId = sessionId;
Expand All @@ -103,6 +110,8 @@ public EnrichLookupOperator(
this.matchType = matchType;
this.matchField = matchField;
this.enrichFields = enrichFields;
this.source = source;
this.responseHeadersCollector = new ResponseHeadersCollector(enrichLookupService.getThreadContext());
}

@Override
Expand All @@ -116,9 +125,14 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
matchType,
matchField,
new Page(inputBlock),
enrichFields
enrichFields,
source
);
enrichLookupService.lookupAsync(
request,
parentTask,
ActionListener.runBefore(listener.map(inputPage::appendPage), responseHeadersCollector::collect)
);
enrichLookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
}

@Override
Expand All @@ -140,6 +154,7 @@ public String toString() {
protected void doClose() {
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
// then cancel it when this operator terminates early (e.g., have enough result).
responseHeadersCollector.finish();
}

@Override
Expand Down
Loading