Skip to content

Commit 474c286

Browse files
authored
Block loader and compute interface for t-digest field (elastic#138246)
Add support for a t-digest block type, representing the t-digest field type
1 parent d9e286a commit 474c286

File tree

12 files changed

+596
-10
lines changed

12 files changed

+596
-10
lines changed

server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,8 @@ Block buildExponentialHistogramBlockDirect(
535535
Block zeroThresholds,
536536
Block encodedHistograms
537537
);
538+
539+
Block buildTDigestBlockDirect(Block encodedDigests, Block minima, Block maxima, Block sums, Block valueCounts);
538540
}
539541

540542
/**
@@ -697,4 +699,16 @@ interface ExponentialHistogramBuilder extends Builder {
697699

698700
BytesRefBuilder encodedHistograms();
699701
}
702+
703+
/**
 * Builder for blocks of t-digest field values.
 * <p>
 * A t-digest value is decomposed into five parallel sub-builders: the per-document
 * minimum, maximum, sum, and value count, plus the serialized digest bytes.
 * Callers append one entry to each sub-builder per document.
 */
interface TDigestBuilder extends Builder {
    /** Per-document minimum observed value. */
    DoubleBuilder minima();

    /** Per-document maximum observed value. */
    DoubleBuilder maxima();

    /** Per-document sum of observed values. */
    DoubleBuilder sums();

    /** Per-document count of observed values. */
    LongBuilder valueCounts();

    /** Per-document serialized t-digest bytes. */
    BytesRefBuilder encodedDigests();
}
700714
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public static TDigestState create(CircuitBreaker breaker, double compression) {
8484
}
8585
}
8686

