Skip to content

Commit f39b6d5

Browse files
authored
Refactor: Move exponential histogram compression into shared library (#136936)
1 parent 5ef1c35 commit f39b6d5

File tree

6 files changed

+487
-289
lines changed

6 files changed

+487
-289
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
import org.apache.lucene.util.BytesRef;
1313
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1414
import org.elasticsearch.exponentialhistogram.BucketIterator;
15+
import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram;
1516
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
1617
import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker;
1718
import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator;
1819
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger;
19-
import org.elasticsearch.xpack.analytics.mapper.IndexWithCount;
20-
import org.elasticsearch.xpack.exponentialhistogram.CompressedExponentialHistogram;
2120
import org.openjdk.jmh.annotations.Benchmark;
2221
import org.openjdk.jmh.annotations.BenchmarkMode;
2322
import org.openjdk.jmh.annotations.Fork;
@@ -32,7 +31,6 @@
3231
import org.openjdk.jmh.annotations.Warmup;
3332

3433
import java.io.IOException;
35-
import java.util.ArrayList;
3634
import java.util.List;
3735
import java.util.Random;
3836
import java.util.concurrent.ThreadLocalRandom;
@@ -109,28 +107,24 @@ public void setUp() {
109107
}
110108

111109
private ExponentialHistogram asCompressedHistogram(ExponentialHistogram histogram) {
112-
List<IndexWithCount> negativeBuckets = new ArrayList<>();
113-
List<IndexWithCount> positiveBuckets = new ArrayList<>();
114-
115-
BucketIterator it = histogram.negativeBuckets().iterator();
116-
while (it.hasNext()) {
117-
negativeBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
118-
it.advance();
119-
}
120-
it = histogram.positiveBuckets().iterator();
121-
while (it.hasNext()) {
122-
positiveBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
123-
it.advance();
124-
}
125-
126-
long totalCount = histogram.zeroBucket().count() + histogram.negativeBuckets().valueCount() + histogram.positiveBuckets()
127-
.valueCount();
128110
BytesStreamOutput histoBytes = new BytesStreamOutput();
129111
try {
130-
CompressedExponentialHistogram.writeHistogramBytes(histoBytes, histogram.scale(), negativeBuckets, positiveBuckets);
112+
CompressedExponentialHistogram.writeHistogramBytes(
113+
histoBytes,
114+
histogram.scale(),
115+
histogram.negativeBuckets().iterator(),
116+
histogram.positiveBuckets().iterator()
117+
);
131118
CompressedExponentialHistogram result = new CompressedExponentialHistogram();
132119
BytesRef data = histoBytes.bytes().toBytesRef();
133-
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, histogram.sum(), histogram.min(), histogram.max(), data);
120+
result.reset(
121+
histogram.zeroBucket().zeroThreshold(),
122+
histogram.valueCount(),
123+
histogram.sum(),
124+
histogram.min(),
125+
histogram.max(),
126+
data
127+
);
134128
return result;
135129
} catch (IOException e) {
136130
throw new RuntimeException(e);
Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,42 @@
11
/*
2-
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3-
* or more contributor license agreements. Licensed under the Elastic License
4-
* 2.0; you may not use this file except in compliance with the Elastic License
5-
* 2.0.
2+
* Copyright Elasticsearch B.V., and/or licensed to Elasticsearch B.V.
3+
* under one or more license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* This file is based on a modification of https://github.com/open-telemetry/opentelemetry-java which is licensed under the Apache 2.0 License.
620
*/
721

8-
package org.elasticsearch.xpack.exponentialhistogram;
22+
package org.elasticsearch.exponentialhistogram;
923

1024
import org.apache.lucene.util.BytesRef;
1125
import org.apache.lucene.util.RamUsageEstimator;
12-
import org.elasticsearch.common.io.stream.StreamOutput;
13-
import org.elasticsearch.exponentialhistogram.AbstractExponentialHistogram;
14-
import org.elasticsearch.exponentialhistogram.BucketIterator;
15-
import org.elasticsearch.exponentialhistogram.CopyableBucketIterator;
16-
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
17-
import org.elasticsearch.exponentialhistogram.ZeroBucket;
18-
import org.elasticsearch.xpack.analytics.mapper.IndexWithCount;
1926

2027
import java.io.IOException;
21-
import java.util.List;
28+
import java.io.OutputStream;
2229
import java.util.OptionalLong;
2330

2431
/**
2532
* Implementation of an {@link ExponentialHistogram} optimized for a minimal memory footprint.
26-
* The compression used here also corresponds to how exponential_histogram fields are stored in
27-
* doc values by {@link ExponentialHistogramFieldMapper}.
2833
* <p>
2934
* While this implementation is optimized for a minimal memory footprint, it is still a fully compliant {@link ExponentialHistogram}
3035
* and can therefore be directly consumed for merging / quantile estimation without requiring any prior copying or decoding.
36+
*
37+
* Note that this histogram implementation stores the zero threshold as a double value.
38+
* For that reason it is lossy if used for storing intermediate merge results, which can have the zero threshold represented
39+
* as a (scale, index) pair.
3140
*/
3241
public class CompressedExponentialHistogram extends AbstractExponentialHistogram {
3342

@@ -40,7 +49,7 @@ public class CompressedExponentialHistogram extends AbstractExponentialHistogram
4049
private double max;
4150
private ZeroBucket lazyZeroBucket;
4251

43-
private final EncodedHistogramData encodedData = new EncodedHistogramData();
52+
private final CompressedHistogramData encodedData = new CompressedHistogramData();
4453
private final Buckets positiveBuckets = new Buckets(true);
4554
private final Buckets negativeBuckets = new Buckets(false);
4655

@@ -99,7 +108,7 @@ public ExponentialHistogram.Buckets negativeBuckets() {
99108
* @param max the maximum of the values the histogram contains, needs to be stored externally.
100109
* Must be {@link Double#NaN} if the histogram is empty, non-NaN otherwise.
101110
* @param encodedHistogramData the encoded histogram bytes which were previously generated via
102-
* {@link #writeHistogramBytes(StreamOutput, int, List, List)}.
111+
* {@link #writeHistogramBytes(OutputStream, int, BucketIterator, BucketIterator)}.
103112
*/
104113
public void reset(double zeroThreshold, long valueCount, double sum, double min, double max, BytesRef encodedHistogramData)
105114
throws IOException {
@@ -123,18 +132,14 @@ public void reset(double zeroThreshold, long valueCount, double sum, double min,
123132
* @param negativeBuckets the negative buckets of the histogram, sorted by the bucket indices
124133
* @param positiveBuckets the positive buckets of the histogram, sorted by the bucket indices
125134
*/
126-
public static void writeHistogramBytes(
127-
StreamOutput output,
128-
int scale,
129-
List<IndexWithCount> negativeBuckets,
130-
List<IndexWithCount> positiveBuckets
131-
) throws IOException {
132-
EncodedHistogramData.write(output, scale, negativeBuckets, positiveBuckets);
135+
public static void writeHistogramBytes(OutputStream output, int scale, BucketIterator negativeBuckets, BucketIterator positiveBuckets)
136+
throws IOException {
137+
CompressedHistogramData.write(output, scale, negativeBuckets, positiveBuckets);
133138
}
134139

135140
@Override
136141
public long ramBytesUsed() {
137-
return SHALLOW_SIZE + ZeroBucket.SHALLOW_SIZE + 2 * Buckets.SHALLOW_SIZE + EncodedHistogramData.SHALLOW_SIZE;
142+
return SHALLOW_SIZE + ZeroBucket.SHALLOW_SIZE + 2 * Buckets.SHALLOW_SIZE + CompressedHistogramData.SHALLOW_SIZE;
138143
}
139144

140145
private final class Buckets implements ExponentialHistogram.Buckets {
@@ -190,9 +195,9 @@ public long valueCount() {
190195

191196
private class CompressedBucketsIterator implements CopyableBucketIterator {
192197

193-
private final EncodedHistogramData.BucketsDecoder decoder;
198+
private final CompressedHistogramData.BucketsDecoder decoder;
194199

195-
CompressedBucketsIterator(EncodedHistogramData.BucketsDecoder delegate) {
200+
CompressedBucketsIterator(CompressedHistogramData.BucketsDecoder delegate) {
196201
this.decoder = delegate;
197202
}
198203

0 commit comments

Comments
 (0)