Skip to content

Commit 81f33e4

Browse files
authored
Change downsample's MetricFieldProducers (#124701)
Refactor MetricFieldProducer to use SortedNumericDoubleValues instead of FormattedDocValues, which saves unneeded conversions / casts.
1 parent b427a2b commit 81f33e4

File tree

5 files changed

+110
-63
lines changed

5 files changed

+110
-63
lines changed

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ public void write(XContentBuilder builder) throws IOException {
3939
if (fieldProducer.isEmpty() == false) {
4040
if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) {
4141
for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) {
42-
if (metric.get() != null) {
43-
builder.field(metric.name(), metric.get());
44-
}
42+
builder.field(metric.name(), metric.get());
4543
}
4644
} else if (fieldProducer instanceof LabelFieldProducer labelFieldProducer) {
4745
LabelFieldProducer.Label label = labelFieldProducer.label();

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.index.IndexService;
3535
import org.elasticsearch.index.engine.Engine;
3636
import org.elasticsearch.index.fielddata.FormattedDocValues;
37+
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
3738
import org.elasticsearch.index.mapper.DateFieldMapper;
3839
import org.elasticsearch.index.mapper.DocCountFieldMapper;
3940
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -361,14 +362,30 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
361362
docCountProvider.setLeafReaderContext(ctx);
362363

363364
// For each field, return a tuple with the downsample field producer and the field value leaf
364-
final AbstractDownsampleFieldProducer[] fieldProducers = new AbstractDownsampleFieldProducer[fieldValueFetchers.size()];
365-
final FormattedDocValues[] formattedDocValues = new FormattedDocValues[fieldValueFetchers.size()];
366-
for (int i = 0; i < fieldProducers.length; i++) {
367-
fieldProducers[i] = fieldValueFetchers.get(i).fieldProducer();
368-
formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx);
365+
final List<AbstractDownsampleFieldProducer> nonMetricProducers = new ArrayList<>();
366+
final List<FormattedDocValues> formattedDocValues = new ArrayList<>();
367+
368+
final List<MetricFieldProducer> metricProducers = new ArrayList<>();
369+
final List<SortedNumericDoubleValues> numericDocValues = new ArrayList<>();
370+
for (var fieldValueFetcher : fieldValueFetchers) {
371+
var fieldProducer = fieldValueFetcher.fieldProducer();
372+
if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) {
373+
metricProducers.add(metricFieldProducer);
374+
numericDocValues.add(fieldValueFetcher.getNumericLeaf(ctx));
375+
} else {
376+
nonMetricProducers.add(fieldProducer);
377+
formattedDocValues.add(fieldValueFetcher.getLeaf(ctx));
378+
}
369379
}
370380

