Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
aaf7bac
Added strict range type checks for ENRICH
craigtaverner Oct 18, 2024
7cd46a0
Update docs/changelog/115091.yaml
craigtaverner Oct 18, 2024
d1aa099
Cover range(DATE)->DATETIME case
craigtaverner Oct 21, 2024
498adc3
Added warnings collection in EnrichQuerySourceOperator
craigtaverner Nov 8, 2024
678a302
Fixed failing test
craigtaverner Nov 11, 2024
f423537
Merge remote-tracking branch 'origin/main' into enrich_strict_range_t…
craigtaverner Nov 11, 2024
c3674a5
Merge remote-tracking branch 'origin/main' into enrich_strict_range_t…
craigtaverner Nov 12, 2024
5ccd0f6
Merge remote-tracking branch 'origin/main' into enrich_strict_range_t…
craigtaverner Nov 13, 2024
4967d43
Fix warnings propagating back from async request
craigtaverner Nov 13, 2024
95ce406
Improved changelog text, and fixed failing tests
craigtaverner Nov 14, 2024
15feaf7
Refactor ENRICH limitations to more closely match reality
craigtaverner Nov 14, 2024
68b5962
Merge remote-tracking branch 'origin/main' into enrich_strict_range_t…
craigtaverner Nov 14, 2024
7709786
Update docs/changelog/115091.yaml
craigtaverner Nov 14, 2024
8a85e07
Enrich has no internal config, so we should not use it for parent sou…
craigtaverner Nov 14, 2024
813f502
Fix failing test with new warnings
craigtaverner Nov 14, 2024
a77874b
Merge remote-tracking branch 'origin/main' into enrich_strict_range_t…
craigtaverner Nov 18, 2024
77ad43c
Make yaml tests more resilient in mixed-cluster and serverless
craigtaverner Nov 18, 2024
d16c02b
Merge remote-tracking branch 'origin/main' into enrich_strict_range_t…
craigtaverner Nov 19, 2024
81777de
Clean up enrich range-type checks with comments
craigtaverner Nov 19, 2024
f31b199
Merge remote-tracking branch 'origin/main' into enrich_strict_range_t…
craigtaverner Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/115091.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 115091
summary: Added stricter range type checks and runtime warnings for ENRICH
area: ES|QL
type: bug
issues:
- 107357
- 116799
31 changes: 28 additions & 3 deletions docs/reference/esql/esql-enrich-data.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,33 @@ include::{es-ref-dir}/ingest/apis/enrich/execute-enrich-policy.asciidoc[tag=upda

include::../ingest/enrich.asciidoc[tag=update-enrich-policy]

==== Limitations
==== Enrich Policy Types and Limitations
The {esql} `ENRICH` command supports all three enrich policy types:

`geo_match`::
Matches enrich data to incoming documents based on a <<query-dsl-geo-shape-query,`geo_shape` query>>.
For an example, see <<geo-match-enrich-policy-type>>.

`match`::
Matches enrich data to incoming documents based on a <<query-dsl-term-query,`term` query>>.
For an example, see <<match-enrich-policy-type>>.

`range`::
Matches a number, date, or IP address in incoming documents to a range in the
enrich index based on a <<query-dsl-term-query,`term` query>>. For an example,
see <<range-enrich-policy-type>>.

// tag::limitations[]
The {esql} `ENRICH` command only supports enrich policies of type `match`.
Furthermore, `ENRICH` only supports enriching on a column of type `keyword`.
While all three enrich policy types are supported, there are some limitations to be aware of:

* The `geo_match` enrich policy type only supports the `intersects` spatial relation.
* The `match_field` in the `ENRICH` command must be of the correct type.
For example, if the enrich policy is of type `geo_match`, the `match_field` in the `ENRICH`
command must be of type `geo_point` or `geo_shape`.
Likewise, a `range` enrich policy requires a `match_field` of type `integer`, `long`, `date`, or `ip`,
depending on the type of the range field in the original enrich index.
* However, this constraint is relaxed for `range` policies when the `match_field` is of type `keyword`.
In this case the field values will be parsed during query execution, row by row.
If any value fails to parse, the output values for that row will be set to `null`,
an appropriate warning will be produced and the query will continue to execute.
// end::limitations[]
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ static TransportVersion def(int id) {
public static final TransportVersion VERTEX_AI_INPUT_TYPE_ADDED = def(8_790_00_0);
public static final TransportVersion SKIP_INNER_HITS_SEARCH_SOURCE = def(8_791_00_0);
public static final TransportVersion QUERY_RULES_LIST_INCLUDES_TYPES = def(8_792_00_0);
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_793_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,6 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("esql/80_text/reverse text", "The output type changed from TEXT to KEYWORD.")
task.skipTest("esql/80_text/values function", "The output type changed from TEXT to KEYWORD.")
task.skipTest("privileges/11_builtin/Test get builtin privileges" ,"unnecessary to test compatibility")
task.skipTest("esql/61_enrich_ip/Invalid IP strings", "We switched from exceptions to null+warnings for ENRICH runtime errors")
})

Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,13 @@ public DataType noText() {
return isString(this) ? KEYWORD : this;
}

