diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index d5951c23c10e..fcce35394b91 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -457,7 +457,7 @@ message MonitoringInfo { SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }]; SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }]; SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }]; - // Label which if has a "true" value indicates that the metric is intended + // Label which if has a "true" value indicates that the metric is intended // to be aggregated per-worker. PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }]; } @@ -517,6 +517,10 @@ message MonitoringInfoTypeUrns { // - sum: beam:coder:varint:v1 // - min: beam:coder:varint:v1 // - max: beam:coder:varint:v1 + // + // Note that when count is zero, the SDK may not send sum, min, and max in + // the response. If those fields are included in the payload, runners should + // omit them. DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:distribution_int64:v1"]; @@ -531,6 +535,10 @@ message MonitoringInfoTypeUrns { // - sum: beam:coder:double:v1 // - min: beam:coder:double:v1 // - max: beam:coder:double:v1 + // + // Note that when count is zero, the SDK may not send sum, min, and max in + // the response. If those fields are included in the payload, runners should + // omit them. DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:distribution_double:v1"]; diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index bbbdfd1eba4f..12d935815461 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -326,6 +326,12 @@ func (m *distributionInt64) accumulate(pyld []byte) error { if dist.Count, err = coder.DecodeVarInt(buf); err != nil { return err } + if dist.Count == 0 { + // When there is no elements reported, the payload may contain the values + // for count, sum, min and max, or it may contain only one 0x00 byte for + // count. No matter what, we will skip aggregation in this case. + return nil + } if dist.Sum, err = coder.DecodeVarInt(buf); err != nil { return err }