Skip to content

Commit 69998f6

Browse files
author
Naireen
committed
update naming
1 parent 55316e5 commit 69998f6

File tree

21 files changed

+99
-137
lines changed

21 files changed

+99
-137
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -379,13 +379,13 @@ message MonitoringInfoSpecs {
379379
]
380380
}];
381381

382-
USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
383-
urn: "beam:metric:user:per_worker_histogram_int64:v1",
384-
type: "beam:metrics:per_worker_histogram_int64:v1",
382+
USER_HISTOGRAM = 23 [(monitoring_info_spec) = {
383+
urn: "beam:metric:user:histogram_int64:v1",
384+
type: "beam:metrics:histogram_int64:v1",
385385
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
386386
annotations: [{
387387
key: "description",
388-
value: "URN utilized to report per worker histogram metric."
388+
value: "URN utilized to report histogram metric."
389389
}]
390390
}];
391391
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,21 @@ public class DefaultMetricResults extends MetricResults {
4545
private final Iterable<MetricResult<GaugeResult>> gauges;
4646
private final Iterable<MetricResult<StringSetResult>> stringSets;
4747
private final Iterable<MetricResult<BoundedTrieResult>> boundedTries;
48-
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;
48+
private final Iterable<MetricResult<HistogramData>> histograms;
4949

5050
public DefaultMetricResults(
5151
Iterable<MetricResult<Long>> counters,
5252
Iterable<MetricResult<DistributionResult>> distributions,
5353
Iterable<MetricResult<GaugeResult>> gauges,
5454
Iterable<MetricResult<StringSetResult>> stringSets,
5555
Iterable<MetricResult<BoundedTrieResult>> boundedTries,
56-
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
56+
Iterable<MetricResult<HistogramData>> histograms) {
5757
this.counters = counters;
5858
this.distributions = distributions;
5959
this.gauges = gauges;
6060
this.stringSets = stringSets;
6161
this.boundedTries = boundedTries;
62-
this.perWorkerHistograms = perWorkerHistograms;
62+
this.histograms = histograms;
6363
}
6464

6565
@Override
@@ -73,9 +73,7 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
7373
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
7474
Iterables.filter(
7575
boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())),
76-
Iterables.filter(
77-
perWorkerHistograms,
78-
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
79-
76+
Iterables.filter(
77+
histograms, histogram -> MetricFiltering.matches(filter, histogram.getKey())));
8078
}
8179
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
7272
public abstract Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates();
7373

7474
/** All the histogram updates. */
75-
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();
75+
public abstract Iterable<MetricUpdate<HistogramData>> histogramsUpdates();
7676

7777
/** Create a new {@link MetricUpdates} bundle. */
7878
public static MetricUpdates create(
@@ -81,13 +81,14 @@ public static MetricUpdates create(
8181
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
8282
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
8383
Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates,
84-
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
84+
Iterable<MetricUpdate<HistogramData>> histogramsUpdates) {
8585
return new AutoValue_MetricUpdates(
8686
counterUpdates,
8787
distributionUpdates,
8888
gaugeUpdates,
89-
stringSetUpdates, boundedTrieUpdates,
90-
perWorkerHistogramsUpdates);
89+
stringSetUpdates,
90+
boundedTrieUpdates,
91+
histogramsUpdates);
9192
}
9293

9394
/** Returns true if there are no updates in this MetricUpdates object. */
@@ -97,6 +98,6 @@ public boolean isEmpty() {
9798
&& Iterables.isEmpty(gaugeUpdates())
9899
&& Iterables.isEmpty(stringSetUpdates())
99100
&& Iterables.isEmpty(boundedTrieUpdates())
100-
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
101+
&& Iterables.isEmpty(histogramsUpdates());
101102
}
102103
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java

Lines changed: 17 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.BOUNDED_TRIE_TYPE;
2121
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
22+
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE;
2223
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
23-
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE;
2424
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
2525
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
2626
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeBoundedTrie;
@@ -97,9 +97,6 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
9797

9898
private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);
9999

