Skip to content

Commit f5ed586

Browse files
NaireenNaireen
andauthored
Add histogram parsing in runner v2 (#34017)
* Add histogram parsing in runner v2 * address comments --------- Co-authored-by: Naireen <[email protected]>
1 parent 73b4d53 commit f5ed586

File tree

4 files changed

+257
-0
lines changed

4 files changed

+257
-0
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,61 @@ message BoundedTrie {
628628
repeated string singleton = 3;
629629
}
630630

631+
// The message type used for encoding Histogram Data
632+
message HistogramValue {
633+
// Number of values recorded in this histogram.
634+
optional int64 count = 1;
635+
636+
// Describes the bucket boundaries used in the histogram.
637+
optional BucketOptions bucket_options = 2;
638+
639+
// The number of values in each bucket of the histogram, as described in
640+
// `bucket_options`. `bucket_counts` should contain N values, where N is the
641+
// number of buckets specified in `bucket_options`. If `bucket_counts` has
642+
// fewer than N values, the remaining values are assumed to be 0.
643+
repeated int64 bucket_counts = 3;
644+
645+
// `BucketOptions` describes the bucket boundaries used in the histogram.
646+
message BucketOptions {
647+
// Linear buckets with the following boundaries for indices in 0 to n-1.
648+
// - i in [0, n-1]: [start + (i)*width, start + (i+1)*width)
649+
message Linear {
650+
// Must be greater than 0.
651+
//
652+
// (-- api-linter: core::0140::prepositions=disabled
653+
// aip.dev/not-precedent: `bucket_count` would cause confusion with
654+
// `bucket_counts` field --)
655+
optional int32 number_of_buckets = 1;
656+
// Distance between bucket boundaries. Must be greater than 0.
657+
optional double width = 2;
658+
// Lower bound of the first bucket.
659+
optional double start = 3;
660+
}
661+
662+
// Exponential buckets where the growth factor between buckets is
663+
// `2**(2**-scale)`. e.g. for `scale=1` growth factor is
664+
// `2**(2**(-1))=sqrt(2)`. `n` buckets will have the following boundaries.
665+
// - 0th: [0, gf)
666+
// - i in [1, n-1]: [gf^(i), gf^(i+1))
667+
message Base2Exponent {
668+
// Must be greater than 0.
669+
//
670+
// (-- api-linter: core::0140::prepositions=disabled
671+
// aip.dev/not-precedent: `bucket_count` would cause confusion with
672+
// `bucket_counts` field --)
673+
optional int32 number_of_buckets = 1;
674+
// Must be between -3 and 3. This forces the growth factor of the bucket
675+
// boundaries to be between `2^(1/8)` and `256`.
676+
optional int32 scale = 2;
677+
}
678+
oneof bucket_type {
679+
// Bucket boundaries grow linearly.
680+
Linear linear = 1;
681+
// Bucket boundaries grow exponentially.
682+
Base2Exponent exponential = 2;
683+
}
684+
}
685+
}
631686

632687
// General monitored state information which contains structured information
633688
// which does not fit into a typical metric format.

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
import java.io.InputStream;
2222
import java.util.Set;
2323
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
24+
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue;
2425
import org.apache.beam.sdk.coders.Coder;
2526
import org.apache.beam.sdk.coders.DoubleCoder;
2627
import org.apache.beam.sdk.coders.IterableCoder;
2728
import org.apache.beam.sdk.coders.StringUtf8Coder;
2829
import org.apache.beam.sdk.coders.VarLongCoder;
2930
import org.apache.beam.sdk.util.ByteStringOutputStream;
31+
import org.apache.beam.sdk.util.HistogramData;
3032
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
3133
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
3234
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -179,4 +181,18 @@ public static double decodeDoubleCounter(ByteString payload) {
179181
throw new RuntimeException(e);
180182
}
181183
}
184+
185+
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
186+
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
187+
return inputHistogram.toProto().toByteString();
188+
}
189+
190+
/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
191+
public static HistogramData decodeInt64Histogram(ByteString payload) {
192+
try {
193+
return new HistogramData(HistogramValue.parseFrom(payload));
194+
} catch (InvalidProtocolBufferException e) {
195+
throw new RuntimeException(e);
196+
}
197+
}
182198
}

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,44 @@
2222
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
2323
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
2424
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
25+
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
2526
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
2627
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeBoundedTrie;
2728
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
2829
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
2930
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
3031
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
3132
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
33+
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
3234
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
3335
import static org.junit.Assert.assertEquals;
3436

