diff --git a/ddtrace/internal/datastreams/processor.py b/ddtrace/internal/datastreams/processor.py index 511d3543e0f..00f64126539 100644 --- a/ddtrace/internal/datastreams/processor.py +++ b/ddtrace/internal/datastreams/processor.py @@ -70,28 +70,6 @@ def gzip_compress(payload): ] -class SumCount: - """Helper class to keep track of sum and count of values.""" - - __slots__ = ("_sum", "_count") - - def __init__(self) -> None: - self._sum: float = 0.0 - self._count: int = 0 - - def add(self, value: float) -> None: - self._sum += value - self._count += 1 - - @property - def sum(self) -> float: - return self._sum - - @property - def count(self) -> int: - return self._count - - class PathwayStats(object): """Aggregated pathway statistics.""" @@ -100,7 +78,7 @@ class PathwayStats(object): def __init__(self): self.full_pathway_latency = DDSketch() self.edge_latency = DDSketch() - self.payload_size = SumCount() + self.payload_size = DDSketch() PartitionKey = NamedTuple("PartitionKey", [("topic", str), ("partition", int)]) @@ -229,6 +207,7 @@ def _serialize_buckets(self): "ParentHash": parent_hash, "PathwayLatency": stat_aggr.full_pathway_latency.to_proto(), "EdgeLatency": stat_aggr.edge_latency.to_proto(), + "PayloadSize": stat_aggr.payload_size.to_proto(), } bucket_aggr_stats.append(serialized_bucket) for consumer_key, offset in bucket.latest_commit_offsets.items(): @@ -294,7 +273,6 @@ def _flush_stats(self, payload: bytes) -> None: def periodic(self): # type: () -> None - with self._lock: serialized_stats = self._serialize_buckets() diff --git a/releasenotes/notes/fix-dsm-no-payload-size-d5a8e54464dbd2f8.yaml b/releasenotes/notes/fix-dsm-no-payload-size-d5a8e54464dbd2f8.yaml new file mode 100644 index 00000000000..b8c874c198d --- /dev/null +++ b/releasenotes/notes/fix-dsm-no-payload-size-d5a8e54464dbd2f8.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + data_streams: This fix resolves an issue where payload size statistics were not being sent to + the backend for Data Streams Monitoring (DSM).