100-
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
101-
new MetricsMap<>(HistogramCell::new);
102-
103100
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
104101
new MetricsMap<>(HistogramCell::new);
105102

@@ -247,20 +244,8 @@ public BoundedTrieCell getBoundedTrie(MetricName metricName) {
247244
return boundedTries.tryGet(metricName);
248245
}
249246

250-
/**
251-
* Return the {@link Histogram} that should be used for implementing the given per-worker {@code
252-
* metricName} in this container.
253-
*/
254-
@Override
255-
public HistogramCell getPerWorkerHistogram(
256-
MetricName metricName, HistogramData.BucketType bucketType) {
257-
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
258-
return val;
259-
}
260-
261-
public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
262-
getPerWorkerHistogram() {
263-
return perWorkerHistograms;
247+
public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> getHistogram() {
248+
return histograms;
264249
}
265250

266251
private <UpdateT, CellT extends MetricCell<UpdateT>>
@@ -303,7 +288,7 @@ public MetricUpdates getUpdates() {
303288
extractUpdates(gauges),
304289
extractUpdates(stringSets),
305290
extractUpdates(boundedTries),
306-
extractHistogramUpdates(perWorkerHistograms));
291+
extractHistogramUpdates(histograms));
307292
}
308293

309294
/** @return The MonitoringInfo metadata from the metric. */
@@ -434,8 +419,8 @@ public MetricUpdates getUpdates() {
434419
private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) {
435420
return metricToMonitoringMetadata(
436421
metricKey,
437-
MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE,
438-
MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM);
422+
MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE,
423+
MonitoringInfoConstants.Urns.USER_HISTOGRAM);
439424
}
440425

441426
/**
@@ -507,7 +492,7 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
507492
}
508493
}
509494

510-
for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) {
495+
for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.histogramsUpdates()) {
511496
MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate);
512497
if (mi != null) {
513498
monitoringInfos.add(mi);
@@ -566,7 +551,7 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
566551
}
567552
}
568553
});
569-
perWorkerHistograms.forEach(
554+
histograms.forEach(
570555
(metricName, histogramCell) -> {
571556
if (histogramCell.getDirty().beforeCommit()) {
572557
String shortId =
@@ -612,7 +597,7 @@ public void commitUpdates() {
612597
gauges.forEachValue(gauge -> gauge.getDirty().afterCommit());
613598
stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit());
614599
boundedTries.forEachValue(bTrie -> bTrie.getDirty().afterCommit());
615-
perWorkerHistograms.forEachValue(
600+
histograms.forEachValue(
616601
histogram -> {
617602
histogram.getDirty().afterCommit();
618603
});
@@ -652,7 +637,7 @@ public MetricUpdates getCumulative() {
652637
extractCumulatives(gauges),
653638
extractCumulatives(stringSets),
654639
extractCumulatives(boundedTries),
655-
extractHistogramCumulatives(perWorkerHistograms));
640+
extractHistogramCumulatives(histograms));
656641
}
657642

658643
/** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */
@@ -696,10 +681,10 @@ private void updateForBoundedTrieType(MonitoringInfo monitoringInfo) {
696681
boundedTrie.update(decodeBoundedTrie(monitoringInfo.getPayload()));
697682
}
698683

699-
private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) {
684+
private void updateForHistogramInt64(MonitoringInfo monitoringInfo) {
700685
MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
701686
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
702-
Histogram histogram = getPerWorkerHistogram(metricName, buckets);
687+
Histogram histogram = getHistogram(metricName, buckets);
703688
HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload());
704689
histogram.update(data);
705690
}
@@ -732,8 +717,8 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
732717
updateForBoundedTrieType(monitoringInfo);
733718
break;
734719

