Skip to content

Commit e6a89ed

Browse files
ESQL: Push functions to field loads in primaries (#138544)
Only push functions into field loads from primary indices. LOOKUP will need an entirely different infrastructure for pushing functions into field loads. Relates to #137679 Co-authored-by: Carlos Delgado <[email protected]>
1 parent df9f1f9 commit e6a89ed

File tree

3 files changed

+334
-92
lines changed

3 files changed

+334
-92
lines changed

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java

Lines changed: 226 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,40 @@ public void testLengthNotPushedToText() throws IOException {
107107
);
108108
}
109109

110+
public void testVCosine() throws IOException {
111+
test(
112+
justType("dense_vector"),
113+
b -> b.startArray("test").value(128).value(128).value(0).endArray(),
114+
"| EVAL test = V_COSINE(test, [0, 255, 255])",
115+
matchesList().item(0.5),
116+
matchesMap().entry("test:column_at_a_time:FloatDenseVectorFromDocValues.Normalized.V_COSINE", 1)
117+
);
118+
}
119+
120+
public void testVHammingToByte() throws IOException {
121+
test(
122+
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "byte").endObject(),
123+
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
124+
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
125+
matchesList().item(6.0),
126+
matchesMap().entry("test:column_at_a_time:ByteDenseVectorFromDocValues.V_HAMMING", 1)
127+
);
128+
}
129+
130+
public void testVHammingToBit() throws IOException {
131+
test(
132+
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "bit").endObject(),
133+
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
134+
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
135+
matchesList().item(6.0),
136+
matchesMap().entry("test:column_at_a_time:BitDenseVectorFromDocValues.V_HAMMING", 1)
137+
);
138+
}
139+
140+
//
141+
// Tests for more complex shapes.
142+
//
143+
110144
/**
111145
* Tests {@code LENGTH} on a field that comes from a {@code LOOKUP JOIN}.
112146
*/
@@ -120,8 +154,16 @@ public void testLengthNotPushedToLookupJoinKeyword() throws IOException {
120154
| EVAL test = LENGTH(test)
121155
""",
122156
matchesList().item(1),
123-
matchesMap().entry("main_matching:column_at_a_time:BytesRefsFromOrds.Singleton", 1), //
124-
sig -> {}
157+
matchesMap().entry("main_matching:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
158+
sig -> assertMap(
159+
sig,
160+
matchesList().item("LuceneSourceOperator")
161+
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
162+
.item("LookupOperator")
163+
.item("EvalOperator") // this one just renames the field
164+
.item("AggregationOperator")
165+
.item("ExchangeSinkOperator")
166+
)
125167
);
126168
}
127169

@@ -131,7 +173,6 @@ public void testLengthNotPushedToLookupJoinKeyword() throws IOException {
131173
* querying it.
132174
*/
133175
public void testLengthNotPushedToLookupJoinKeywordSameName() throws IOException {
134-
assumeFalse("fix in 137679 - we push to the index but that's just wrong!", true);
135176
String value = "v".repeat(between(0, 256));
136177
initLookupIndex();
137178
test(b -> {
@@ -144,40 +185,197 @@ public void testLengthNotPushedToLookupJoinKeywordSameName() throws IOException
144185
| LOOKUP JOIN lookup ON matching == main_matching
145186
| EVAL test = LENGTH(test)
146187
""",
147-
matchesList().item(1), // <--- This is incorrectly returning value.length()
188+
matchesList().item(1),
148189
matchesMap().entry("main_matching:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
149-
// ^^^^ This is incorrectly returning test:column_at_a_time:Utf8CodePointsFromOrds.Singleton
150-
sig -> {}
190+
sig -> assertMap(
191+
sig,
192+
matchesList().item("LuceneSourceOperator")
193+
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
194+
.item("LookupOperator")
195+
.item("EvalOperator") // this one just renames the field
196+
.item("AggregationOperator")
197+
.item("ExchangeSinkOperator")
198+
)
151199
);
152200
}
153201

