Skip to content

Commit b387721

Browse files
author
Naireen
committed
add ability to encode and decode histogram data to portable runners
1 parent d760383 commit b387721

File tree

6 files changed

+224
-0
lines changed

6 files changed

+224
-0
lines changed

runners/core-java/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ dependencies {
4242
implementation project(path: ":model:pipeline", configuration: "shadow")
4343
implementation project(path: ":sdks:java:core", configuration: "shadow")
4444
implementation project(path: ":model:job-management", configuration: "shadow")
45+
implementation library.java.google_api_services_dataflow
4546
implementation library.java.vendored_guava_32_1_2_jre
4647
implementation library.java.joda_time
4748
implementation library.java.vendored_grpc_1_60_1
@@ -52,5 +53,6 @@ dependencies {
5253
testImplementation library.java.junit
5354
testImplementation library.java.mockito_core
5455
testImplementation library.java.slf4j_api
56+
testImplementation(library.java.google_api_services_dataflow)
5557
testRuntimeOnly library.java.slf4j_simple
5658
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,34 @@
1717
*/
1818
package org.apache.beam.runners.core.metrics;
1919

20+
import com.fasterxml.jackson.databind.DeserializationFeature;
21+
import com.fasterxml.jackson.databind.JsonNode;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.google.api.services.dataflow.model.Base2Exponent;
24+
import com.google.api.services.dataflow.model.BucketOptions;
25+
import com.google.api.services.dataflow.model.DataflowHistogramValue;
26+
import com.google.api.services.dataflow.model.Linear;
2027
import java.io.IOException;
2128
import java.io.InputStream;
29+
import java.util.ArrayList;
30+
import java.util.Arrays;
31+
import java.util.Iterator;
32+
import java.util.List;
2233
import java.util.Set;
2334
import org.apache.beam.sdk.coders.Coder;
2435
import org.apache.beam.sdk.coders.DoubleCoder;
2536
import org.apache.beam.sdk.coders.IterableCoder;
2637
import org.apache.beam.sdk.coders.StringUtf8Coder;
2738
import org.apache.beam.sdk.coders.VarLongCoder;
2839
import org.apache.beam.sdk.util.ByteStringOutputStream;
40+
import org.apache.beam.sdk.util.HistogramData;
2941
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
3042
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
3143
import org.joda.time.Instant;
3244

45+
// TODO(naireenhussain): Refactor out DataflowHistogramValue to be runner agnostic, and rename to
46+
// remove Dataflow reference.
47+
3348
/** A set of functions used to encode and decode common monitoring info types. */
3449
public class MonitoringInfoEncodings {
3550
private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString payload) {
163178
throw new RuntimeException(e);
164179
}
165180
}
181+
182+
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
183+
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
184+
try {
185+
int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
186+
187+
DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
188+
189+
if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
190+
HistogramData.LinearBuckets buckets =
191+
(HistogramData.LinearBuckets) inputHistogram.getBucketType();
192+
Linear linear = new Linear();
193+
linear.setNumberOfBuckets(numberOfBuckets);
194+
linear.setWidth(buckets.getWidth());
195+
linear.setStart(buckets.getStart());
196+
outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
197+
} else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
198+
HistogramData.ExponentialBuckets buckets =
199+
(HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
200+
Base2Exponent base2Exp = new Base2Exponent();
201+
base2Exp.setNumberOfBuckets(numberOfBuckets);
202+
base2Exp.setScale(buckets.getScale());
203+
outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
204+
} else {
205+
throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
206+
}
207+
208+
outputHistogram2.setCount(inputHistogram.getTotalCount());
209+
210+
List<Long> bucketCounts = new ArrayList<>();
211+
212+
Arrays.stream(inputHistogram.getBucketCount())
213+
.forEach(
214+
val -> {
215+
bucketCounts.add(val);
216+
});
217+
218+
outputHistogram2.setBucketCounts(bucketCounts);
219+
220+
ObjectMapper objectMapper = new ObjectMapper();
221+
String jsonString = objectMapper.writeValueAsString(outputHistogram2);
222+
223+
return ByteString.copyFromUtf8(jsonString);
224+
} catch (Exception e) {
225+
throw new RuntimeException(e);
226+
}
227+
}
228+
229+
/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
230+
public static HistogramData decodeInt64Histogram(ByteString payload) {
231+
try {
232+
ObjectMapper objectMapper = new ObjectMapper();
233+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
234+
JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
235+
DataflowHistogramValue newHist = new DataflowHistogramValue();
236+
newHist.setCount(jsonNode.get("count").asLong());
237+
238+
List<Long> bucketCounts = new ArrayList<>();
239+
Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
240+
while (itr.hasNext()) {
241+
Long item = itr.next().asLong();
242+
bucketCounts.add(item);
243+
}
244+
newHist.setBucketCounts(bucketCounts);
245+
246+
if (jsonNode.get("bucketOptions").has("linear")) {
247+
Linear linear = new Linear();
248+
JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
249+
linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
250+
linear.setWidth(linearNode.get("width").asDouble());
251+
linear.setStart(linearNode.get("start").asDouble());
252+
newHist.setBucketOptions(new BucketOptions().setLinear(linear));
253+
} else if (jsonNode.get("bucketOptions").has("exponential")) {
254+
Base2Exponent base2Exp = new Base2Exponent();
255+
JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
256+
base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
257+
base2Exp.setScale(expNode.get("scale").asInt());
258+
newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
259+
} else {
260+
throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
261+
}
262+
return new HistogramData(newHist);
263+
} catch (IOException e) {
264+
throw new RuntimeException(e);
265+
}
266+
}
166267
}

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,38 @@
2121
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
2222
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
2323
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
24+
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
2425
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
2526
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
2627
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
2728
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
2829
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
2930
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
31+
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
3032
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
3133
import static org.junit.Assert.assertEquals;
3234