3537
import java.util.ArrayList;
3638
import java.util.Arrays;
3739
import java.util.Collections;
3840
import org.apache.beam.runners.core.metrics.BoundedTrieData.BoundedTrieNode;
41+
import org.apache.beam.sdk.testing.ExpectedLogs;
42+
import org.apache.beam.sdk.util.HistogramData;
43+
import org.apache.beam.sdk.util.HistogramData.HistogramParsingException;
3944
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
4045
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
4146
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
4247
import org.joda.time.Instant;
48+
import org.junit.Rule;
4349
import org.junit.Test;
50+
import org.junit.rules.ExpectedException;
4451
import org.junit.runner.RunWith;
4552
import org.junit.runners.JUnit4;
4653

4754
/** Tests for {@link MonitoringInfoEncodings}. */
4855
@RunWith(JUnit4.class)
4956
public class MonitoringInfoEncodingsTest {
57+
@Rule
58+
public ExpectedLogs monitoringInfoCodingsExpectedLogs =
59+
ExpectedLogs.none(MonitoringInfoEncodings.class);
60+
61+
@Rule public ExpectedException thrown = ExpectedException.none();
62+
5063
@Test
5164
public void testInt64DistributionEncoding() {
5265
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
@@ -143,4 +156,36 @@ public void testDoubleCounterEncoding() {
143156
assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload);
144157
assertEquals(1.0, decodeDoubleCounter(payload), 0.001);
145158
}
159+
160+
@Test
161+
public void testHistgramInt64EncodingLinearHist() {
162+
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);
163+
164+
HistogramData inputHistogram = new HistogramData(buckets);
165+
inputHistogram.record(5, 10, 15, 20);
166+
ByteString payload = encodeInt64Histogram(inputHistogram);
167+
168+
assertEquals(inputHistogram, decodeInt64Histogram(payload));
169+
}
170+
171+
@Test
172+
public void testHistgramInt64EncodingExpHist() {
173+
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
174+
HistogramData inputHistogram = new HistogramData(buckets);
175+
inputHistogram.record(2, 4, 8, 16, 32);
176+
ByteString payload = encodeInt64Histogram(inputHistogram);
177+
assertEquals(inputHistogram, decodeInt64Histogram(payload));
178+
}
179+
180+
@Test
181+
public void testHistgramInt64EncodingUnsupportedBucket() {
182+
thrown.expect(HistogramParsingException.class);
183+
thrown.expectMessage("Unable to encode Int64 Histogram, bucket is not recognized");
184+
185+
HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of();
186+
187+
HistogramData inputHistogram = new HistogramData(buckets);
188+
inputHistogram.record(2, 4, 8, 16, 32);
189+
encodeInt64Histogram(inputHistogram);
190+
}
146191
}

sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@
2424
import java.util.Arrays;
2525
import java.util.Objects;
2626
import javax.annotation.concurrent.GuardedBy;
27+
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue;
28+
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions;
29+
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions.Base2Exponent;
30+
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions.Linear;
31+
import org.apache.beam.sdk.annotations.Internal;
32+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
2733
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath;
2834
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
2935
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -74,6 +80,43 @@ public HistogramData(BucketType bucketType) {
7480
this.sumOfSquaredDeviations = 0;
7581
}
7682

83+
/**
84+
* Create a histogram from HistogramValue proto.
85+
*
86+
* @param histogramProto HistogramValue proto used to populate stats for the histogram.
87+
*/
88+
public HistogramData(HistogramValue histogramProto) {
89+
int numBuckets;
90+
if (histogramProto.getBucketOptions().hasLinear()) {
91+
System.out.println("xxx its linear");
92+
double start = histogramProto.getBucketOptions().getLinear().getStart();
93+
double width = histogramProto.getBucketOptions().getLinear().getWidth();
94+
numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
95+
this.bucketType = LinearBuckets.of(start, width, numBuckets);
96+
this.buckets = new long[bucketType.getNumBuckets()];
97+
98+
int idx = 0;
99+
for (long val : histogramProto.getBucketCountsList()) {
100+
this.buckets[idx] = val;
101+
this.numBoundedBucketRecords += val;
102+
idx++;
103+
}
104+
} else {
105+
System.out.println("xxx its exp");
106+
// Assume it's a exponential histogram if its not linear
107+
int scale = histogramProto.getBucketOptions().getExponential().getScale();
108+
numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets();
109+
this.bucketType = ExponentialBuckets.of(scale, numBuckets);
110+
this.buckets = new long[bucketType.getNumBuckets()];
111+
int idx = 0;
112+
for (long val : histogramProto.getBucketCountsList()) {
113+
this.buckets[idx] = val;
114+
this.numBoundedBucketRecords += val;
115+
idx++;
116+
}
117+
}
118+
}
119+
77120
public BucketType getBucketType() {
78121
return this.bucketType;
79122
}
@@ -207,6 +250,10 @@ public synchronized HistogramData getAndReset() {
207250
return other;
208251
}
209252