87-
static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) {
87+
public static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) {
8888
breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-with-type");
8989
try {
9090
return new TDigestState(breaker, type, compression);

test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,42 @@ public BlockLoader.Block buildExponentialHistogramBlockDirect(
517517
encodedHistograms
518518
);
519519
}
520+
521+
@Override
522+
public BlockLoader.Block buildTDigestBlockDirect(
523+
BlockLoader.Block encodedDigests,
524+
BlockLoader.Block minima,
525+
BlockLoader.Block maxima,
526+
BlockLoader.Block sums,
527+
BlockLoader.Block valueCounts
528+
) {
529+
TestBlock minBlock = (TestBlock) minima;
530+
TestBlock maxBlock = (TestBlock) maxima;
531+
TestBlock sumBlock = (TestBlock) sums;
532+
TestBlock countBlock = (TestBlock) valueCounts;
533+
TestBlock digestBlock = (TestBlock) encodedDigests;
534+
535+
assert minBlock.size() == digestBlock.size();
536+
assert maxBlock.size() == digestBlock.size();
537+
assert sumBlock.size() == digestBlock.size();
538+
assert countBlock.size() == digestBlock.size();
539+
540+
var values = new ArrayList<>(minBlock.size());
541+
542+
for (int i = 0; i < minBlock.size(); i++) {
543+
// we need to represent this complex block somehow
544+
HashMap<String, Object> value = new HashMap<>();
545+
value.put("min", minBlock.values.get(i));
546+
value.put("max", maxBlock.values.get(i));
547+
value.put("sum", sumBlock.values.get(i));
548+
value.put("value_count", countBlock.values.get(i));
549+
value.put("encoded_digest", digestBlock.values.get(i));
550+
551+
values.add(value);
552+
}
553+
554+
return new TestBlock(values);
555+
}
520556
};
521557
}
522558

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestBlockLoader.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.analytics.mapper;
9+
10+
import org.apache.lucene.index.LeafReaderContext;
11+
import org.elasticsearch.core.Releasables;
12+
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
13+
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromBinaryBlockLoader;
14+
import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader;
15+
import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader;
16+
17+
import java.io.IOException;
18+
19+
public class TDigestBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader {
20+
private final DoublesBlockLoader minimaLoader;
21+
private final DoublesBlockLoader maximaLoader;
22+
private final DoublesBlockLoader sumsLoader;
23+
private final LongsBlockLoader valueCountsLoader;
24+
private final BytesRefsFromBinaryBlockLoader encodedDigestLoader;
25+
26+
public TDigestBlockLoader(
27+
BytesRefsFromBinaryBlockLoader encodedDigestLoader,
28+
DoublesBlockLoader minimaLoader,
29+
DoublesBlockLoader maximaLoader,
30+
DoublesBlockLoader sumsLoader,
31+
LongsBlockLoader valueCountsLoader
32+
) {
33+
this.encodedDigestLoader = encodedDigestLoader;
34+
this.minimaLoader = minimaLoader;
35+
this.maximaLoader = maximaLoader;
36+
this.sumsLoader = sumsLoader;
37+
this.valueCountsLoader = valueCountsLoader;
38+
}
39+
40+
@Override
41+
public AllReader reader(LeafReaderContext context) throws IOException {
42+
AllReader encodedDigestReader = encodedDigestLoader.reader(context);
43+
AllReader minimaReader = minimaLoader.reader(context);
44+
AllReader maximaReader = maximaLoader.reader(context);
45+
AllReader sumsReader = sumsLoader.reader(context);
46+
AllReader valueCountsReader = valueCountsLoader.reader(context);
47+
48+
return new TDigestReader(encodedDigestReader, minimaReader, maximaReader, sumsReader, valueCountsReader);
49+
}
50+
51+
@Override
52+
public Builder builder(BlockFactory factory, int expectedCount) {
53+
return null;
54+
}
55+
56+
static class TDigestReader implements AllReader {
57+
58+
private final AllReader encodedDigestReader;
59+
private final AllReader minimaReader;
60+
private final AllReader maximaReader;
61+
private final AllReader sumsReader;
62+
private final AllReader valueCountsReader;
63+
64+
TDigestReader(
65+
AllReader encodedDigestReader,
66+
AllReader minimaReader,
67+
AllReader maximaReader,
68+
AllReader sumsReader,
69+
AllReader valueCountsReader
70+
) {
71+
this.encodedDigestReader = encodedDigestReader;
72+
this.minimaReader = minimaReader;
73+
this.maximaReader = maximaReader;
74+
this.sumsReader = sumsReader;
75+
this.valueCountsReader = valueCountsReader;
76+
}
77+
78+
@Override
79+
public boolean canReuse(int startingDocID) {
80+
return minimaReader.canReuse(startingDocID)
81+
&& maximaReader.canReuse(startingDocID)
82+
&& sumsReader.canReuse(startingDocID)
83+
&& valueCountsReader.canReuse(startingDocID)
84+
&& encodedDigestReader.canReuse(startingDocID);
85+
}
86+
87+
@Override
88+
// Column oriented reader
89+
public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException {
90+
Block minima = null;
91+
Block maxima = null;
92+
Block sums = null;
93+
Block valueCounts = null;
94+
Block encodedBytes = null;
95+
Block result;
96+
boolean success = false;
97+
try {
98+
minima = minimaReader.read(factory, docs, offset, nullsFiltered);
99+
maxima = maximaReader.read(factory, docs, offset, nullsFiltered);
100+
sums = sumsReader.read(factory, docs, offset, nullsFiltered);
101+
valueCounts = valueCountsReader.read(factory, docs, offset, nullsFiltered);
102+
encodedBytes = encodedDigestReader.read(factory, docs, offset, nullsFiltered);
103+
result = factory.buildTDigestBlockDirect(encodedBytes, minima, maxima, sums, valueCounts);
104+
success = true;
105+
} finally {
106+
if (success == false) {
107+
Releasables.close(minima, maxima, sums, valueCounts, encodedBytes);
108+
}
109+
}
110+
return result;
111+
}
112+
113+
@Override
114+
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
115+
ExponentialHistogramBuilder histogramBuilder = (ExponentialHistogramBuilder) builder;
116+
minimaReader.read(docId, storedFields, histogramBuilder.minima());
117+
maximaReader.read(docId, storedFields, histogramBuilder.maxima());
118+
sumsReader.read(docId, storedFields, histogramBuilder.sums());
119+
valueCountsReader.read(docId, storedFields, histogramBuilder.valueCounts());
120+
encodedDigestReader.read(docId, storedFields, histogramBuilder.encodedHistograms());
121+
}
122+
123+
@Override
124+
public String toString() {
125+
return "BlockDocValuesReader.TDigest";
126+
}
127+
}
128+
}

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2424
import org.elasticsearch.common.util.BigArrays;
2525
import org.elasticsearch.common.util.FeatureFlag;
26+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
2627
import org.elasticsearch.index.fielddata.FieldDataContext;
2728
import org.elasticsearch.index.fielddata.FormattedDocValues;
2829
import org.elasticsearch.index.fielddata.HistogramValue;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.index.fielddata.IndexHistogramFieldData;
3334
import org.elasticsearch.index.fielddata.LeafHistogramFieldData;
3435
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
36+
import org.elasticsearch.index.mapper.BlockLoader;
3537
import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader;
3638
import org.elasticsearch.index.mapper.DocumentParserContext;
3739
import org.elasticsearch.index.mapper.DocumentParsingException;
@@ -42,6 +44,9 @@
4244
import org.elasticsearch.index.mapper.MapperBuilderContext;
4345
import org.elasticsearch.index.mapper.SourceValueFetcher;
4446
import org.elasticsearch.index.mapper.ValueFetcher;
47+
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromBinaryBlockLoader;
48+
import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader;
49+
import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader;
4550
import org.elasticsearch.index.query.SearchExecutionContext;
4651
import org.elasticsearch.script.field.DocValuesScriptFieldFactory;
4752
import org.elasticsearch.search.DocValueFormat;
@@ -58,6 +63,7 @@
5863
import java.io.IOException;
5964
import java.io.UncheckedIOException;
6065
import java.util.Map;
66+
import java.util.Objects;
6167

