1919
2020import static org .apache .beam .runners .core .metrics .MonitoringInfoConstants .TypeUrns .BOUNDED_TRIE_TYPE ;
2121import static org .apache .beam .runners .core .metrics .MonitoringInfoConstants .TypeUrns .DISTRIBUTION_INT64_TYPE ;
22+ import static org .apache .beam .runners .core .metrics .MonitoringInfoConstants .TypeUrns .HISTOGRAM_TYPE ;
2223import static org .apache .beam .runners .core .metrics .MonitoringInfoConstants .TypeUrns .LATEST_INT64_TYPE ;
2324import static org .apache .beam .runners .core .metrics .MonitoringInfoConstants .TypeUrns .SET_STRING_TYPE ;
2425import static org .apache .beam .runners .core .metrics .MonitoringInfoConstants .TypeUrns .SUM_INT64_TYPE ;
2526import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .decodeBoundedTrie ;
2627import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .decodeInt64Counter ;
2728import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .decodeInt64Distribution ;
2829import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .decodeInt64Gauge ;
30+ import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .decodeInt64Histogram ;
2931import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .decodeStringSet ;
3032import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .encodeBoundedTrie ;
3133import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .encodeInt64Counter ;
3234import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .encodeInt64Distribution ;
3335import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .encodeInt64Gauge ;
36+ import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .encodeInt64Histogram ;
3437import static org .apache .beam .runners .core .metrics .MonitoringInfoEncodings .encodeStringSet ;
3538import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkNotNull ;
3639
4750import org .apache .beam .model .pipeline .v1 .MetricsApi .MonitoringInfo ;
4851import org .apache .beam .runners .core .metrics .MetricUpdates .MetricUpdate ;
4952import org .apache .beam .sdk .metrics .Distribution ;
53+ import org .apache .beam .sdk .metrics .Histogram ;
5054import org .apache .beam .sdk .metrics .Metric ;
5155import org .apache .beam .sdk .metrics .MetricKey ;
5256import org .apache .beam .sdk .metrics .MetricName ;
@@ -240,6 +244,10 @@ public BoundedTrieCell getBoundedTrie(MetricName metricName) {
240244 return boundedTries .tryGet (metricName );
241245 }
242246
247+ public MetricsMap <KV <MetricName , HistogramData .BucketType >, HistogramCell > getHistogram () {
248+ return histograms ;
249+ }
250+
243251 private <UpdateT , CellT extends MetricCell <UpdateT >>
244252 ImmutableList <MetricUpdate <UpdateT >> extractUpdates (MetricsMap <MetricName , CellT > cells ) {
245253 ImmutableList .Builder <MetricUpdate <UpdateT >> updates = ImmutableList .builder ();
@@ -253,6 +261,22 @@ ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT
253261 return updates .build ();
254262 }
255263
264+ // Needs a separate update since its constructor is different
265+ private <UpdateT , CellT extends MetricCell <UpdateT >>
266+ ImmutableList <MetricUpdate <UpdateT >> extractHistogramUpdates (
267+ MetricsMap <KV <MetricName , HistogramData .BucketType >, CellT > cells ) {
268+ ImmutableList .Builder <MetricUpdate <UpdateT >> updates = ImmutableList .builder ();
269+ cells .forEach (
270+ (key , value ) -> {
271+ if (value .getDirty ().beforeCommit ()) {
272+ updates .add (
273+ MetricUpdate .create (
274+ MetricKey .create (stepName , key .getKey ()), value .getCumulative ()));
275+ }
276+ });
277+ return updates .build ();
278+ }
279+
256280 /**
257281 * Return the cumulative values for any metrics that have changed since the last time updates were
258282 * committed.
@@ -263,7 +287,8 @@ public MetricUpdates getUpdates() {
263287 extractUpdates (distributions ),
264288 extractUpdates (gauges ),
265289 extractUpdates (stringSets ),
266- extractUpdates (boundedTries ));
290+ extractUpdates (boundedTries ),
291+ extractHistogramUpdates (histograms ));
267292 }
268293
269294 /** @return The MonitoringInfo metadata from the metric. */
@@ -296,6 +321,20 @@ public MetricUpdates getUpdates() {
296321 return builder ;
297322 }
298323
324+ /**
325+ * @param metricUpdate
326+ * @return The MonitoringInfo generated from the histogram metricUpdate.
327+ */
328+ private @ Nullable MonitoringInfo histogramUpdateToMonitoringInfo (
329+ MetricUpdate <HistogramData > metricUpdate ) {
330+ SimpleMonitoringInfoBuilder builder = histogramToMonitoringMetadata (metricUpdate .getKey ());
331+ if (builder == null ) {
332+ return null ;
333+ }
334+ builder .setInt64HistogramValue (metricUpdate .getUpdate ());
335+ return builder .build ();
336+ }
337+
299338 /** @return The MonitoringInfo metadata from the counter metric. */
300339 private @ Nullable SimpleMonitoringInfoBuilder counterToMonitoringMetadata (MetricKey metricKey ) {
301340 return metricToMonitoringMetadata (
@@ -376,6 +415,14 @@ public MetricUpdates getUpdates() {
376415 MonitoringInfoConstants .Urns .USER_BOUNDED_TRIE );
377416 }
378417
418+ /** @return The MonitoringInfo metadata from the histogram metric. */
419+ private @ Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata (MetricKey metricKey ) {
420+ return metricToMonitoringMetadata (
421+ metricKey ,
422+ MonitoringInfoConstants .TypeUrns .HISTOGRAM_TYPE ,
423+ MonitoringInfoConstants .Urns .USER_HISTOGRAM );
424+ }
425+
379426 /**
380427 * @param metricUpdate
381428 * @return The MonitoringInfo generated from the string set metricUpdate.
@@ -444,6 +491,14 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
444491 monitoringInfos .add (mi );
445492 }
446493 }
494+
495+ for (MetricUpdate <HistogramData > metricUpdate : metricUpdates .histogramsUpdates ()) {
496+ MonitoringInfo mi = histogramUpdateToMonitoringInfo (metricUpdate );
497+ if (mi != null ) {
498+ monitoringInfos .add (mi );
499+ }
500+ }
501+
447502 return monitoringInfos ;
448503 }
449504
@@ -496,6 +551,16 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
496551 }
497552 }
498553 });
554+ histograms .forEach (
555+ (metricName , histogramCell ) -> {
556+ if (histogramCell .getDirty ().beforeCommit ()) {
557+ String shortId =
558+ getShortId (metricName .getKey (), this ::histogramToMonitoringMetadata , shortIds );
559+ if (shortId != null ) {
560+ builder .put (shortId , encodeInt64Histogram (histogramCell .getCumulative ()));
561+ }
562+ }
563+ });
499564 return builder .build ();
500565 }
501566
@@ -532,6 +597,10 @@ public void commitUpdates() {
532597 gauges .forEachValue (gauge -> gauge .getDirty ().afterCommit ());
533598 stringSets .forEachValue (sSets -> sSets .getDirty ().afterCommit ());
534599 boundedTries .forEachValue (bTrie -> bTrie .getDirty ().afterCommit ());
600+ histograms .forEachValue (
601+ histogram -> {
602+ histogram .getDirty ().afterCommit ();
603+ });
535604 }
536605
537606 private <UserT extends Metric , UpdateT , CellT extends MetricCell <UpdateT >>
@@ -545,6 +614,18 @@ ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(MetricsMap<MetricName, C
545614 return updates .build ();
546615 }
547616
617+ private <UserT extends Metric , UpdateT , CellT extends MetricCell <UpdateT >>
618+ ImmutableList <MetricUpdate <UpdateT >> extractHistogramCumulatives (
619+ MetricsMap <KV <MetricName , HistogramData .BucketType >, CellT > cells ) {
620+ ImmutableList .Builder <MetricUpdate <UpdateT >> updates = ImmutableList .builder ();
621+ cells .forEach (
622+ (key , value ) -> {
623+ UpdateT update = checkNotNull (value .getCumulative ());
624+ updates .add (MetricUpdate .create (MetricKey .create (stepName , key .getKey ()), update ));
625+ });
626+ return updates .build ();
627+ }
628+
548629 /**
549630 * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
550631 * container.
@@ -555,7 +636,8 @@ public MetricUpdates getCumulative() {
555636 extractCumulatives (distributions ),
556637 extractCumulatives (gauges ),
557638 extractCumulatives (stringSets ),
558- extractCumulatives (boundedTries ));
639+ extractCumulatives (boundedTries ),
640+ extractHistogramCumulatives (histograms ));
559641 }
560642
561643 /** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */
@@ -577,7 +659,6 @@ private void updateForSumInt64Type(MonitoringInfo monitoringInfo) {
577659 private void updateForDistributionInt64Type (MonitoringInfo monitoringInfo ) {
578660 MetricName metricName = MonitoringInfoMetricName .of (monitoringInfo );
579661 Distribution distribution = getDistribution (metricName );
580-
581662 DistributionData data = decodeInt64Distribution (monitoringInfo .getPayload ());
582663 distribution .update (data .sum (), data .count (), data .min (), data .max ());
583664 }
@@ -600,6 +681,14 @@ private void updateForBoundedTrieType(MonitoringInfo monitoringInfo) {
600681 boundedTrie .update (decodeBoundedTrie (monitoringInfo .getPayload ()));
601682 }
602683
684+ private void updateForHistogramInt64 (MonitoringInfo monitoringInfo ) {
685+ MetricName metricName = MonitoringInfoMetricName .of (monitoringInfo );
686+ HistogramData .BucketType buckets = HistogramData .ExponentialBuckets .of (1 , 17 );
687+ Histogram histogram = getHistogram (metricName , buckets );
688+ HistogramData data = decodeInt64Histogram (monitoringInfo .getPayload ());
689+ histogram .update (data );
690+ }
691+
603692 /** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */
604693 public void update (Iterable <MonitoringInfo > monitoringInfos ) {
605694 for (MonitoringInfo monitoringInfo : monitoringInfos ) {
@@ -628,6 +717,9 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
628717 updateForBoundedTrieType (monitoringInfo );
629718 break ;
630719
720+ case HISTOGRAM_TYPE :
721+ updateForHistogramInt64 (monitoringInfo ); // use type, and not urn info
722+ break ;
631723 default :
632724 LOG .warn ("Unsupported metric type {}" , monitoringInfo .getType ());
633725 }
@@ -677,14 +769,16 @@ public boolean equals(@Nullable Object object) {
677769 && Objects .equals (distributions , metricsContainerImpl .distributions )
678770 && Objects .equals (gauges , metricsContainerImpl .gauges )
679771 && Objects .equals (stringSets , metricsContainerImpl .stringSets )
680- && Objects .equals (boundedTries , metricsContainerImpl .boundedTries );
772+ && Objects .equals (boundedTries , metricsContainerImpl .boundedTries )
773+ && Objects .equals (histograms , metricsContainerImpl .histograms );
681774 }
682775 return false ;
683776 }
684777
685778 @ Override
686779 public int hashCode () {
687- return Objects .hash (stepName , counters , distributions , gauges , stringSets , boundedTries );
780+ return Objects .hash (
781+ stepName , counters , distributions , gauges , stringSets , boundedTries , histograms );
688782 }
689783
690784 /**
@@ -816,6 +910,7 @@ public static MetricsContainerImpl deltaContainer(
816910 deltaValueCell .incTopBucketCount (
817911 currValue .getTopBucketCount () - prevValue .getTopBucketCount ());
818912 }
913+
819914 for (Map .Entry <MetricName , StringSetCell > cell : curr .stringSets .entries ()) {
820915 // Simply take the most recent value for stringSets, no need to count deltas.
821916 deltaContainer .stringSets .get (cell .getKey ()).update (cell .getValue ().getCumulative ());
0 commit comments