Skip to content

Commit 55316e5

Browse files
author
Naireen
committed
Add histogram metric to portable runner
1 parent ea40d9e commit 55316e5

File tree

23 files changed

+318
-32
lines changed

23 files changed

+318
-32
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,16 @@ message MonitoringInfoSpecs {
378378
}
379379
]
380380
}];
381+
382+
USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
383+
urn: "beam:metric:user:per_worker_histogram_int64:v1",
384+
type: "beam:metrics:per_worker_histogram_int64:v1",
385+
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
386+
annotations: [{
387+
key: "description",
388+
value: "URN utilized to report per worker histogram metric."
389+
}]
390+
}];
381391
}
382392
}
383393

@@ -593,6 +603,12 @@ message MonitoringInfoTypeUrns {
593603
BOUNDED_TRIE_TYPE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
594604
"beam:metrics:bounded_trie:v1"];
595605

606+
// Represents a histogram.
607+
//
608+
// Encoding: DataflowHistogram proto
609+
HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
610+
"beam:metrics:histogram_int64:v1"];
611+
596612
// General monitored state information which contains structured information
597613
// which does not fit into a typical metric format. See MonitoringTableData
598614
// for more details.

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.beam.sdk.metrics.MetricResults;
2727
import org.apache.beam.sdk.metrics.MetricsFilter;
2828
import org.apache.beam.sdk.metrics.StringSetResult;
29+
import org.apache.beam.sdk.util.HistogramData;
2930
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
3031
import org.checkerframework.checker.nullness.qual.Nullable;
3132

@@ -44,18 +45,21 @@ public class DefaultMetricResults extends MetricResults {
4445
private final Iterable<MetricResult<GaugeResult>> gauges;
4546
private final Iterable<MetricResult<StringSetResult>> stringSets;
4647
private final Iterable<MetricResult<BoundedTrieResult>> boundedTries;
48+
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;
4749

4850
public DefaultMetricResults(
4951
Iterable<MetricResult<Long>> counters,
5052
Iterable<MetricResult<DistributionResult>> distributions,
5153
Iterable<MetricResult<GaugeResult>> gauges,
5254
Iterable<MetricResult<StringSetResult>> stringSets,
53-
Iterable<MetricResult<BoundedTrieResult>> boundedTries) {
55+
Iterable<MetricResult<BoundedTrieResult>> boundedTries,
56+
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
5457
this.counters = counters;
5558
this.distributions = distributions;
5659
this.gauges = gauges;
5760
this.stringSets = stringSets;
5861
this.boundedTries = boundedTries;
62+
this.perWorkerHistograms = perWorkerHistograms;
5963
}
6064

6165
@Override
@@ -68,6 +72,10 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
6872
Iterables.filter(
6973
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
7074
Iterables.filter(
71-
boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())));
75+
boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())),
76+
Iterables.filter(
77+
perWorkerHistograms,
78+
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
79+
7280
}
7381
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public void update(HistogramCell other) {
7070
dirty.afterModification();
7171
}
7272

73+
@Override
74+
public void update(HistogramData data) {
75+
this.value.update(data);
76+
dirty.afterModification();
77+
}
78+
7379
// TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing
7480
// the infinite buckets as well.
7581
// and remove the incTopBucketCount and incBotBucketCount methods.

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.Serializable;
2222
import java.util.Collections;
2323
import org.apache.beam.sdk.metrics.MetricKey;
24+
import org.apache.beam.sdk.util.HistogramData;
2425
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2526

2627
/** Representation of multiple metric updates. */
@@ -35,6 +36,7 @@ public abstract class MetricUpdates {
3536
Collections.emptyList(),
3637
Collections.emptyList(),
3738
Collections.emptyList(),
39+
Collections.emptyList(),
3840
Collections.emptyList());
3941

4042
/**
@@ -69,15 +71,23 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
6971

7072
public abstract Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates();
7173

74+
/** All the histogram updates. */
75+
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();
76+
7277
/** Create a new {@link MetricUpdates} bundle. */
7378
public static MetricUpdates create(
7479
Iterable<MetricUpdate<Long>> counterUpdates,
7580
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
7681
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
7782
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
78-
Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates) {
83+
Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates,
84+
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
7985
return new AutoValue_MetricUpdates(
80-
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates, boundedTrieUpdates);
86+
counterUpdates,
87+
distributionUpdates,
88+
gaugeUpdates,
89+
stringSetUpdates, boundedTrieUpdates,
90+
perWorkerHistogramsUpdates);
8191
}
8292

8393
/** Returns true if there are no updates in this MetricUpdates object. */
@@ -86,6 +96,7 @@ public boolean isEmpty() {
8696
&& Iterables.isEmpty(distributionUpdates())
8797
&& Iterables.isEmpty(gaugeUpdates())
8898
&& Iterables.isEmpty(stringSetUpdates())
89-
&& Iterables.isEmpty(boundedTrieUpdates());
99+
&& Iterables.isEmpty(boundedTrieUpdates())
100+
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
90101
}
91102
}

0 commit comments

Comments
 (0)