Skip to content

Commit 70892ab

Browse files
committed
Split encoding and histogram implementation, general code cleanup
1 parent ff78a50 commit 70892ab

File tree

4 files changed

+313
-196
lines changed

4 files changed

+313
-196
lines changed

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/CompressedExponentialHistogram.java

Lines changed: 28 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88
package org.elasticsearch.xpack.exponentialhistogram;
99

1010
import org.apache.lucene.util.BytesRef;
11-
import org.elasticsearch.common.bytes.BytesReference;
12-
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
13-
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1411
import org.elasticsearch.common.io.stream.StreamOutput;
1512
import org.elasticsearch.exponentialhistogram.BucketIterator;
1613
import org.elasticsearch.exponentialhistogram.CopyableBucketIterator;
@@ -31,28 +28,17 @@
3128
*/
3229
public class CompressedExponentialHistogram implements ExponentialHistogram {
3330

34-
private static final int SCALE_OFFSET = 11;
35-
private static final int HAS_NEGATIVE_BUCKETS_FLAG = 1 << 6; // = 64
36-
private static final int SCALE_MASK = 0x3F; // = 63
37-
static {
38-
// protection against changes to MIN_SCALE and MAX_SCALE messing with our encoding
39-
assert MIN_SCALE + SCALE_OFFSET >= 0;
40-
assert MAX_SCALE + SCALE_OFFSET <= SCALE_MASK;
41-
}
42-
43-
private int scale;
4431
private double zeroThreshold;
4532
private long valueCount;
4633
private ZeroBucket lazyZeroBucket;
4734

48-
private final Buckets positiveBuckets = new Buckets();
49-
private final Buckets negativeBuckets = new Buckets();
50-
51-
private byte[] compressedData;
35+
private final EncodedHistogramData encodedData = new EncodedHistogramData();
36+
private final Buckets positiveBuckets = new Buckets(true);
37+
private final Buckets negativeBuckets = new Buckets(false);
5238

5339
@Override
5440
public int scale() {
55-
return scale;
41+
return encodedData.scale();
5642
}
5743

5844
@Override
@@ -79,46 +65,30 @@ public ExponentialHistogram.Buckets negativeBuckets() {
7965
*
8066
* @param zeroThreshold the zeroThreshold for the histogram, which needs to be stored externally
8167
* @param valueCount the total number of values the histogram contains, needs to be stored externally
82-
* @param histogramData the encoded histogram bytes which previously where generated via {@code #writeHistogramBytes}
68+
* @param encodedHistogramData the encoded histogram bytes which previously were generated via
69+
* {@link EncodedHistogramData#write(StreamOutput, int, List, List)}.
8370
*/
84-
public void reset(double zeroThreshold, long valueCount, BytesRef histogramData) throws IOException {
71+
public void reset(double zeroThreshold, long valueCount, BytesRef encodedHistogramData) throws IOException {
8572
lazyZeroBucket = null;
86-
8773
this.zeroThreshold = zeroThreshold;
8874
this.valueCount = valueCount;
89-
this.compressedData = histogramData.bytes;
90-
ByteArrayStreamInput input = new ByteArrayStreamInput();
91-
input.reset(histogramData.bytes, histogramData.offset, histogramData.length);
92-
93-
int scaleWithFlags = input.readByte();
94-
this.scale = (scaleWithFlags & SCALE_MASK) - SCALE_OFFSET;
95-
boolean hasNegativeBuckets = (scaleWithFlags & HAS_NEGATIVE_BUCKETS_FLAG) != 0;
96-
97-
int negativeBucketsLength = 0;
98-
if (hasNegativeBuckets) {
99-
negativeBucketsLength = input.readVInt();
100-
}
101-
102-
negativeBuckets.reset(input.getPosition(), negativeBucketsLength);
103-
input.skipBytes(negativeBucketsLength);
104-
positiveBuckets.reset(input.getPosition(), input.available());
75+
encodedData.load(encodedHistogramData);
76+
negativeBuckets.resetCachedData();
77+
positiveBuckets.resetCachedData();
10578
}
10679

10780
private final class Buckets implements ExponentialHistogram.Buckets {
10881

109-
private int encodedBucketsStart;
110-
private int encodedBucketsLength;
111-
82+
private final boolean isForPositiveBuckets; // false if for negative buckets
11283
private long cachedValueCount;
11384
private long cachedMaxIndex;
11485

115-
private Buckets() {
116-
reset(0, 0);
86+
private Buckets(boolean isForPositiveBuckets) {
87+
this.isForPositiveBuckets = isForPositiveBuckets;
88+
resetCachedData();
11789
}
11890

119-
private void reset(int encodedBucketsStart, int encodedBucketsLength) {
120-
this.encodedBucketsStart = encodedBucketsStart;
121-
this.encodedBucketsLength = encodedBucketsLength;
91+
private void resetCachedData() {
12292
cachedValueCount = -1;
12393
cachedMaxIndex = Long.MIN_VALUE;
12494
}
@@ -137,7 +107,11 @@ private void computeCachedDataIfRequired() {
137107

138108
@Override
139109
public CopyableBucketIterator iterator() {
140-
return new CompressedBucketsIterator(encodedBucketsStart, encodedBucketsLength);
110+
if (isForPositiveBuckets) {
111+
return new CompressedBucketsIterator(encodedData.positiveBucketsDecoder());
112+
} else {
113+
return new CompressedBucketsIterator(encodedData.negativeBucketsDecoder());
114+
}
141115
}
142116

143117
@Override
@@ -154,173 +128,41 @@ public long valueCount() {
154128

155129
private class CompressedBucketsIterator implements CopyableBucketIterator {
156130

157-
private long currentIndex;
158-
/**
159-
* The count for the bucket this iterator is currently pointing at.
160-
* A value of {@code -1} is used to represent that the end has been reached.
161-
*/
162-
private long currentCount;
163-
private final ByteArrayStreamInput bucketData;
164-
165-
CompressedBucketsIterator(int encodedBucketsStartOffset, int length) {
166-
bucketData = new ByteArrayStreamInput();
167-
if (length > 0) {
168-
bucketData.reset(compressedData, encodedBucketsStartOffset, length);
169-
try {
170-
currentIndex = bucketData.readZLong() - 1;
171-
} catch (IOException e) {
172-
throw new IllegalStateException("Bad histogram bytes", e);
173-
}
174-
currentCount = 0;
175-
advance();
176-
} else {
177-
// no data means we are iterating over an empty set of buckets
178-
markEndReached();
179-
}
180-
}
181-
182-
private void markEndReached() {
183-
currentCount = -1;
184-
}
131+
private final EncodedHistogramData.BucketsDecoder decoder;
185132

186-
CompressedBucketsIterator(CompressedBucketsIterator toCopy) {
187-
bucketData = new ByteArrayStreamInput();
188-
bucketData.reset(compressedData, toCopy.bucketData.getPosition(), toCopy.bucketData.available());
189-
currentCount = toCopy.currentCount;
190-
currentIndex = toCopy.currentIndex;
133+
CompressedBucketsIterator(EncodedHistogramData.BucketsDecoder delegate) {
134+
this.decoder = delegate;
191135
}
192136

193137
@Override
194138
public CopyableBucketIterator copy() {
195-
return new CompressedBucketsIterator(this);
139+
return new CompressedBucketsIterator(decoder.copy());
196140
}
197141

198142
@Override
199143
public final boolean hasNext() {
200-
return currentCount > 0;
144+
return decoder.hasNext();
201145
}
202146

203147
@Override
204148
public final long peekCount() {
205-
ensureEndNotReached();
206-
return currentCount;
149+
return decoder.peekCount();
207150
}
208151

209152
@Override
210153
public final long peekIndex() {
211-
ensureEndNotReached();
212-
return currentIndex;
154+
return decoder.peekIndex();
213155
}
214156

215157
@Override
216158
public int scale() {
217159
return CompressedExponentialHistogram.this.scale();
218160
}
219161

220-
/**
221-
* For details on the encoding, see {@link #writeHistogramBytes(StreamOutput, int, List, List)}.
222-
*/
223162
@Override
224163
public final void advance() {
225-
ensureEndNotReached();
226-
try {
227-
if (bucketData.available() > 0) {
228-
currentIndex++;
229-
long countOrNumEmptyBuckets = bucketData.readZLong();
230-
if (countOrNumEmptyBuckets < 0) {
231-
// we have encountered a negative value, this means we "skip"
232-
// the given amount of empty buckets
233-
long numEmptyBuckets = -countOrNumEmptyBuckets;
234-
currentIndex += numEmptyBuckets;
235-
// after we have skipped empty buckets, we know that the next value is a non-empty bucket
236-
currentCount = bucketData.readZLong();
237-
} else {
238-
currentCount = countOrNumEmptyBuckets;
239-
}
240-
assert currentCount > 0;
241-
} else {
242-
markEndReached();
243-
}
244-
} catch (IOException e) {
245-
throw new IllegalStateException("Bad histogram bytes", e);
246-
}
164+
decoder.advance();
247165
}
248-
249-
private void ensureEndNotReached() {
250-
if (currentCount == -1) {
251-
throw new IllegalStateException("end has already been reached");
252-
}
253-
}
254-
}
255-
}
256-
257-
/**
258-
* Serializes the given histogram, so that exactly the same histogram can be reconstructed via {@link #reset(double, long, BytesRef)}.
259-
*
260-
* @param output the output to write the serialized bytes to
261-
* @param scale the scale of the histogram
262-
* @param negativeBuckets the negative buckets of the histogram, sorted by the bucket indices
263-
* @param positiveBuckets the positive buckets of the histogram, sorted by the bucket indices
264-
*/
265-
public static void writeHistogramBytes(
266-
StreamOutput output,
267-
int scale,
268-
List<IndexWithCount> negativeBuckets,
269-
List<IndexWithCount> positiveBuckets
270-
) throws IOException {
271-
assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "]";
272-
boolean hasNegativeBuckets = negativeBuckets.isEmpty() == false;
273-
int scaleWithFlags = (scale + SCALE_OFFSET);
274-
assert scale >= 0 && scale <= SCALE_MASK;
275-
if (hasNegativeBuckets) {
276-
scaleWithFlags |= HAS_NEGATIVE_BUCKETS_FLAG;
277-
}
278-
output.writeByte((byte) scaleWithFlags);
279-
if (hasNegativeBuckets) {
280-
BytesStreamOutput temp = new BytesStreamOutput();
281-
serializeBuckets(temp, negativeBuckets);
282-
BytesReference data = temp.bytes();
283-
output.writeVInt(data.length());
284-
output.writeBytes(data.array(), data.arrayOffset(), data.length());
285-
}
286-
serializeBuckets(output, positiveBuckets);
287-
}
288-
289-
// Encodes the given bucket indices and counts as bytes into the given output.
290-
// The following scheme is used to maximize compression:
291-
// - if there are no buckets, the result is an empty array (byte[0])
292-
// - write the index of the first bucket as ZigZag-VLong
293-
// - write the count of the first bucket as ZigZag-VLong
294-
// - for each remaining bucket:
295-
// - if the index of the bucket is exactly previousBucketIndex+1, write the count for the bucket as ZigZag-VLong
296-
// - Otherwise there is at least one empty bucket between this one and the previous one.
297-
// We compute that number as n=currentBucketIndex-previousIndex-1 and then write -n out as
298-
// ZigZag-VLong followed by the count for the bucket as ZigZag-VLong. The negation is performed to allow to
299-
// distinguish the cases when decoding.
300-
//
301-
// This encoding provides a compact storage for both dense and sparse histograms:
302-
// For dense histograms it effectively results in encoding the index of the first bucket, followed by just an array of counts.
303-
// For sparse histograms it corresponds to an interleaved encoding of the bucket indices with delta compression and the bucket counts.
304-
// Even partially dense histograms profit from this encoding.
305-
private static void serializeBuckets(StreamOutput out, List<IndexWithCount> buckets) throws IOException {
306-
if (buckets.isEmpty()) {
307-
return; // no buckets, therefore nothing to write
308-
}
309-
long minIndex = buckets.getFirst().index();
310-
out.writeZLong(minIndex);
311-
long prevIndex = minIndex - 1;
312-
for (IndexWithCount indexWithCount : buckets) {
313-
long indexDelta = indexWithCount.index() - prevIndex;
314-
assert indexDelta > 0; // values must be sorted and unique
315-
assert indexWithCount.count() > 0;
316-
317-
long numEmptyBucketsInBetween = indexDelta - 1;
318-
if (numEmptyBucketsInBetween > 0) {
319-
out.writeZLong(-numEmptyBucketsInBetween);
320-
}
321-
out.writeZLong(indexWithCount.count());
322-
323-
prevIndex = indexWithCount.index();
324166
}
325167
}
326168
}

0 commit comments

Comments
 (0)