Skip to content

Commit e0dcf4c

Browse files
author
Naireen
committed
Add per worker histogram to portable runner
1 parent b387721 commit e0dcf4c

File tree

22 files changed

+330
-38
lines changed

22 files changed

+330
-38
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,17 @@ message MonitoringInfoSpecs {
367367
}
368368
]
369369
}];
370+
371+
372+
USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
373+
urn: "beam:metric:user:per_worker_histogram_int64:v1",
374+
type: "beam:metrics:per_worker_histogram_int64:v1",
375+
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
376+
annotations: [{
377+
key: "description",
378+
value: "URN utilized to report user metric."
379+
}]
380+
}];
370381
}
371382
}
372383

@@ -576,6 +587,10 @@ message MonitoringInfoTypeUrns {
576587
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
577588
"beam:metrics:set_string:v1"];
578589

590+
PER_WORKER_HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
591+
"beam:metrics:per_worker_histogram_int64:v1"];
592+
593+
579594
// General monitored state information which contains structured information
580595
// which does not fit into a typical metric format. See MonitoringTableData
581596
// for more details.

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.metrics.MetricResults;
2626
import org.apache.beam.sdk.metrics.MetricsFilter;
2727
import org.apache.beam.sdk.metrics.StringSetResult;
28+
import org.apache.beam.sdk.util.HistogramData;
2829
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2930
import org.checkerframework.checker.nullness.qual.Nullable;
3031

@@ -42,16 +43,19 @@ public class DefaultMetricResults extends MetricResults {
4243
private final Iterable<MetricResult<DistributionResult>> distributions;
4344
private final Iterable<MetricResult<GaugeResult>> gauges;
4445
private final Iterable<MetricResult<StringSetResult>> stringSets;
46+
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;
4547

4648
public DefaultMetricResults(
4749
Iterable<MetricResult<Long>> counters,
4850
Iterable<MetricResult<DistributionResult>> distributions,
4951
Iterable<MetricResult<GaugeResult>> gauges,
50-
Iterable<MetricResult<StringSetResult>> stringSets) {
52+
Iterable<MetricResult<StringSetResult>> stringSets,
53+
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
5154
this.counters = counters;
5255
this.distributions = distributions;
5356
this.gauges = gauges;
5457
this.stringSets = stringSets;
58+
this.perWorkerHistograms = perWorkerHistograms;
5559
}
5660

5761
@Override
@@ -62,6 +66,9 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
6266
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
6367
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
6468
Iterables.filter(
65-
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
69+
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
70+
Iterables.filter(
71+
perWorkerHistograms,
72+
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
6673
}
6774
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public void update(HistogramCell other) {
7070
dirty.afterModification();
7171
}
7272

73+
@Override
74+
public void update(HistogramData data) {
75+
this.value.update(data);
76+
dirty.afterModification();
77+
}
78+
7379
// TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing
7480
// the infinite buckets as well.
7581
// and remove the incTopBucketCount and incBotBucketCount methods.

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.Serializable;
2222
import java.util.Collections;
2323
import org.apache.beam.sdk.metrics.MetricKey;
24+
import org.apache.beam.sdk.util.HistogramData;
2425
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2526

2627
/** Representation of multiple metric updates. */
@@ -34,6 +35,7 @@ public abstract class MetricUpdates {
3435
Collections.emptyList(),
3536
Collections.emptyList(),
3637
Collections.emptyList(),
38+
Collections.emptyList(),
3739
Collections.emptyList());
3840

3941
/**
@@ -66,21 +68,30 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
6668
/** All the sets updates. */
6769
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();
6870

71+
/** All the histogram updates. */
72+
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();
73+
6974
/** Create a new {@link MetricUpdates} bundle. */
7075
public static MetricUpdates create(
7176
Iterable<MetricUpdate<Long>> counterUpdates,
7277
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
7378
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
74-
Iterable<MetricUpdate<StringSetData>> stringSetUpdates) {
79+
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
80+
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
7581
return new AutoValue_MetricUpdates(
76-
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates);
82+
counterUpdates,
83+
distributionUpdates,
84+
gaugeUpdates,
85+
stringSetUpdates,
86+
perWorkerHistogramsUpdates);
7787
}
7888

7989
/** Returns true if there are no updates in this MetricUpdates object. */
8090
public boolean isEmpty() {
8191
return Iterables.isEmpty(counterUpdates())
8292
&& Iterables.isEmpty(distributionUpdates())
8393
&& Iterables.isEmpty(gaugeUpdates())
84-
&& Iterables.isEmpty(stringSetUpdates());
94+
&& Iterables.isEmpty(stringSetUpdates())
95+
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
8596
}
8697
}

0 commit comments

Comments
 (0)