
Commit 528f1ce

OTLP intake: Implement exponential_histogram conversion algorithm (elastic#139188)
1 parent 8d32173 commit 528f1ce

8 files changed: +1010 -32 lines changed

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java

Lines changed: 4 additions & 4 deletions
@@ -208,10 +208,10 @@ public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder)
         } else {
             builder.startObject();
             builder.startArray("counts");
-            HistogramConverter.counts(dataPoint, builder::value);
+            TDigestConverter.counts(dataPoint, builder::value);
             builder.endArray();
             builder.startArray("values");
-            HistogramConverter.centroidValues(dataPoint, builder::value);
+            TDigestConverter.centroidValues(dataPoint, builder::value);
             builder.endArray();
             builder.endObject();
         }

@@ -274,10 +274,10 @@ public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder)
         } else {
             builder.startObject();
             builder.startArray("counts");
-            HistogramConverter.counts(dataPoint, builder::value);
+            TDigestConverter.counts(dataPoint, builder::value);
             builder.endArray();
             builder.startArray("values");
-            HistogramConverter.centroidValues(dataPoint, builder::value);
+            TDigestConverter.centroidValues(dataPoint, builder::value);
             builder.endArray();
             builder.endObject();
         }
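
The two parallel arrays written here are the T-Digest style representation that the histogram field consumes: one count and one centroid value per populated bucket. As a hedged illustration (the bounds and counts below are invented; the centroid math lives in the renamed TDigestConverter file further down):

// Invented sample: explicit bounds [0.0, 5.0, 10.0] and bucket counts [2, 3, 1, 4]
// would be serialized by this code path roughly as:
//
//   { "counts": [2, 3, 1, 4], "values": [0.0, 2.5, 7.5, 10.0] }
//
// one centroid per populated bucket; empty buckets are skipped entirely.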
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/ExponentialHistogramConverter.java

Lines changed: 307 additions & 0 deletions
@@ -0,0 +1,307 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.datapoint;
+
+import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
+import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
+
+import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.BUCKET_COUNTS_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.BUCKET_INDICES_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.MAX_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.MIN_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.NEGATIVE_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.POSITIVE_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.SCALE_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.SUM_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.ZERO_COUNT_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.ZERO_FIELD;
+import static org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent.ZERO_THRESHOLD_FIELD;
+
+/**
+ * Utility class to convert OpenTelemetry histogram data points into exponential histograms
+ * so that they can be used with the {@code exponential_histogram} field type.
+ *
+ * OTLP exponential histograms are left unchanged and just formatted into the XContent that the field type expects.
+ * OTLP explicit bucket histograms are converted to exponential histograms at maximum scale,
+ * with the bucket centers corresponding to the centroids computed by {@link TDigestConverter}.
+ * For details, see {@link #buildExponentialHistogram(HistogramDataPoint, XContentBuilder, BucketBuffer)}.
+ */
+public class ExponentialHistogramConverter {
+
+    /**
+     * Writes the provided OTLP exponential histogram as an Elasticsearch {@code exponential_histogram} field value.
+     * @param dataPoint the point to write
+     * @param builder the builder to write to
+     */
+    public static void buildExponentialHistogram(ExponentialHistogramDataPoint dataPoint, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        builder.field("scale", dataPoint.getScale());
+        if (dataPoint.getZeroCount() > 0) {
+            builder.startObject(ZERO_FIELD).field(ZERO_COUNT_FIELD, dataPoint.getZeroCount());
+            if (dataPoint.getZeroThreshold() != 0) {
+                builder.field(ZERO_THRESHOLD_FIELD, dataPoint.getZeroThreshold());
+            }
+            builder.endObject();
+        }
+        if (dataPoint.hasNegative()) {
+            writeExponentialBuckets(builder, NEGATIVE_FIELD, dataPoint.getNegative());
+        }
+        if (dataPoint.hasPositive()) {
+            writeExponentialBuckets(builder, POSITIVE_FIELD, dataPoint.getPositive());
+        }
+        if (dataPoint.hasSum()) {
+            builder.field(SUM_FIELD, dataPoint.getSum());
+        }
+        if (dataPoint.hasMin()) {
+            builder.field(MIN_FIELD, dataPoint.getMin());
+        }
+        if (dataPoint.hasMax()) {
+            builder.field(MAX_FIELD, dataPoint.getMax());
+        }
+        builder.endObject();
+    }
+
+    private static void writeExponentialBuckets(XContentBuilder builder, String fieldName, ExponentialHistogramDataPoint.Buckets buckets)
+        throws IOException {
+        builder.startObject(fieldName);
+        builder.startArray(BUCKET_INDICES_FIELD);
+        for (int i = 0; i < buckets.getBucketCountsCount(); i++) {
+            long count = buckets.getBucketCounts(i);
+            if (count != 0) {
+                builder.value(buckets.getOffset() + i);
+            }
+        }
+        builder.endArray();
+        builder.startArray(BUCKET_COUNTS_FIELD);
+        for (int i = 0; i < buckets.getBucketCountsCount(); i++) {
+            long count = buckets.getBucketCounts(i);
+            if (count != 0) {
+                builder.value(count);
+            }
+        }
+        builder.endArray();
+        builder.endObject();
+    }
+
+    /**
+     * Writes the provided OTLP explicit bucket histogram as an Elasticsearch {@code exponential_histogram} field value.
+     * First, the buckets are converted to the same centroids as computed by {@link TDigestConverter},
+     * then we use an exponential histogram at maximum scale with one bucket per centroid.
+     * Due to the maximum scale, the buckets are very narrow and will always
+     * yield almost exactly the centroids for percentile estimation.
+     * <br>
+     * In addition, we preserve the min/max if provided. To make sure that the min/max are actually part of a bucket, we do the following:
+     * <ul>
+     *     <li>If the min is smaller than the centroid of the first bucket with data, we add a separate single-value bucket for it
+     *     and reduce the count of the original first bucket by one (removing it, if it becomes empty)</li>
+     *     <li>If the min is larger than the centroid of the first bucket with data, we clamp the centroid to the min</li>
+     * </ul>
+     * And we do the same thing for max with the last populated bucket.
+     *
+     * @param dataPoint the point to write
+     * @param builder the builder to write to
+     * @param bucketsScratch reusable, temporary memory used to build the output histogram
+     */
+    public static void buildExponentialHistogram(HistogramDataPoint dataPoint, XContentBuilder builder, BucketBuffer bucketsScratch)
+        throws IOException {
+        builder.startObject().field(SCALE_FIELD, MAX_SCALE);
+
+        // TODO: When we start supporting cumulative buckets, we can't do the synthetic min/max bucket trick anymore.
+        // Then we probably need to just drop min/max, which aren't really useful for cumulative histograms anyway
+
+        int size = dataPoint.getBucketCountsCount();
+
+        if (size > 0) {
+            bucketsScratch.clear();
+
+            boolean minHandled = false;
+
+            int lastPopulatedBucket = size - 1;
+            while (lastPopulatedBucket >= 0 && dataPoint.getBucketCounts(lastPopulatedBucket) == 0) {
+                lastPopulatedBucket--;
+            }
+
+            for (int i = 0; i < size; i++) {
+                long count = dataPoint.getBucketCounts(i);
+                if (count > 0) {
+                    double centroid = TDigestConverter.getCentroid(dataPoint, i);
+                    if (dataPoint.hasMin() && minHandled == false) {
+                        if (centroid > dataPoint.getMin() && count > 1) {
+                            // min is smaller than the centroid and not the only value in the bucket, we inject a separate, single value
+                            // bucket for it
+                            bucketsScratch.append(dataPoint.getMin(), 1);
+                            count -= 1;
+                        } else {
+                            // clamp the bucket centroid to min
+                            centroid = dataPoint.getMin();
+                        }
+                        minHandled = true;
+                    }
+                    boolean injectMaxBucketAfterIteration = false;
+                    if (dataPoint.hasMax() && lastPopulatedBucket == i) {
+                        if (centroid < dataPoint.getMax() && count > 1) {
+                            // max is bigger than the centroid and not the only value in the bucket, we inject a separate, single value
+                            // bucket for it
+                            injectMaxBucketAfterIteration = true;
+                            count -= 1;
+                        } else {
+                            // clamp the bucket centroid to max
+                            centroid = dataPoint.getMax();
+                        }
+                    }
+                    bucketsScratch.append(centroid, count);
+                    if (injectMaxBucketAfterIteration) {
+                        bucketsScratch.append(dataPoint.getMax(), 1);
+                    }
+                }
+            }
+            bucketsScratch.writeBuckets(builder);
+            if (dataPoint.hasSum()) {
+                builder.field(SUM_FIELD, dataPoint.getSum());
+            }
+            if (dataPoint.hasMin()) {
+                builder.field(MIN_FIELD, dataPoint.getMin());
+            }
+            if (dataPoint.hasMax()) {
+                builder.field(MAX_FIELD, dataPoint.getMax());
+            }
+        }
+        builder.endObject();
+    }
+
+    private static class IndexWithCountList {
+
+        private static final int INITIAL_CAPACITY = 32;
+
+        private long[] indices = new long[INITIAL_CAPACITY];
+        private long[] counts = new long[INITIAL_CAPACITY];
+        private int size = 0;
+
+        public void add(long index, long count) {
+            if (size == indices.length) {
+                doubleCapacity();
+            }
+            indices[size] = index;
+            counts[size] = count;
+            size++;
+        }
+
+        public void setCount(int position, long count) {
+            assert position < size;
+            counts[position] = count;
+        }
+
+        public int size() {
+            return size;
+        }
+
+        public long getIndex(int position) {
+            assert position < size;
+            return indices[position];
+        }
+
+        public long getCount(int position) {
+            assert position < size;
+            return counts[position];
+        }
+
+        public void clear() {
+            size = 0;
+        }
+
+        private void doubleCapacity() {
+            long[] newIndices = new long[indices.length * 2];
+            long[] newCounts = new long[counts.length * 2];
+            System.arraycopy(indices, 0, newIndices, 0, size);
+            System.arraycopy(counts, 0, newCounts, 0, size);
+            indices = newIndices;
+            counts = newCounts;
+        }
+    }
+
+    public static class BucketBuffer {
+        // negative indices are sorted from highest to lowest (smallest bucket center to largest bucket center)
+        private final IndexWithCountList negativeBuckets = new IndexWithCountList();
+
+        // positive indices are sorted from lowest to highest (smallest bucket center to largest bucket center)
+        private final IndexWithCountList positiveBuckets = new IndexWithCountList();
+
+        long zeroCount = 0;
+
+        private void clear() {
+            negativeBuckets.clear();
+            positiveBuckets.clear();
+            zeroCount = 0;
+        }
+
+        void append(double center, long count) {
+            if (count == 0) {
+                return;
+            }
+            if (center < 0) {
+                addOrMergeBucket(center, count, negativeBuckets);
+            } else if (center > 0) {
+                addOrMergeBucket(center, count, positiveBuckets);
+            } else {
+                zeroCount += count;
+            }
+        }
+
+        private static void addOrMergeBucket(double center, long count, IndexWithCountList buckets) {
+            long index = ExponentialScaleUtils.computeIndex(center, MAX_SCALE);
+            if (buckets.size() > 0 && buckets.getIndex(buckets.size() - 1) == index) {
+                // merge with previous
+                int lastPos = buckets.size() - 1;
+                buckets.setCount(lastPos, buckets.getCount(lastPos) + count);
+            } else {
+                buckets.add(index, count);
+            }
+        }
+
+        private void writeBuckets(XContentBuilder builder) throws IOException {
+            if (zeroCount > 0) {
+                builder.startObject(ZERO_FIELD).field(ZERO_COUNT_FIELD, zeroCount).endObject();
+            }
+            if (negativeBuckets.size() > 0) {
+                // write in inverse order to get lowest to highest index
+                builder.startObject(NEGATIVE_FIELD);
+                builder.startArray(BUCKET_INDICES_FIELD);
+                for (int i = negativeBuckets.size() - 1; i >= 0; i--) {
+                    builder.value(negativeBuckets.getIndex(i));
+                }
+                builder.endArray();
+                builder.startArray(BUCKET_COUNTS_FIELD);
+                for (int i = negativeBuckets.size() - 1; i >= 0; i--) {
+                    builder.value(negativeBuckets.getCount(i));
+                }
+                builder.endArray();
+                builder.endObject();
+            }
+            if (positiveBuckets.size() > 0) {
+                builder.startObject(POSITIVE_FIELD);
+                builder.startArray(BUCKET_INDICES_FIELD);
+                for (int i = 0; i < positiveBuckets.size(); i++) {
+                    builder.value(positiveBuckets.getIndex(i));
+                }
+                builder.endArray();
+                builder.startArray(BUCKET_COUNTS_FIELD);
+                for (int i = 0; i < positiveBuckets.size(); i++) {
+                    builder.value(positiveBuckets.getCount(i));
+                }
+                builder.endArray();
+                builder.endObject();
+            }
+        }
+    }
+}
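
To see the explicit-bucket conversion end to end, here is a minimal, hypothetical driver (not part of the commit). It assumes it runs in the same package as the converter, uses the standard OTLP protobuf builders and Elasticsearch XContent helpers, and all sample numbers are invented:

import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;

import org.elasticsearch.common.Strings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

public class ExponentialHistogramConverterDemo {
    public static void main(String[] args) throws Exception {
        // Invented bounds [0, 5, 10] -> buckets (-inf,0], (0,5], (5,10], (10,+inf)
        HistogramDataPoint dp = HistogramDataPoint.newBuilder()
            .addExplicitBounds(0.0).addExplicitBounds(5.0).addExplicitBounds(10.0)
            .addBucketCounts(0).addBucketCounts(3).addBucketCounts(1).addBucketCounts(0)
            .setMin(1.0).setMax(8.0).setSum(16.0)
            .build();
        XContentBuilder builder = XContentFactory.jsonBuilder();
        ExponentialHistogramConverter.buildExponentialHistogram(dp, builder, new ExponentialHistogramConverter.BucketBuffer());
        // Expected behavior: min 1.0 is below the first populated centroid 2.5
        // and that bucket has count 3 > 1, so a synthetic single-value bucket is
        // injected and the bucket shrinks to count 2; the last populated bucket
        // holds a single value, so its centroid 7.5 is clamped to max 8.0.
        System.out.println(Strings.toString(builder));
    }
}

The resulting document carries one very narrow MAX_SCALE bucket per centroid (centers 1.0, 2.5, 8.0 with counts 1, 2, 1 here), which is what makes the synthetic min/max buckets safe for percentile estimation.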

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/HistogramConverter.java renamed to x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TDigestConverter.java

Lines changed: 22 additions & 17 deletions
@@ -22,7 +22,7 @@
  * Elasticsearch exporter on GitHub
  * </a>
  */
-class HistogramConverter {
+class TDigestConverter {
 
     static <E extends Exception> void counts(ExponentialHistogramDataPoint dp, CheckedLongConsumer<E> counts) throws E {
         ExponentialHistogramDataPoint.Buckets negative = dp.getNegative();

@@ -105,26 +105,31 @@ static <E extends Exception> void centroidValues(HistogramDataPoint dp, CheckedD
         for (int i = 0; i < size; i++) {
             long count = dp.getBucketCounts(i);
             if (count != 0) {
-                double value;
-                if (i == 0) {
-                    // (-infinity, explicit_bounds[i]]
-                    value = dp.getExplicitBounds(i);
-                    if (value > 0) {
-                        value /= 2;
-                    }
-                } else if (i == size - 1) {
-                    // (explicit_bounds[i], +infinity)
-                    value = dp.getExplicitBounds(i - 1);
-                } else {
-                    // [explicit_bounds[i-1], explicit_bounds[i])
-                    // Use the midpoint between the boundaries.
-                    value = dp.getExplicitBounds(i - 1) + (dp.getExplicitBounds(i) - dp.getExplicitBounds(i - 1)) / 2.0;
-                }
-                values.accept(value);
+                values.accept(getCentroid(dp, i));
             }
         }
     }
 
+    public static double getCentroid(HistogramDataPoint dp, int bucketIndex) {
+        double value;
+        if (bucketIndex == 0) {
+            // (-infinity, explicit_bounds[i]]
+            value = dp.getExplicitBounds(bucketIndex);
+            if (value > 0) {
+                value /= 2;
+            }
+        } else if (bucketIndex == dp.getBucketCountsCount() - 1) {
+            // (explicit_bounds[i], +infinity)
+            value = dp.getExplicitBounds(bucketIndex - 1);
+        } else {
+            // [explicit_bounds[i-1], explicit_bounds[i])
+            // Use the midpoint between the boundaries.
+            value = dp.getExplicitBounds(bucketIndex - 1) + (dp.getExplicitBounds(bucketIndex) - dp.getExplicitBounds(bucketIndex - 1))
+                / 2.0;
+        }
+        return value;
+    }
+
     interface CheckedLongConsumer<E extends Exception> {
         void accept(long value) throws E;
     }