3335
import java.util.Collections;
36+
import org.apache.beam.sdk.testing.ExpectedLogs;
37+
import org.apache.beam.sdk.util.HistogramData;
3438
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
3539
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
3640
import org.joda.time.Instant;
41+
import org.junit.Rule;
3742
import org.junit.Test;
43+
import org.junit.rules.ExpectedException;
3844
import org.junit.runner.RunWith;
3945
import org.junit.runners.JUnit4;
4046

4147
/** Tests for {@link MonitoringInfoEncodings}. */
4248
@RunWith(JUnit4.class)
4349
public class MonitoringInfoEncodingsTest {
50+
@Rule
51+
public ExpectedLogs monitoringInfoCodingsExpectedLogs =
52+
ExpectedLogs.none(MonitoringInfoEncodings.class);
53+
54+
@Rule public ExpectedException thrown = ExpectedException.none();
55+
4456
@Test
4557
public void testInt64DistributionEncoding() {
4658
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
@@ -105,4 +117,36 @@ public void testDoubleCounterEncoding() {
105117
assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload);
106118
assertEquals(1.0, decodeDoubleCounter(payload), 0.001);
107119
}
120+
121+
@Test
122+
public void testHistgramInt64EncodingLinearHist() {
123+
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);
124+
125+
HistogramData inputHistogram = new HistogramData(buckets);
126+
inputHistogram.record(5, 10, 15, 20);
127+
ByteString payload = encodeInt64Histogram(inputHistogram);
128+
129+
assertEquals(inputHistogram, decodeInt64Histogram(payload));
130+
}
131+
132+
@Test
133+
public void testHistgramInt64EncodingExpHist() {
134+
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
135+
HistogramData inputHistogram = new HistogramData(buckets);
136+
inputHistogram.record(2, 4, 8, 16, 32);
137+
ByteString payload = encodeInt64Histogram(inputHistogram);
138+
assertEquals(inputHistogram, decodeInt64Histogram(payload));
139+
}
140+
141+
@Test
142+
public void testHistgramInt64EncodingUnsupportedBucket() {
143+
thrown.expect(Exception.class);
144+
thrown.expectMessage("Unable to parse histogram, bucket is not recognized");
145+
146+
HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of();
147+
148+
HistogramData inputHistogram = new HistogramData(buckets);
149+
inputHistogram.record(2, 4, 8, 16, 32);
150+
encodeInt64Histogram(inputHistogram);
151+
}
108152
}

