Skip to content

Commit 498adc3

Browse files
committed
Added warnings collection in EnrichQuerySourceOperator
1 parent d1aa099 commit 498adc3

File tree

12 files changed

+154
-46
lines changed

12 files changed

+154
-46
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ static TransportVersion def(int id) {
189189
public static final TransportVersion LOGSDB_TELEMETRY = def(8_784_00_0);
190190
public static final TransportVersion LOGSDB_TELEMETRY_STATS = def(8_785_00_0);
191191
public static final TransportVersion KQL_QUERY_ADDED = def(8_786_00_0);
192+
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_787_00_0);
192193

193194
/*
194195
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.compute.data.IntVector;
2323
import org.elasticsearch.compute.data.Page;
2424
import org.elasticsearch.compute.operator.SourceOperator;
25+
import org.elasticsearch.compute.operator.Warnings;
2526
import org.elasticsearch.core.Releasables;
2627

2728
import java.io.IOException;
@@ -38,17 +39,25 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
3839
private int queryPosition = -1;
3940
private final IndexReader indexReader;
4041
private final IndexSearcher searcher;
42+
private final Warnings warnings;
4143
private final int maxPageSize;
4244

4345
// using smaller pages enables quick cancellation and reduces sorting costs
4446
public static final int DEFAULT_MAX_PAGE_SIZE = 256;
4547

46-
public EnrichQuerySourceOperator(BlockFactory blockFactory, int maxPageSize, QueryList queryList, IndexReader indexReader) {
48+
public EnrichQuerySourceOperator(
49+
BlockFactory blockFactory,
50+
int maxPageSize,
51+
QueryList queryList,
52+
IndexReader indexReader,
53+
Warnings warnings
54+
) {
4755
this.blockFactory = blockFactory;
4856
this.maxPageSize = maxPageSize;
4957
this.queryList = queryList;
5058
this.indexReader = indexReader;
5159
this.searcher = new IndexSearcher(indexReader);
60+
this.warnings = warnings;
5261
}
5362

5463
@Override
@@ -73,12 +82,18 @@ public Page getOutput() {
7382
}
7483
int totalMatches = 0;
7584
do {
76-
Query query = nextQuery();
77-
if (query == null) {
78-
assert isFinished();
79-
break;
85+
Query query;
86+
try {
87+
query = nextQuery();
88+
if (query == null) {
89+
assert isFinished();
90+
break;
91+
}
92+
query = searcher.rewrite(new ConstantScoreQuery(query));
93+
} catch (Exception e) {
94+
warnings.registerException(e);
95+
continue;
8096
}
81-
query = searcher.rewrite(new ConstantScoreQuery(query));
8297
final var weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
8398
if (weight == null) {
8499
continue;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.elasticsearch.compute.data.IntBlock;
3333
import org.elasticsearch.compute.data.IntVector;
3434
import org.elasticsearch.compute.data.Page;
35+
import org.elasticsearch.compute.operator.DriverContext;
36+
import org.elasticsearch.compute.operator.Warnings;
3537
import org.elasticsearch.core.IOUtils;
3638
import org.elasticsearch.index.mapper.KeywordFieldMapper;
3739
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -120,7 +122,8 @@ public void testQueries() throws Exception {
120122
// 3 -> [] -> []
121123
// 4 -> [a1] -> [3]
122124
// 5 -> [] -> []
123-
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader);
125+
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
126+
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader, warnings);
124127
Page p0 = queryOperator.getOutput();
125128
assertNotNull(p0);
126129
assertThat(p0.getPositionCount(), equalTo(6));
@@ -187,7 +190,8 @@ public void testRandomMatchQueries() throws Exception {
187190
MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid");
188191
var queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms);
189192
int maxPageSize = between(1, 256);
190-
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader);
193+
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
194+
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader, warnings);
191195
Map<Integer, Set<Integer>> actualPositions = new HashMap<>();
192196
while (queryOperator.isFinished() == false) {
193197
Page page = queryOperator.getOutput();

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ public void testLookupIndex() throws IOException {
183183
DataType.KEYWORD,
184184
"lookup",
185185
"data",
186-
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG)))
186+
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
187+
Source.EMPTY
187188
);
188189
DriverContext driverContext = driverContext();
189190
try (

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public enum Cap {
274274
RANGEQUERY_FOR_DATETIME,
275275

276276
/**
277-
* Enforce strict type checking on ENRICH range types.
277+
* Enforce strict type checking on ENRICH range types, and warnings for KEYWORD parsing at runtime. Done in #115091.
278278
*/
279279
ENRICH_STRICT_RANGE_TYPES,
280280

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.compute.operator.DriverContext;
4242
import org.elasticsearch.compute.operator.Operator;
4343
import org.elasticsearch.compute.operator.OutputOperator;
44+
import org.elasticsearch.compute.operator.Warnings;
4445
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
4546
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
4647
import org.elasticsearch.compute.operator.lookup.QueryList;
@@ -77,6 +78,7 @@
7778
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
7879
import org.elasticsearch.xpack.esql.core.expression.Alias;
7980
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
81+
import org.elasticsearch.xpack.esql.core.tree.Source;
8082
import org.elasticsearch.xpack.esql.core.type.DataType;
8183
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
8284
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -326,11 +328,18 @@ private void doLookup(T request, CancellableTask task, ActionListener<Page> list
326328
releasables.add(mergePositionsOperator);
327329
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
328330
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
331+
var warnings = Warnings.createWarnings(
332+
DriverContext.WarningsMode.COLLECT,
333+
request.source.source().getLineNumber(),
334+
request.source.source().getColumnNumber(),
335+
request.source.text()
336+
);
329337
var queryOperator = new EnrichQuerySourceOperator(
330338
driverContext.blockFactory(),
331339
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
332340
queryList,
333-
searchExecutionContext.getIndexReader()
341+
searchExecutionContext.getIndexReader(),
342+
warnings
334343
);
335344
releasables.add(queryOperator);
336345
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
@@ -446,13 +455,22 @@ abstract static class Request {
446455
final DataType inputDataType;
447456
final Page inputPage;
448457
final List<NamedExpression> extractFields;
458+
final Source source;
449459

450-
Request(String sessionId, String index, DataType inputDataType, Page inputPage, List<NamedExpression> extractFields) {
460+
Request(
461+
String sessionId,
462+
String index,
463+
DataType inputDataType,
464+
Page inputPage,
465+
List<NamedExpression> extractFields,
466+
Source source
467+
) {
451468
this.sessionId = sessionId;
452469
this.index = index;
453470
this.inputDataType = inputDataType;
454471
this.inputPage = inputPage;
455472
this.extractFields = extractFields;
473+
this.source = source;
456474
}
457475
}
458476

@@ -462,6 +480,7 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
462480
final DataType inputDataType;
463481
final Page inputPage;
464482
final List<NamedExpression> extractFields;
483+
final Source source;
465484
// TODO: Remove this workaround once we have Block RefCount
466485
final Page toRelease;
467486
final RefCounted refs = AbstractRefCounted.of(this::releasePage);
@@ -472,14 +491,16 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
472491
DataType inputDataType,
473492
Page inputPage,
474493
Page toRelease,
475-
List<NamedExpression> extractFields
494+
List<NamedExpression> extractFields,
495+
Source source
476496
) {
477497
this.sessionId = sessionId;
478498
this.shardId = shardId;
479499
this.inputDataType = inputDataType;
480500
this.inputPage = inputPage;
481501
this.toRelease = toRelease;
482502
this.extractFields = extractFields;
503+
this.source = source;
483504
}
484505

485506
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.tasks.CancellableTask;
2020
import org.elasticsearch.xcontent.XContentBuilder;
2121
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
22+
import org.elasticsearch.xpack.esql.core.tree.Source;
2223
import org.elasticsearch.xpack.esql.core.type.DataType;
2324

2425
import java.io.IOException;
@@ -35,6 +36,7 @@ public final class EnrichLookupOperator extends AsyncOperator {
3536
private final String matchType;
3637
private final String matchField;
3738
private final List<NamedExpression> enrichFields;
39+
private final Source source;
3840
private long totalTerms = 0L;
3941

4042
public record Factory(
@@ -47,7 +49,8 @@ public record Factory(
4749
String enrichIndex,
4850
String matchType,
4951
String matchField,
50-
List<NamedExpression> enrichFields
52+
List<NamedExpression> enrichFields,
53+
Source source
5154
) implements OperatorFactory {
5255
@Override
5356
public String describe() {
@@ -75,7 +78,8 @@ public Operator get(DriverContext driverContext) {
7578
enrichIndex,
7679
matchType,
7780
matchField,
78-
enrichFields
81+
enrichFields,
82+
source
7983
);
8084
}
8185
}
@@ -91,7 +95,8 @@ public EnrichLookupOperator(
9195
String enrichIndex,
9296
String matchType,
9397
String matchField,
94-
List<NamedExpression> enrichFields
98+
List<NamedExpression> enrichFields,
99+
Source source
95100
) {
96101
super(driverContext, maxOutstandingRequests);
97102
this.sessionId = sessionId;
@@ -103,6 +108,7 @@ public EnrichLookupOperator(
103108
this.matchType = matchType;
104109
this.matchField = matchField;
105110
this.enrichFields = enrichFields;
111+
this.source = source;
106112
}
107113

108114
@Override
@@ -116,7 +122,8 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
116122
matchType,
117123
matchField,
118124
new Page(inputBlock),
119-
enrichFields
125+
enrichFields,
126+
source
120127
);
121128
enrichLookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
122129
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
3030
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
3131
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
32+
import org.elasticsearch.xpack.esql.core.tree.Source;
3233
import org.elasticsearch.xpack.esql.core.type.DataType;
3334
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3435
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
@@ -73,12 +74,14 @@ protected TransportRequest transportRequest(EnrichLookupService.Request request,
7374
request.matchField,
7475
request.inputPage,
7576
null,
76-
request.extractFields
77+
request.extractFields,
78+
request.source
7779
);
7880
}
7981

8082
@Override
8183
protected QueryList queryList(TransportRequest request, SearchExecutionContext context, Block inputBlock, DataType inputDataType) {
84+
validateTypes(inputDataType, context.getFieldType(request.matchField));
8285
MappedFieldType fieldType = context.getFieldType(request.matchField);
8386
return switch (request.matchType) {
8487
case "match", "range" -> termQueryList(fieldType, context, inputBlock, inputDataType);
@@ -104,6 +107,10 @@ private static void validateTypes(DataType inputDataType, MappedFieldType fieldT
104107
}
105108

106109
private static boolean rangeTypesCompatible(RangeType rangeType, DataType inputDataType) {
110+
if (inputDataType.noText() == DataType.KEYWORD) {
111+
// We allow runtime parsing of string types to numeric types
112+
return true;
113+
}
107114
return switch (rangeType) {
108115
case INTEGER, LONG -> inputDataType.isWholeNumber();
109116
case IP -> inputDataType == DataType.IP;
@@ -123,9 +130,10 @@ public static class Request extends AbstractLookupService.Request {
123130
String matchType,
124131
String matchField,
125132
Page inputPage,
126-
List<NamedExpression> extractFields
133+
List<NamedExpression> extractFields,
134+
Source source
127135
) {
128-
super(sessionId, index, inputDataType, inputPage, extractFields);
136+
super(sessionId, index, inputDataType, inputPage, extractFields, source);
129137
this.matchType = matchType;
130138
this.matchField = matchField;
131139
}
@@ -143,9 +151,10 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
143151
String matchField,
144152
Page inputPage,
145153
Page toRelease,
146-
List<NamedExpression> extractFields
154+
List<NamedExpression> extractFields,
155+
Source source
147156
) {
148-
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields);
157+
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields, source);
149158
this.matchType = matchType;
150159
this.matchField = matchField;
151160
}
@@ -165,6 +174,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
165174
}
166175
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
167176
List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
177+
var source = Source.EMPTY;
178+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENRICH_RUNTIME_WARNINGS)) {
179+
source = Source.readFrom(planIn);
180+
}
168181
TransportRequest result = new TransportRequest(
169182
sessionId,
170183
shardId,
@@ -173,7 +186,8 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
173186
matchField,
174187
inputPage,
175188
inputPage,
176-
extractFields
189+
extractFields,
190+
source
177191
);
178192
result.setParentTask(parentTaskId);
179193
return result;
@@ -192,6 +206,9 @@ public void writeTo(StreamOutput out) throws IOException {
192206
out.writeWriteable(inputPage);
193207
PlanStreamOutput planOut = new PlanStreamOutput(out, null);
194208
planOut.writeNamedWriteableCollection(extractFields);
209+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENRICH_RUNTIME_WARNINGS)) {
210+
source.writeTo(planOut);
211+
}
195212
}
196213

197214
@Override

0 commit comments

Comments
 (0)