Skip to content

Commit e6c5dcc

Browse files
authored
ESQL: Fuse MV_MIN and MV_MAX and document process (#138029)
Adds REST tests for the `percentiles_bucket` pipeline bucket aggregation. This gives us forwards and backwards compatibility tests for these aggs as well as mixed version cluster tests for these aggs. Relates to #26220
1 parent 6783135 commit e6c5dcc

File tree

21 files changed

+1661
-23
lines changed

21 files changed

+1661
-23
lines changed

docs/changelog/138029.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138029
2+
summary: Fuse MV_MIN and MV_MAX and document process
3+
area: ES|QL
4+
type: feature
5+
issues: []

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,9 @@ tests:
438438
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackLookupJoinIT
439439
method: testLookupExplosionBigString
440440
issue: https://github.com/elastic/elasticsearch/issues/138510
441+
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
442+
method: test {csv-spec:inlinestats.MvMinMvExpand}
443+
issue: https://github.com/elastic/elasticsearch/issues/137679
441444
- class: org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SubstituteRoundToTests
442445
method: testSubqueryWithCountStarAndDateTrunc {default}
443446
issue: https://github.com/elastic/elasticsearch/issues/138601

server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java

Lines changed: 158 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.core.Nullable;
1818
import org.elasticsearch.core.Releasable;
1919
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
20+
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader;
21+
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinLongsFromDocValuesBlockLoader;
2022
import org.elasticsearch.search.fetch.StoredFieldsSpec;
2123
import org.elasticsearch.search.lookup.Source;
2224

@@ -25,8 +27,139 @@
2527
import java.util.Map;
2628

2729
/**
28-
* Interface for loading data in a block shape. Instances of this class
29-
* must be immutable and thread safe.
30+
* Loads values from a chunk of lucene documents into a "Block" for the compute engine.
31+
* <p>
32+
* Think of a Block as an array of values for a sequence of lucene documents. That's
33+
* almost true! For the purposes of implementing {@link BlockLoader}, it's close enough.
34+
* The compute engine operates on arrays because the good folks that build CPUs have
35+
* spent the past 40 years making them really really good at running tight loops over
36+
* arrays of data. So we play along with the CPU and make arrays.
37+
* </p>
38+
* <h2>How to implement</h2>
39+
* <p>
40+
* There are a lot of interesting choices hiding in here to make getting those arrays
41+
* out of lucene work well:
42+
* </p>
43+
* <ul>
44+
* <li>
45+
* {@code doc_values} are already on disk in array-like structures so we prefer
46+
* to just copy them into an array in one loop inside {@link ColumnAtATimeReader}.
47+
* Well, not entirely array-like. {@code doc_values} are designed to be read in
48+
* non-descending order (think {@code 0, 1, 1, 4, 9}) and will fail if they are
49+
* read truly randomly. This lets the doc values implementations have some
50+
* chunking/compression/magic on top of the array-like on disk structure. The
51+
* caller manages this, always putting {@link Docs} in non-descending order.
52+
* Extend {@link BlockDocValuesReader} to implement all this.
53+
* </li>
54+
* <li>
55+
* All stored {@code stored} fields for each document are stored on disk together,
56+
* compressed with a general purpose compression algorithm like
57+
* <a href="https://en.wikipedia.org/wiki/Zstd">Zstd</a>. Blocks of documents are
58+
* compressed together to get a better compression ratio. Just like doc values,
59+
* we read them in non-descending order. Unlike doc values, we read all fields for a
60+
* document at once. Because reading one requires decompressing them all. We do
61+
* this by returning {@code null} from {@link BlockLoader#columnAtATimeReader}
62+
* to signal that we can't load the whole column at once. Instead, we implement a
63+
* {@link RowStrideReader} which the caller will call once for each doc. Extend
64+
* {@link BlockStoredFieldsReader} to implement all this.
65+
* </li>
66+
* <li>
67+
* Fields loaded from {@code _source} are an extra special case of {@code stored}
68+
* fields. {@code _source} itself is just another stored field, compressed in chunks
69+
* with all the other stored fields. It's the original bytes sent when indexing the
70+
* document. Think {@code json} or {@code yaml}. When we need fields from
71+
* {@code _source} we get it from the stored fields reader infrastructure and then
72+
* explode it into a {@link Map} representing the original {@code json} and
73+
* the {@link RowStrideReader} implementation grabs the parts of the {@code json}
74+
* it needs. Extend {@link BlockSourceReader} to implement all this.
75+
* </li>
76+
* <li>
77+
* Synthetic {@code _source} complicates this further by storing fields in somewhat
78+
* unexpected places, but is otherwise like a {@code stored} field reader. Use
79+
* {@link FallbackSyntheticSourceBlockLoader} to implement all this.
80+
* </li>
81+
* </ul>
82+
* <h2>How many to implement</h2>
83+
* <p>
84+
* Generally reads are faster from {@code doc_values}, slower from {@code stored} fields,
85+
* and even slower from {@code _source}. If we get to chose, we pick {@code doc_values}.
86+
* But we work with what's on disk and that's a product of the field type and what the user's
87+
* configured. Picking the optimal choice given what's on disk is the responsibility of each
88+
* field's {@link MappedFieldType#blockLoader} method. The more configurable the field's
89+
* storage strategies the more {@link BlockLoader}s you have to implement to integrate it
90+
* with ESQL. It can get to be a lot. Sorry.
91+
* </p>
92+
* <p>
93+
* For a field to be supported by ESQL fully it has to be loadable if it was configured to be
94+
* stored in any way. It's possible to turn off storage entirely by turning off
95+
* {@code doc_values} and {@code _source} and {@code stored} fields. In that case, it's
96+
* acceptable to return {@link ConstantNullsReader}. User turned the field off, best we can do
97+
* is {@code null}.
98+
* </p>
99+
* <p>
100+
* We also sometimes want to "push" executing some ESQL functions into the block loader itself.
101+
* Usually we do this when it's a ton faster. See the docs for {@code BlockLoaderExpression}
102+
* for why and how we do this.
103+
* </p>
104+
* <p>
105+
* For example, {@code long} fields implement these block loaders:
106+
* </p>
107+
* <ul>
108+
* <li>
109+
* {@link org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader} to read
110+
* from {@code doc_values}.
111+
* </li>
112+
* <li>
113+
* {@link org.elasticsearch.index.mapper.BlockSourceReader.LongsBlockLoader} to read from
114+
* {@code _source}.
115+
* </li>
116+
* <li>
117+
* A specially configured {@link FallbackSyntheticSourceBlockLoader} to read synthetic
118+
* {@code _source}.
119+
* </li>
120+
* <li>
121+
* {@link MvMinLongsFromDocValuesBlockLoader} to read {@code MV_MIN(long_field)} from
122+
* {@code doc_values}.
123+
* </li>
124+
* <li>
125+
* {@link MvMaxLongsFromDocValuesBlockLoader} to read {@code MV_MAX(long_field)} from
126+
* {@code doc_values}.
127+
* </li>
128+
* </ul>
129+
* <p>
130+
* NOTE: We can't read from {@code long}s from {@code stored} fields which is a
131+
* <a href="https://github.com/elastic/elasticsearch/issues/138019">bug</a>, but maybe not
132+
* a terrible one because it's very uncommon to configure {@code long} to be {@code stored}
133+
* but to disable {@code _source} and {@code doc_values}. Nothing's perfect. Especially
134+
* code.
135+
* </p>
136+
* <h2>Why is {@link AllReader}?</h2>
137+
* <p>
138+
* When we described how to read from {@code doc_values} we said we <strong>prefer</strong>
139+
* to use {@link ColumnAtATimeReader}. But some callers don't support reading column-at-a-time
140+
* and need to read row-by-row. So we also need an implementation of {@link RowStrideReader}
141+
* that reads from {@code doc_values}. Usually it's most convenient to implement both of those
142+
* in the same {@code class}. {@link AllReader} is an interface for those sorts of classes, and
143+
* you'll see it in the {@code doc_values} code frequently.
144+
* </p>
145+
* <h2>Why is {@link #rowStrideStoredFieldSpec}?</h2>
146+
* <p>
147+
* When decompressing {@code stored} fields lucene can skip stored field that aren't used. They
148+
* still have to be decompressed, but they aren't turned into java objects which saves a fair bit
149+
* of work. If you don't need any stored fields return {@link StoredFieldsSpec#NO_REQUIREMENTS}.
150+
* Otherwise, return what you need.
151+
* </p>
152+
* <h2>Thread safety</h2>
153+
* <p>
154+
* Instances of this class must be immutable and thread safe. Instances of
155+
* {@link ColumnAtATimeReader} and {@link RowStrideReader} are all mutable and can only
156+
* be accessed by one thread at a time but <strong>may</strong> be passed between threads.
157+
* See implementations {@link Reader#canReuse} for how that's handled. "Normal" java objects
158+
* don't need to do anything special to be kicked from thread to thread - the transfer itself
159+
* establishes a {@code happens-before} relationship that makes everything you need visible.
160+
* But Lucene's readers aren't "normal" java objects and sometimes need to be rebuilt if we
161+
* shift threads.
162+
* </p>
30163
*/
31164
public interface BlockLoader {
32165
/**
@@ -115,10 +248,26 @@ interface StoredFields {
115248
Map<String, List<Object>> storedFields() throws IOException;
116249
}
117250

251+
/**
252+
* Build a column-at-a-time reader. <strong>May</strong> return {@code null}
253+
* if the underlying storage needs to be loaded row-by-row. Callers should try
254+
* this first, only falling back to {@link #rowStrideReader} if this returns
255+
* {@code null} or if they can't load column-at-a-time themselves.
256+
*/
257+
@Nullable
118258
ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException;
119259

260+
/**
261+
* Build a row-by-row reader. Must <strong>never</strong> return {@code null},
262+
* evan if the underlying storage prefers to be loaded column-at-a-time. Some
263+
* callers simply can't load column-at-a-time so all implementations must support
264+
* this method.
265+
*/
120266
RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException;
121267

268+
/**
269+
* What {@code stored} fields are needed by this reader.
270+
*/
122271
StoredFieldsSpec rowStrideStoredFieldSpec();
123272

124273
/**
@@ -540,8 +689,13 @@ Block buildExponentialHistogramBlockDirect(
540689
}
541690

542691
/**
543-
* Marker interface for block results. The compute engine has a fleshed
544-
* out implementation.
692+
* A columnar representation of homogenous data. It has a position (row) count, and
693+
* various data retrieval methods for accessing the underlying data that is stored at a given
694+
* position. In other words, a fancy wrapper over an array.
695+
* <p>
696+
* <strong>This</strong> is just a marker interface for these results. The compute engine
697+
* has fleshed out implementations.
698+
* </p>
545699
*/
546700
interface Block extends Releasable {}
547701

server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@
3333
import org.elasticsearch.index.fielddata.FieldDataContext;
3434
import org.elasticsearch.index.fielddata.IndexFieldData;
3535
import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData;
36+
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
3637
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader;
38+
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader;
39+
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinBytesRefsFromOrdsBlockLoader;
3740
import org.elasticsearch.index.query.SearchExecutionContext;
3841
import org.elasticsearch.script.IpFieldScript;
3942
import org.elasticsearch.script.Script;
@@ -457,7 +460,15 @@ public static Query rangeQuery(
457460
@Override
458461
public BlockLoader blockLoader(BlockLoaderContext blContext) {
459462
if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSource)) {
460-
return new BytesRefsFromOrdsBlockLoader(name());
463+
BlockLoaderFunctionConfig cfg = blContext.blockLoaderFunctionConfig();
464+
if (cfg == null) {
465+
return new BytesRefsFromOrdsBlockLoader(name());
466+
}
467+
return switch (cfg.function()) {
468+
case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name());
469+
case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name());
470+
default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]");
471+
};
461472
}
462473