154-
public void testVCosine() throws IOException {
202+
/**
203+
* Tests that {@code LENGTH} is pushed into the field load when it is used inside and after an {@code INLINE STATS}.
204+
*/
205+
public void testLengthPushedInsideInlineStats() throws IOException {
206+
String value = "v".repeat(between(0, 256));
155207
test(
156-
justType("dense_vector"),
157-
b -> b.startArray("test").value(128).value(128).value(0).endArray(),
158-
"| EVAL test = V_COSINE(test, [0, 255, 255])",
159-
matchesList().item(0.5),
160-
matchesMap().entry("test:column_at_a_time:FloatDenseVectorFromDocValues.Normalized.V_COSINE", 1)
208+
justType("keyword"),
209+
b -> b.field("test", value),
210+
"""
211+
| INLINE STATS max_length = MAX(LENGTH(test))
212+
| EVAL test = LENGTH(test)
213+
| WHERE test == max_length
214+
""",
215+
matchesList().item(value.length()),
216+
matchesMap().entry("test:column_at_a_time:Utf8CodePointsFromOrds.Singleton", 1),
217+
sig -> {
218+
// There are two data node plans, one for each phase.
219+
if (sig.contains("FilterOperator")) {
220+
assertMap(
221+
sig,
222+
matchesList().item("LuceneSourceOperator")
223+
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
224+
.item("FilterOperator")
225+
.item("EvalOperator") // this one just renames the field
226+
.item("AggregationOperator")
227+
.item("ExchangeSinkOperator")
228+
);
229+
} else {
230+
assertMap(
231+
sig,
232+
matchesList().item("LuceneSourceOperator")
233+
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
234+
.item("EvalOperator") // this one just renames the field
235+
.item("AggregationOperator")
236+
.item("ExchangeSinkOperator")
237+
);
238+
}
239+
}
161240
);
162241
}
163242

164-
public void testVHammingToByte() throws IOException {
243+
/**
244+
* Tests that {@code LENGTH} is not pushed into the field load when applied to the result of an {@code INLINE STATS}.
245+
*/
246+
public void testLengthNotPushedToInlineStatsResults() throws IOException {
247+
String value = "v".repeat(between(0, 256));
248+
test(justType("keyword"), b -> b.field("test", value), """
249+
| INLINE STATS test2 = VALUES(test)
250+
| EVAL test = LENGTH(test2)
251+
""", matchesList().item(value.length()), matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1), sig -> {
252+
// There are two data node plans, one for each phase.
253+
if (sig.contains("EvalOperator")) {
254+
assertMap(
255+
sig,
256+
matchesList().item("LuceneSourceOperator")
257+
.item("EvalOperator") // The second phase of the INLINE STATS
258+
.item("AggregationOperator")
259+
.item("ExchangeSinkOperator")
260+
);
261+
} else {
262+
assertMap(
263+
sig,
264+
matchesList().item("LuceneSourceOperator")
265+
.item("ValuesSourceReaderOperator")
266+
.item("AggregationOperator")
267+
.item("ExchangeSinkOperator")
268+
);
269+
}
270+
});
271+
}
272+
273+
/**
274+
* Tests that {@code LENGTH} is not pushed into the field load when applied to the result of a grouped {@code INLINE STATS}.
275+
*/
276+
public void testLengthNotPushedToGroupedInlineStatsResults() throws IOException {
277+
String value = "v".repeat(between(0, 256));
278+
CheckedConsumer<XContentBuilder, IOException> mapping = b -> {
279+
b.startObject("test").field("type", "keyword").endObject();
280+
b.startObject("group").field("type", "keyword").endObject();
281+
};
282+
test(mapping, b -> b.field("test", value).field("group", "g"), """
283+
| INLINE STATS test2 = VALUES(test) BY group
284+
| EVAL test = LENGTH(test2)
285+
""", matchesList().item(value.length()), matchesMap().extraOk(), sig -> {
286+
// There are two data node plans, one for each phase.
287+
if (sig.contains("EvalOperator")) {
288+
assertMap(
289+
sig,
290+
matchesList().item("LuceneSourceOperator")
291+
.item("ValuesSourceReaderOperator")
292+
.item("RowInTableLookup")
293+
.item("ColumnLoad")
294+
.item("ProjectOperator")
295+
.item("EvalOperator")
296+
.item("AggregationOperator")
297+
.item("ExchangeSinkOperator")
298+
);
299+
} else {
300+
assertMap(
301+
sig,
302+
matchesList().item("LuceneSourceOperator")
303+
.item("ValuesSourceReaderOperator")
304+
.item("HashAggregationOperator")
305+
.item("ExchangeSinkOperator")
306+
);
307+
}
308+
});
309+
}
310+
311+
/**
312+
* LENGTH not pushed when on a fork branch.
313+
*/
314+
public void testLengthNotPushedToFork() throws IOException {
315+
String value = "v".repeat(between(0, 256));
165316
test(
166-
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "byte").endObject(),
167-
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
168-
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
169-
matchesList().item(6.0),
170-
matchesMap().entry("test:column_at_a_time:ByteDenseVectorFromDocValues.V_HAMMING", 1)
317+
justType("keyword"),
318+
b -> b.field("test", value),
319+
"""
320+
| FORK
321+
(EVAL test = LENGTH(test) + 1)
322+
(EVAL test = LENGTH(test) + 2)
323+
""",
324+
matchesList().item(List.of(value.length() + 1, value.length() + 2)),
325+
matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
326+
sig -> assertMap(
327+
sig,
328+
matchesList().item("LuceneSourceOperator")
329+
.item("ValuesSourceReaderOperator")
330+
.item("ProjectOperator")
331+
.item("ExchangeSinkOperator")
332+
)
171333
);
172334
}
173335