6268
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
6369

@@ -79,13 +85,13 @@ private static TDigestFieldMapper toType(FieldMapper in) {
7985
}
8086

8187
public static class Builder extends FieldMapper.Builder {
82-
private static final int DEFAULT_COMPRESSION = 100;
83-
private static final int MAXIMUM_COMPRESSION = 10000;
88+
private static final double DEFAULT_COMPRESSION = 100d;
89+
private static final double MAXIMUM_COMPRESSION = 10000d;
8490

8591
private final Parameter<Map<String, String>> meta = Parameter.metaParam();
8692
private final Parameter<Explicit<Boolean>> ignoreMalformed;
8793
private final Parameter<TDigestState.Type> digestType;
88-
private final Parameter<Integer> compression;
94+
private final Parameter<Double> compression;
8995

9096
public Builder(String name, boolean ignoreMalformedByDefault) {
9197
super(name);
@@ -102,7 +108,15 @@ public Builder(String name, boolean ignoreMalformedByDefault) {
102108
TDigestState.Type.HYBRID,
103109
TDigestState.Type.class
104110
);
105-
this.compression = Parameter.intParam("compression", false, m -> toType(m).compression, DEFAULT_COMPRESSION).addValidator(c -> {
111+
this.compression = new Parameter<>(
112+
"compression",
113+
false,
114+
() -> DEFAULT_COMPRESSION,
115+
(n, c1, o) -> XContentMapValues.nodeDoubleValue(o),
116+
m -> toType(m).compression,
117+
XContentBuilder::field,
118+
Objects::toString
119+
).addValidator(c -> {
106120
if (c <= 0 || c > MAXIMUM_COMPRESSION) {
107121
throw new IllegalArgumentException(
108122
"compression must be a positive integer between 1 and " + MAXIMUM_COMPRESSION + " was [" + c + "]"
@@ -135,7 +149,7 @@ public TDigestFieldMapper build(MapperBuilderContext context) {
135149
private final Explicit<Boolean> ignoreMalformed;
136150
private final boolean ignoreMalformedByDefault;
137151
private final TDigestState.Type digestType;
138-
private final int compression;
152+
private final double compression;
139153

140154
public TDigestFieldMapper(String simpleName, MappedFieldType mappedFieldType, BuilderParams builderParams, Builder builder) {
141155
super(simpleName, mappedFieldType, builderParams);
@@ -154,7 +168,7 @@ public TDigestState.Type digestType() {
154168
return digestType;
155169
}
156170

157-
public int compression() {
171+
public double compression() {
158172
return compression;
159173
}
160174

@@ -184,6 +198,18 @@ public String typeName() {
184198
return CONTENT_TYPE;
185199
}
186200

201+
@Override
202+
public BlockLoader blockLoader(BlockLoaderContext blContext) {
203+
DoublesBlockLoader minimaLoader = new DoublesBlockLoader(valuesMinSubFieldName(name()), NumericUtils::sortableLongToDouble);
204+
DoublesBlockLoader maximaLoader = new DoublesBlockLoader(valuesMaxSubFieldName(name()), NumericUtils::sortableLongToDouble);
205+
DoublesBlockLoader sumsLoader = new DoublesBlockLoader(valuesSumSubFieldName(name()), NumericUtils::sortableLongToDouble);
206+
LongsBlockLoader valueCountsLoader = new LongsBlockLoader(valuesCountSubFieldName(name()));
207+
BytesRefsFromBinaryBlockLoader digestLoader = new BytesRefsFromBinaryBlockLoader(name());
208+
209+
// TODO: We're constantly passing around this set of 5 things. It would be nice to make a container for that.
210+
return new TDigestBlockLoader(digestLoader, minimaLoader, maximaLoader, sumsLoader, valueCountsLoader);
211+
}
212+
187213
@Override
188214
public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
189215
return SourceValueFetcher.identity(name(), context, format);
@@ -444,7 +470,7 @@ private static String valuesMaxSubFieldName(String fullPath) {
444470
}
445471

446472
/** re-usable {@link HistogramValue} implementation */
447-
private static class InternalTDigestValue extends HistogramValue {
473+
static class InternalTDigestValue extends HistogramValue {
448474
double value;
449475
long count;
450476
boolean isExhausted;

0 commit comments

Comments (0)