463474
if (isStored()) {
@@ -475,6 +486,17 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
475486
return new BlockSourceReader.IpsBlockLoader(sourceValueFetcher(blContext), lookup);
476487
}
477488

489+
@Override
490+
public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) {
491+
if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSource)) {
492+
return switch (config.function()) {
493+
case MV_MAX, MV_MIN -> true;
494+
default -> false;
495+
};
496+
}
497+
return true;
498+
}
499+
478500
private BlockLoader blockLoaderFromFallbackSyntheticSource(BlockLoaderContext blContext) {
479501
var reader = new IpFallbackSyntheticSourceReader(nullValue);
480502
return new FallbackSyntheticSourceBlockLoader(

server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData;
6464
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
6565
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader;
66+
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader;
67+
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinBytesRefsFromOrdsBlockLoader;
6668
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.Utf8CodePointsFromOrdsBlockLoader;
6769
import org.elasticsearch.index.query.AutomatonQueryWithDescription;
6870
import org.elasticsearch.index.query.SearchExecutionContext;
@@ -735,10 +737,13 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
735737
if (cfg == null) {
736738
return new BytesRefsFromOrdsBlockLoader(name());
737739
}
738-
if (cfg.function() == BlockLoaderFunctionConfig.Function.LENGTH) {
739-
return new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) cfg).warnings(), name());
740-
}
741-
throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]");
740+
return switch (cfg.function()) {
741+
case LENGTH -> new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) cfg).warnings(), name());
742+
case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name());
743+
case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name());
744+
default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]");
745+
};
746+
742747
}
743748
if (blContext.blockLoaderFunctionConfig() != null) {
744749
throw new UnsupportedOperationException("function fusing only supported for doc values");
@@ -768,7 +773,10 @@ public Builder builder(BlockFactory factory, int expectedCount) {
768773
@Override
769774
public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) {
770775
if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSourceEnabled())) {
771-
return config.function() == BlockLoaderFunctionConfig.Function.LENGTH;
776+
return switch (config.function()) {
777+
case LENGTH, MV_MAX, MV_MIN -> true;
778+
default -> false;
779+
};
772780
}
773781
return false;
774782
}

0 commit comments

Comments
 (0)