/**
 * Reports whether this type is one of the date types.
 *
 * @return {@code true} when this type is {@code DATETIME} or {@code DATE_NANOS},
 *         {@code false} for every other type
 */
public boolean isDate() {
    return this == DATETIME || this == DATE_NANOS;
}

/**
* Named parameters with default values. It's just easier to do this with
* a builder in java....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand All @@ -38,17 +39,25 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
private int queryPosition = -1;
private final IndexReader indexReader;
private final IndexSearcher searcher;
private final Warnings warnings;
private final int maxPageSize;

// using smaller pages enables quick cancellation and reduces sorting costs
public static final int DEFAULT_MAX_PAGE_SIZE = 256;

public EnrichQuerySourceOperator(BlockFactory blockFactory, int maxPageSize, QueryList queryList, IndexReader indexReader) {
public EnrichQuerySourceOperator(
BlockFactory blockFactory,
int maxPageSize,
QueryList queryList,
IndexReader indexReader,
Warnings warnings
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.queryList = queryList;
this.indexReader = indexReader;
this.searcher = new IndexSearcher(indexReader);
this.warnings = warnings;
}

@Override
Expand All @@ -73,12 +82,18 @@ public Page getOutput() {
}
int totalMatches = 0;
do {
Query query = nextQuery();
if (query == null) {
assert isFinished();
break;
Query query;
try {
query = nextQuery();
if (query == null) {
assert isFinished();
break;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
} catch (Exception e) {
warnings.registerException(e);
continue;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
final var weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
if (weight == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -120,7 +122,8 @@ public void testQueries() throws Exception {
// 3 -> [] -> []
// 4 -> [a1] -> [3]
// 5 -> [] -> []
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader, warnings);
Page p0 = queryOperator.getOutput();
assertNotNull(p0);
assertThat(p0.getPositionCount(), equalTo(6));
Expand Down Expand Up @@ -187,7 +190,8 @@ public void testRandomMatchQueries() throws Exception {
MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid");
var queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms);
int maxPageSize = between(1, 256);
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader, warnings);
Map<Integer, Set<Integer>> actualPositions = new HashMap<>();
while (queryOperator.isFinished() == false) {
Page page = queryOperator.getOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void testLookupIndex() throws IOException {
DataType.KEYWORD,
"lookup",
"data",
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG)))
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
Source.EMPTY
);
DriverContext driverContext = driverContext();
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ public enum Cap {
*/
RANGEQUERY_FOR_DATETIME,

/**
* Enforce strict type checking on ENRICH range types, and warnings for KEYWORD parsing at runtime. Done in #115091.
*/
ENRICH_STRICT_RANGE_TYPES,