371-
var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues);
381+
var leafBucketCollector = new LeafDownsampleCollector(
382+
aggCtx,
383+
docCountProvider,
384+
nonMetricProducers.toArray(new AbstractDownsampleFieldProducer[0]),
385+
formattedDocValues.toArray(new FormattedDocValues[0]),
386+
metricProducers.toArray(new MetricFieldProducer[0]),
387+
numericDocValues.toArray(new SortedNumericDoubleValues[0])
388+
);
372389
leafBucketCollectors.add(leafBucketCollector);
373390
return leafBucketCollector;
374391
}
@@ -386,7 +403,10 @@ class LeafDownsampleCollector extends LeafBucketCollector {
386403
final AggregationExecutionContext aggCtx;
387404
final DocCountProvider docCountProvider;
388405
final FormattedDocValues[] formattedDocValues;
389-
final AbstractDownsampleFieldProducer[] fieldProducers;
406+
final AbstractDownsampleFieldProducer[] nonMetricProducers;
407+
408+
final MetricFieldProducer[] metricProducers;
409+
final SortedNumericDoubleValues[] numericDocValues;
390410

391411
// Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first.
392412
long firstTimeStampForBulkCollection;
@@ -396,13 +416,20 @@ class LeafDownsampleCollector extends LeafBucketCollector {
396416
LeafDownsampleCollector(
397417
AggregationExecutionContext aggCtx,
398418
DocCountProvider docCountProvider,
399-
AbstractDownsampleFieldProducer[] fieldProducers,
400-
FormattedDocValues[] formattedDocValues
419+
AbstractDownsampleFieldProducer[] nonMetricProducers,
420+
FormattedDocValues[] formattedDocValues,
421+
MetricFieldProducer[] metricProducers,
422+
SortedNumericDoubleValues[] numericDocValues
401423
) {
424+
assert nonMetricProducers.length == formattedDocValues.length;
425+
assert metricProducers.length == numericDocValues.length;
426+
402427
this.aggCtx = aggCtx;
403428
this.docCountProvider = docCountProvider;
404-
this.fieldProducers = fieldProducers;
429+
this.nonMetricProducers = nonMetricProducers;
405430
this.formattedDocValues = formattedDocValues;
431+
this.metricProducers = metricProducers;
432+
this.numericDocValues = numericDocValues;
406433
}
407434

408435
@Override
@@ -488,11 +515,16 @@ void leafBulkCollection() throws IOException {
488515

489516
downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider);
490517
// Iterate over all field values and collect the doc_values for this docId
491-
for (int i = 0; i < fieldProducers.length; i++) {
492-
AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
518+
for (int i = 0; i < nonMetricProducers.length; i++) {
519+
AbstractDownsampleFieldProducer fieldProducer = nonMetricProducers[i];
493520
FormattedDocValues docValues = formattedDocValues[i];
494521
fieldProducer.collect(docValues, docIdBuffer);
495522
}
523+
for (int i = 0; i < metricProducers.length; i++) {
524+
MetricFieldProducer metricFieldProducer = metricProducers[i];
525+
SortedNumericDoubleValues numericDoubleValues = numericDocValues[i];
526+
metricFieldProducer.collect(numericDoubleValues, docIdBuffer);
527+
}
496528

497529
docsProcessed += docIdBuffer.size();
498530
task.setDocsProcessed(docsProcessed);

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.apache.lucene.index.LeafReaderContext;
1111
import org.elasticsearch.index.fielddata.FormattedDocValues;
1212
import org.elasticsearch.index.fielddata.IndexFieldData;
13+
import org.elasticsearch.index.fielddata.LeafNumericFieldData;
14+
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
1315
import org.elasticsearch.index.mapper.MappedFieldType;
1416
import org.elasticsearch.index.mapper.NumberFieldMapper;
1517
import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper;
@@ -50,6 +52,11 @@ public FormattedDocValues getLeaf(LeafReaderContext context) {
5052
return fieldData.load(context).getFormattedValues(format);
5153
}
5254

55+
public SortedNumericDoubleValues getNumericLeaf(LeafReaderContext context) {
56+
LeafNumericFieldData numericFieldData = (LeafNumericFieldData) fieldData.load(context);
57+
return numericFieldData.getDoubleValues();
58+
}
59+
5360
public AbstractDownsampleFieldProducer fieldProducer() {
5461
return fieldProducer;
5562
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.lucene.internal.hppc.IntArrayList;
1111
import org.elasticsearch.index.fielddata.FormattedDocValues;
12+
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
1213
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
1314
import org.elasticsearch.xcontent.XContentBuilder;
1415

@@ -46,7 +47,7 @@ public Metric[] metrics() {
4647
}
4748

4849
/** Collect the value of a raw field and compute all downsampled metrics */
49-
void collect(Number value) {
50+
void collect(double value) {
5051
for (MetricFieldProducer.Metric metric : metrics()) {
5152
metric.collect(value);
5253
}
@@ -55,14 +56,19 @@ void collect(Number value) {
5556

5657
@Override
5758
public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
59+
assert false : "MetricFieldProducer does not support formatted doc values";
60+
throw new UnsupportedOperationException();
61+
}
62+
63+
public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
5864
for (int i = 0; i < docIdBuffer.size(); i++) {
5965
int docId = docIdBuffer.get(i);
6066
if (docValues.advanceExact(docId) == false) {
6167
continue;
6268
}
6369
int docValuesCount = docValues.docValueCount();
6470
for (int j = 0; j < docValuesCount; j++) {
65-
Number num = (Number) docValues.nextValue();
71+
double num = docValues.nextValue();
6672
collect(num);
6773
}
6874
}
@@ -83,9 +89,9 @@ public String name() {
8389
return name;
8490
}
8591

86-
abstract void collect(Number number);
92+
abstract void collect(double number);
8793

88-
abstract Number get();
94+
abstract double get();
8995

9096
abstract void reset();
9197
}
@@ -94,51 +100,55 @@ public String name() {
94100
* Metric implementation that computes the maximum of all values of a field
95101
*/
96102
static final class Max extends Metric {
97-
private Double max;
103+
private static final double NO_VALUE = -Double.MAX_VALUE;
104+
105+
private double max = NO_VALUE;
98106

99107
Max() {
100108
super("max");
101109
}
102110

103111
@Override
104-
void collect(Number value) {
105-
this.max = max != null ? Math.max(value.doubleValue(), max) : value.doubleValue();
112+
void collect(double value) {
113+
this.max = Math.max(value, max);
106114
}
107115

108116
@Override
109-
Number get() {
117+
double get() {
110118
return max;
111119
}
112120

113121
@Override
114122
void reset() {
115-
max = null;
123+
max = NO_VALUE;
116124
}
117125
}
118126

119127
/**
120128
* Metric implementation that computes the minimum of all values of a field
121129
*/
122130
static final class Min extends Metric {
123-
private Double min;
131+
private static final double NO_VALUE = Double.MAX_VALUE;
132+
133+
private double min = NO_VALUE;
124134

125135
Min() {
126136
super("min");
127137
}
128138

129139
@Override
130-
void collect(Number value) {
131-
this.min = min != null ? Math.min(value.doubleValue(), min) : value.doubleValue();
140+
void collect(double value) {
141+
this.min = Math.min(value, min);
132142
}
133143

134144
@Override
135-
Number get() {
145+
double get() {
136146
return min;
137147
}
138148

139149
@Override
140150
void reset() {
141-
min = null;
151+
min = NO_VALUE;
142152
}
143153
}
144154

@@ -157,12 +167,12 @@ static final class Sum extends Metric {
157167
}
158168

159169
@Override
160-
void collect(Number value) {
161-
kahanSummation.add(value.doubleValue());
170+
void collect(double value) {
171+
kahanSummation.add(value);
162172
}
163173

164174
@Override
165-
Number get() {
175+
double get() {
166176
return kahanSummation.value();
167177
}
168178

@@ -183,12 +193,12 @@ static final class ValueCount extends Metric {
183193
}
184194

185195
@Override
186-
void collect(Number value) {
196+
void collect(double value) {
187197
count++;
188198
}
189199

190200
@Override
191-
Number get() {
201+
double get() {
192202
return count;
193203
}
194204

@@ -206,27 +216,29 @@ void reset() {
206216
* ignoring everything else.
207217
*/
208218
static final class LastValue extends Metric {
209-
private Number lastValue;
219+
private static final double NO_VALUE = Double.MIN_VALUE;
220+
221+
private double lastValue = NO_VALUE;
210222

211223
LastValue() {
212224
super("last_value");
213225
}
214226

215227
@Override
216-
void collect(Number value) {
217-
if (lastValue == null) {
228+
void collect(double value) {
229+
if (lastValue == Double.MIN_VALUE) {
218230
lastValue = value;
219231
}
220232
}
221233

222234
@Override
223-
Number get() {
235+
double get() {
224236
return lastValue;
225237
}
226238

227239
@Override
228240
void reset() {
229-
lastValue = null;
241+
lastValue = NO_VALUE;
230242
}
231243
}
232244

@@ -240,7 +252,7 @@ static final class CounterMetricFieldProducer extends MetricFieldProducer {
240252
}
241253

242254
@Override
243-
public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
255+
public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
244256
// Counter producers only collect the last_value. Since documents are
245257
// collected by descending timestamp order, the producer should only
246258
// process the first value for every tsid. So, it will only collect the
@@ -281,9 +293,7 @@ public void write(XContentBuilder builder) throws IOException {
281293
if (isEmpty() == false) {
282294
builder.startObject(name());
283295
for (MetricFieldProducer.Metric metric : metrics()) {
284-
if (metric.get() != null) {
285-
builder.field(metric.name(), metric.get());
286-
}
296+
builder.field(metric.name(), metric.get());
287297
}
288298
builder.endObject();
289299
}

0 commit comments

Comments
 (0)