735-
case PER_WORKER_HISTOGRAM_TYPE:
736-
updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info
720+
case HISTOGRAM_TYPE:
721+
updateForHistogramInt64(monitoringInfo); // use type, and not urn info
737722
break;
738723
default:
739724
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
@@ -785,14 +770,15 @@ public boolean equals(@Nullable Object object) {
785770
&& Objects.equals(gauges, metricsContainerImpl.gauges)
786771
&& Objects.equals(stringSets, metricsContainerImpl.stringSets)
787772
&& Objects.equals(boundedTries, metricsContainerImpl.boundedTries)
788-
&& Objects.equals(perWorkerHistograms, metricsContainerImpl.perWorkerHistograms);
773+
&& Objects.equals(histograms, metricsContainerImpl.histograms);
789774
}
790775
return false;
791776
}
792777

793778
@Override
794779
public int hashCode() {
795-
return Objects.hash(stepName, counters, distributions, gauges, stringSets, boundedTries, perWorkerHistograms);
780+
return Objects.hash(
781+
stepName, counters, distributions, gauges, stringSets, boundedTries, histograms);
796782
}
797783

798784
/**
@@ -924,21 +910,6 @@ public static MetricsContainerImpl deltaContainer(
924910
deltaValueCell.incTopBucketCount(
925911
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
926912
}
927-
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
928-
curr.perWorkerHistograms.entries()) {
929-
HistogramData.BucketType bt = cell.getKey().getValue();
930-
HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative();
931-
HistogramData currValue = cell.getValue().getCumulative();
932-
HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey());
933-
deltaValueCell.incBottomBucketCount(
934-
currValue.getBottomBucketCount() - prevValue.getBottomBucketCount());
935-
for (int i = 0; i < bt.getNumBuckets(); i++) {
936-
Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i);
937-
deltaValueCell.incBucketCount(i, bucketCountDelta);
938-
}
939-
deltaValueCell.incTopBucketCount(
940-
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
941-
}
942913

943914
for (Map.Entry<MetricName, StringSetCell> cell : curr.stringSets.entries()) {
944915
// Simply take the most recent value for stringSets, no need to count deltas.

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public static MetricResults asMetricResults(
139139
Map<MetricKey, MetricResult<GaugeData>> gauges = new HashMap<>();
140140
Map<MetricKey, MetricResult<StringSetData>> sets = new HashMap<>();
141141
Map<MetricKey, MetricResult<BoundedTrieData>> boundedTries = new HashMap<>();
142-
Map<MetricKey, MetricResult<HistogramData>> perWorkerHistograms = new HashMap<>();
142+
Map<MetricKey, MetricResult<HistogramData>> histograms = new HashMap<>();
143143

144144
attemptedMetricsContainers.forEachMetricContainer(
145145
container -> {
@@ -151,8 +151,7 @@ public static MetricResults asMetricResults(
151151
mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
152152
mergeAttemptedResults(
153153
boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine);
154-
mergeAttemptedResults(
155-
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
154+
mergeAttemptedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine);
156155
});
157156
committedMetricsContainers.forEachMetricContainer(
158157
container -> {
@@ -164,8 +163,7 @@ public static MetricResults asMetricResults(
164163
mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
165164
mergeCommittedResults(
166165
boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine);
167-
mergeCommittedResults(
168-
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
166+
mergeCommittedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine);
169167
});
170168

171169
return new DefaultMetricResults(
@@ -182,7 +180,7 @@ public static MetricResults asMetricResults(
182180
boundedTries.values().stream()
183181
.map(result -> result.transform(BoundedTrieData::extractResult))
184182
.collect(toList()),
185-
perWorkerHistograms.values().stream()
183+
histograms.values().stream()
186184
.map(result -> result.transform(HistogramData::extractResult))
187185
.collect(toList()));
188186
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ public static final class Urns {
5656
extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING);
5757
public static final String USER_BOUNDED_TRIE =
5858
extractUrn(MonitoringInfoSpecs.Enum.USER_BOUNDED_TRIE);
59-
public static final String USER_PER_WORKER_HISTOGRAM =
60-
extractUrn(MonitoringInfoSpecs.Enum.USER_PER_WORKER_HISTOGRAM);
59+
public static final String USER_HISTOGRAM = extractUrn(MonitoringInfoSpecs.Enum.USER_HISTOGRAM);
6160
public static final String SAMPLED_BYTE_SIZE =
6261
extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
6362
public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
@@ -170,8 +169,7 @@ public static final class TypeUrns {
170169
public static final String PROGRESS_TYPE = "beam:metrics:progress:v1";
171170
public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1";
172171
public static final String BOUNDED_TRIE_TYPE = "beam:metrics:bounded_trie:v1";
173-
public static final String PER_WORKER_HISTOGRAM_TYPE =
174-
"beam:metrics:per_worker_histogram_int64:v1";
172+
public static final String HISTOGRAM_TYPE = "beam:metrics:histogram_int64:v1";
175173

176174
static {
177175
// Validate that compile time constants match the values stored in the protos.
@@ -200,9 +198,7 @@ public static final class TypeUrns {
200198
checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE)));
201199
checkArgument(
202200
BOUNDED_TRIE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOUNDED_TRIE_TYPE)));
203-
checkArgument(
204-
PER_WORKER_HISTOGRAM_TYPE.equals(
205-
getUrn(MonitoringInfoTypeUrns.Enum.PER_WORKER_HISTOGRAM)));
201+
checkArgument(HISTOGRAM_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.HISTOGRAM)));
206202
}
207203
}
208204

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,11 @@ public SimpleMonitoringInfoBuilder setBoundedTrieValue(BoundedTrieData value) {
173173
}
174174

175175
/**
176-
* Encodes the value and sets the type to {@link
177-
* MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM_TYPE}.
176+
* Encodes the value and sets the type to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM_TYPE}.
178177
*/
179178
public SimpleMonitoringInfoBuilder setInt64HistogramValue(HistogramData data) {
180179
this.builder.setPayload(encodeInt64Histogram(data));
181-
this.builder.setType(MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE);
180+
this.builder.setType(MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE);
182181
return this;
183182
}
184183

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,6 @@ public void testDeltaCounters() {
441441
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
442442
MetricName hName = MetricName.named("namespace", "histogram");
443443
MetricName stringSetName = MetricName.named("namespace", "stringset");
444-
MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram");
445444

446445
MetricsContainerImpl prevContainer = new MetricsContainerImpl(null);
447446
prevContainer.getCounter(cName).inc(2L);
@@ -453,10 +452,6 @@ public void testDeltaCounters() {
453452
prevContainer.getHistogram(hName, bucketType).update(3);
454453
prevContainer.getHistogram(hName, bucketType).update(20);
455454

456-
// Set PerWorkerBucketCounts to [0,1,1,0,0,0,0]
457-
prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1);
458-
prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3);
459-
460455
MetricsContainerImpl nextContainer = new MetricsContainerImpl(null);
461456
nextContainer.getCounter(cName).inc(9L);
462457
nextContainer.getGauge(gName).set(8L);
@@ -475,10 +470,6 @@ public void testDeltaCounters() {
475470
nextContainer.getHistogram(hName, bucketType).update(20);
476471
nextContainer.getHistogram(hName, bucketType).update(20);
477472

478-
// Set PerWorkerBucketCounts to [1,0,0,0,0,0,1]
479-
nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1);
480-
nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20);
481-
482473
MetricsContainerImpl deltaContainer =
483474
MetricsContainerImpl.deltaContainer(prevContainer, nextContainer);
484475
// Expect counter value: 7 = 9 - 2
@@ -504,20 +495,6 @@ public void testDeltaCounters() {
504495
}
505496
assertEquals(
506497
2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount());
507-
508-
// Expect per worker bucket counts: [1,0,0,0,0,0,1]
509-
assertEquals(
510-
1,
511-
deltaContainer
512-
.getPerWorkerHistogram(pwhName, bucketType)
513-
.getCumulative()
514-
.getBottomBucketCount());
515-
assertEquals(
516-
1,
517-
deltaContainer
518-
.getPerWorkerHistogram(pwhName, bucketType)
519-
.getCumulative()
520-
.getTopBucketCount());
521498
}
522499

523500
@Test

0 commit comments

Comments
 (0)