174-
public void testVHammingToBit() throws IOException {
336+
public void testLengthNotPushedBeforeFork() throws IOException {
337+
String value = "v".repeat(between(0, 256));
175338
test(
176-
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "bit").endObject(),
177-
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
178-
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
179-
matchesList().item(6.0),
180-
matchesMap().entry("test:column_at_a_time:BitDenseVectorFromDocValues.V_HAMMING", 1)
339+
justType("keyword"),
340+
b -> b.field("test", value),
341+
"""
342+
| EVAL test = LENGTH(test)
343+
| FORK
344+
(EVAL j = 1)
345+
(EVAL j = 2)
346+
""",
347+
matchesList().item(value.length()),
348+
matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
349+
sig -> assertMap(
350+
sig,
351+
matchesList().item("LuceneSourceOperator")
352+
.item("ValuesSourceReaderOperator")
353+
.item("ProjectOperator")
354+
.item("ExchangeSinkOperator")
355+
)
356+
);
357+
}
358+
359+
public void testLengthNotPushedAfterFork() throws IOException {
360+
String value = "v".repeat(between(0, 256));
361+
test(
362+
justType("keyword"),
363+
b -> b.field("test", value),
364+
"""
365+
| FORK
366+
(EVAL j = 1)
367+
(EVAL j = 2)
368+
| EVAL test = LENGTH(test)
369+
""",
370+
matchesList().item(value.length()),
371+
matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
372+
sig -> assertMap(
373+
sig,
374+
matchesList().item("LuceneSourceOperator")
375+
.item("ValuesSourceReaderOperator")
376+
.item("ProjectOperator")
377+
.item("ExchangeSinkOperator")
378+
)
181379
);
182380
}
183381

@@ -217,7 +415,7 @@ private void test(
217415
RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query("""
218416
FROM test
219417
""" + eval + """
220-
| STATS test = VALUES(test)
418+
| STATS test = MV_SORT(VALUES(test))
221419
""");
222420
/*
223421
* TODO if you just do KEEP test then the load is in the data node reduce driver and not merged:
@@ -265,6 +463,9 @@ private void test(
265463
}
266464
case "node_reduce" -> logger.info("node_reduce {}", sig);
267465
case "final" -> logger.info("final {}", sig);
466+
case "main.final" -> logger.info("main final {}", sig);
467+
case "subplan-0.final" -> logger.info("subplan-0 final {}", sig);
468+
case "subplan-1.final" -> logger.info("subplan-1 final {}", sig);
268469
default -> throw new IllegalArgumentException("can't match " + description);
269470
}
270471
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/BlockLoaderWarnings.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,9 @@ public void registerException(Class<? extends Exception> exceptionClass, String
3939
}
4040
delegate.registerException(exceptionClass, message);
4141
}
42+
43+
@Override
44+
public String toString() {
45+
return "warnings for " + source;
46+
}
4247
}

0 commit comments

Comments
 (0)