253+
public synchronized long[] getBucketCount() {
254+
return buckets;
255+
}
256+
210257
public synchronized void record(double value) {
211258
double rangeTo = bucketType.getRangeTo();
212259
double rangeFrom = bucketType.getRangeFrom();
@@ -240,6 +287,64 @@ private synchronized void updateStatistics(double value) {
240287
sumOfSquaredDeviations += (value - mean) * (value - oldMean);
241288
}
242289

290+
public static class HistogramParsingException extends RuntimeException {
291+
public HistogramParsingException(String message) {
292+
super(message);
293+
}
294+
}
295+
296+
/** Converts this {@link HistogramData} to its proto {@link HistogramValue}. */
297+
public synchronized HistogramValue toProto() {
298+
HistogramValue.Builder builder = HistogramValue.newBuilder();
299+
// try {
300+
int numberOfBuckets = this.getBucketType().getNumBuckets();
301+
302+
if (this.getBucketType() instanceof HistogramData.LinearBuckets) {
303+
System.out.println("xxx linear buckets");
304+
HistogramData.LinearBuckets buckets = (HistogramData.LinearBuckets) this.getBucketType();
305+
Linear.Builder linearBuilder = Linear.newBuilder();
306+
linearBuilder.setNumberOfBuckets(numberOfBuckets);
307+
linearBuilder.setWidth(buckets.getWidth());
308+
linearBuilder.setStart(buckets.getStart());
309+
Linear linearOptions = linearBuilder.build();
310+
311+
BucketOptions.Builder bucketBuilder = BucketOptions.newBuilder();
312+
bucketBuilder.setLinear(linearOptions);
313+
builder.setBucketOptions(bucketBuilder.build());
314+
315+
} else if (this.getBucketType() instanceof HistogramData.ExponentialBuckets) {
316+
System.out.println("xxx exp buckets");
317+
HistogramData.ExponentialBuckets buckets =
318+
(HistogramData.ExponentialBuckets) this.getBucketType();
319+
320+
Base2Exponent.Builder base2ExpBuilder = Base2Exponent.newBuilder();
321+
base2ExpBuilder.setNumberOfBuckets(numberOfBuckets);
322+
base2ExpBuilder.setScale(buckets.getScale());
323+
Base2Exponent exponentialOptions = base2ExpBuilder.build();
324+
325+
BucketOptions.Builder bucketBuilder = BucketOptions.newBuilder();
326+
bucketBuilder.setExponential(exponentialOptions);
327+
builder.setBucketOptions(bucketBuilder.build());
328+
} else {
329+
throw new HistogramParsingException(
330+
"Unable to encode Int64 Histogram, bucket is not recognized");
331+
}
332+
333+
builder.setCount(this.getTotalCount());
334+
335+
for (long val : this.getBucketCount()) {
336+
builder.addBucketCounts(val);
337+
}
338+
System.out.println("xxxx " + builder.toString());
339+
return builder.build();
340+
}
341+
342+
// /** Creates a {@link HistogramData} instance from its proto {@link HistogramValue}. */
343+
// public static HistogramData fromProto(HistogramValue proto) {
344+
// HistgramValue value = new HistgramValue();
345+
// return new HistogramValue(proto);
346+
// }
347+
243348
/**
244349
* Increment the {@code numTopRecords} and update {@code topRecordsSum} when a new overflow value
245350
* is recorded. This function should only be called when a Histogram is recording a value greater
@@ -573,6 +678,42 @@ public double getRangeTo() {
573678
// Note: equals() and hashCode() are implemented by the AutoValue.
574679
}
575680

681+
/** Used for testing unsupported Bucket formats. */
682+
@AutoValue
683+
@Internal
684+
@VisibleForTesting
685+
public abstract static class UnsupportedBuckets implements BucketType {
686+
687+
public static UnsupportedBuckets of() {
688+
return new AutoValue_HistogramData_UnsupportedBuckets(0);
689+
}
690+
691+
@Override
692+
public int getBucketIndex(double value) {
693+
return 0;
694+
}
695+
696+
@Override
697+
public double getBucketSize(int index) {
698+
return 0;
699+
}
700+
701+
@Override
702+
public double getAccumulatedBucketSize(int index) {
703+
return 0;
704+
}
705+
706+
@Override
707+
public double getRangeFrom() {
708+
return 0;
709+
}
710+
711+
@Override
712+
public double getRangeTo() {
713+
return 0;
714+
}
715+
}
716+
576717
@Override
577718
public synchronized boolean equals(@Nullable Object object) {
578719
if (object instanceof HistogramData) {

0 commit comments

Comments
 (0)