Skip to content

Commit f889e0d

Browse files
committed
Fix a distribution metric problem when count is zero.
1 parent fe07fe7 commit f889e0d

File tree

2 files changed

+15
-1
lines changed
  • model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1
  • sdks/go/pkg/beam/runners/prism/internal/jobservices

2 files changed

+15
-1
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ message MonitoringInfo {
457457
SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
458458
SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
459459
SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
460-
// Label which if has a "true" value indicates that the metric is intended
460+
// Label which if has a "true" value indicates that the metric is intended
461461
// to be aggregated per-worker.
462462
PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
463463
}
@@ -517,6 +517,10 @@ message MonitoringInfoTypeUrns {
517517
// - sum: beam:coder:varint:v1
518518
// - min: beam:coder:varint:v1
519519
// - max: beam:coder:varint:v1
520+
//
521+
// Note that when count is zero, the SDK may or may not send sum, min, and
522+
// max in the response. If those fields are included in the payload, runners
523+
// should ignore them.
520524
DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
521525
"beam:metrics:distribution_int64:v1"];
522526

@@ -531,6 +535,10 @@ message MonitoringInfoTypeUrns {
531535
// - sum: beam:coder:double:v1
532536
// - min: beam:coder:double:v1
533537
// - max: beam:coder:double:v1
538+
//
539+
// Note that when count is zero, the SDK may or may not send sum, min, and
540+
// max in the response. If those fields are included in the payload, runners
541+
// should ignore them.
534542
DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
535543
"beam:metrics:distribution_double:v1"];
536544

sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,12 @@ func (m *distributionInt64) accumulate(pyld []byte) error {
326326
if dist.Count, err = coder.DecodeVarInt(buf); err != nil {
327327
return err
328328
}
329+
if dist.Count == 0 {
330+
// When there is no elements reported, the payload may contain the values
331+
// for count, sum, min and max, or it may only contain one 0x00 byte for
332+
// count. No matter what, we will skip aggregation in this case.
333+
return nil
334+
}
329335
if dist.Sum, err = coder.DecodeVarInt(buf); err != nil {
330336
return err
331337
}

0 commit comments

Comments
 (0)