sdks/java/core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ dependencies {
7575
permitUnusedDeclared library.java.antlr
7676
permitUsedUndeclared library.java.antlr_runtime
7777
// Required to load constants from the model, e.g. max timestamp for global window
78+
provided library.java.google_api_services_dataflow
7879
shadow project(path: ":model:pipeline", configuration: "shadow")
7980
shadow project(path: ":model:fn-execution", configuration: "shadow")
8081
shadow project(path: ":model:job-management", configuration: "shadow")

sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,42 @@ public HistogramData(BucketType bucketType) {
7474
this.sumOfSquaredDeviations = 0;
7575
}
7676

77+
/**
78+
* Create a histogram from DataflowHistogramValue proto.
79+
*
80+
* @param histogramProto DataflowHistogramValue proto used to populate stats for the histogram.
81+
*/
82+
public HistogramData(
83+
com.google.api.services.dataflow.model.DataflowHistogramValue histogramProto) {
84+
int numBuckets;
85+
if (histogramProto.getBucketOptions().getLinear() != null) {
86+
double start = histogramProto.getBucketOptions().getLinear().getStart();
87+
double width = histogramProto.getBucketOptions().getLinear().getWidth();
88+
numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
89+
this.bucketType = LinearBuckets.of(start, width, numBuckets);
90+
this.buckets = new long[bucketType.getNumBuckets()];
91+
92+
int idx = 0;
93+
for (long val : histogramProto.getBucketCounts()) {
94+
this.buckets[idx] = val;
95+
this.numBoundedBucketRecords += val;
96+
idx++;
97+
}
98+
} else {
99+
// Assume it's a exponential histogram if its not linear
100+
int scale = histogramProto.getBucketOptions().getExponential().getScale();
101+
numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets();
102+
this.bucketType = ExponentialBuckets.of(scale, numBuckets);
103+
this.buckets = new long[bucketType.getNumBuckets()];
104+
int idx = 0;
105+
for (long val : histogramProto.getBucketCounts()) {
106+
this.buckets[idx] = val;
107+
this.numBoundedBucketRecords += val;
108+
idx++;
109+
}
110+
}
111+
}
112+
77113
public BucketType getBucketType() {
78114
return this.bucketType;
79115
}
@@ -293,6 +329,10 @@ public synchronized long getTopBucketCount() {
293329
return numTopRecords;
294330
}
295331

332+
// Returns the internal per-bucket count array (used by the histogram encoders).
// NOTE(review): this exposes the live internal array without a defensive copy —
// callers must treat the result as read-only, and the synchronization here does
// not protect reads of the returned array after this method returns.
public synchronized long[] getBucketCount() {
  return buckets;
}
335+
296336
public synchronized double getTopBucketMean() {
297337
return numTopRecords == 0 ? 0 : topRecordsSum / numTopRecords;
298338
}
@@ -573,6 +613,40 @@ public double getRangeTo() {
573613
// Note: equals() and hashCode() are implemented by the AutoValue.
574614
}
575615

616+
// Used for testing unsupported Bucket formats
617+
@AutoValue
618+
public abstract static class UnsupportedBuckets implements BucketType {
619+
620+
public static UnsupportedBuckets of() {
621+
return new AutoValue_HistogramData_UnsupportedBuckets(0);
622+
}
623+
624+
@Override
625+
public int getBucketIndex(double value) {
626+
return 0;
627+
}
628+
629+
@Override
630+
public double getBucketSize(int index) {
631+
return 0;
632+
}
633+
634+
@Override
635+
public double getAccumulatedBucketSize(int index) {
636+
return 0;
637+
}
638+
639+
@Override
640+
public double getRangeFrom() {
641+
return 0;
642+
}
643+
644+
@Override
645+
public double getRangeTo() {
646+
return 0;
647+
}
648+
}
649+
576650
@Override
577651
public synchronized boolean equals(@Nullable Object object) {
578652
if (object instanceof HistogramData) {

sdks/java/harness/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dependencies {
3030
provided project(path: ":model:pipeline", configuration: "shadow")
3131
provided project(path: ":sdks:java:core", configuration: "shadow")
3232
provided project(path: ":sdks:java:transform-service:launcher")
33+
provided library.java.google_api_services_dataflow
3334
provided library.java.avro
3435
provided library.java.jackson_databind
3536
provided library.java.joda_time
@@ -79,4 +80,5 @@ dependencies {
7980
shadowTest project(path: ":sdks:java:core", configuration: "shadowTest")
8081
shadowTestRuntimeClasspath library.java.slf4j_jdk14
8182
permitUnusedDeclared library.java.avro
83+
permitUnusedDeclared library.java.google_api_services_dataflow
8284
}

0 commit comments

Comments
 (0)