/**
* Fix for non-unique attribute names in ROW and logical plans.
* https://github.com/elastic/elasticsearch/issues/110541
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OutputOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
import org.elasticsearch.compute.operator.lookup.QueryList;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand Down Expand Up @@ -166,6 +168,10 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
);
}

/**
 * Returns the thread context of the thread pool that backs this service's transport service.
 *
 * @return the {@link ThreadContext} obtained from {@code transportService.getThreadPool()}
 */
public ThreadContext getThreadContext() {
    final var threadPool = transportService.getThreadPool();
    return threadPool.getThreadContext();
}

/**
* Convert a request as sent to {@link #lookupAsync} into a transport request after
* preflight checks have been performed.
Expand Down Expand Up @@ -327,11 +333,18 @@ private void doLookup(T request, CancellableTask task, ActionListener<Page> list
releasables.add(mergePositionsOperator);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
DriverContext.WarningsMode.COLLECT,
request.source.source().getLineNumber(),
request.source.source().getColumnNumber(),
request.source.text()
);
var queryOperator = new EnrichQuerySourceOperator(
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
searchExecutionContext.getIndexReader()
searchExecutionContext.getIndexReader(),
warnings
);
releasables.add(queryOperator);
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
Expand Down Expand Up @@ -447,13 +460,22 @@ abstract static class Request {
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;

Request(String sessionId, String index, DataType inputDataType, Page inputPage, List<NamedExpression> extractFields) {
Request(
String sessionId,
String index,
DataType inputDataType,
Page inputPage,
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.index = index;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.extractFields = extractFields;
this.source = source;
}
}

Expand All @@ -467,6 +489,7 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;
// TODO: Remove this workaround once we have Block RefCount
final Page toRelease;
final RefCounted refs = AbstractRefCounted.of(this::releasePage);
Expand All @@ -477,14 +500,16 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
DataType inputDataType,
Page inputPage,
Page toRelease,
List<NamedExpression> extractFields
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.shardId = shardId;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.toRelease = toRelease;
this.extractFields = extractFields;
this.source = source;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import org.elasticsearch.compute.operator.AsyncOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;

import java.io.IOException;
Expand All @@ -35,6 +37,8 @@ public final class EnrichLookupOperator extends AsyncOperator {
private final String matchType;
private final String matchField;
private final List<NamedExpression> enrichFields;
private final ResponseHeadersCollector responseHeadersCollector;
private final Source source;
private long totalTerms = 0L;

public record Factory(
Expand All @@ -47,7 +51,8 @@ public record Factory(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) implements OperatorFactory {
@Override
public String describe() {
Expand Down Expand Up @@ -75,7 +80,8 @@ public Operator get(DriverContext driverContext) {
enrichIndex,
matchType,
matchField,
enrichFields
enrichFields,
source
);
}
}
Expand All @@ -91,7 +97,8 @@ public EnrichLookupOperator(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) {
super(driverContext, maxOutstandingRequests);
this.sessionId = sessionId;
Expand All @@ -103,6 +110,8 @@ public EnrichLookupOperator(
this.matchType = matchType;
this.matchField = matchField;
this.enrichFields = enrichFields;
this.source = source;
this.responseHeadersCollector = new ResponseHeadersCollector(enrichLookupService.getThreadContext());
}

@Override
Expand All @@ -116,9 +125,14 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
matchType,
matchField,
new Page(inputBlock),
enrichFields
enrichFields,
source
);
enrichLookupService.lookupAsync(
request,
parentTask,
ActionListener.runBefore(listener.map(inputPage::appendPage), responseHeadersCollector::collect)
);
enrichLookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
}

@Override
Expand All @@ -140,6 +154,7 @@ public String toString() {
protected void doClose() {
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
// then cancel it when this operator terminates early (e.g., have enough result).
// NOTE(review): presumably this publishes the response headers (e.g. warnings) that were
// gathered through responseHeadersCollector::collect in performAsync; confirm against
// ResponseHeadersCollector.
responseHeadersCollector.finish();
}

@Override
Expand Down
Loading