diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 417b1ba34e191..04a5216b37202 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -209,6 +209,11 @@ type Distributor struct { // are consumed. numMetadataPartitions int + // inflightBytes keeps track of the total number of bytes for all in-flight + // push requests. + inflightBytes atomic.Uint64 + inflightBytesGauge prometheus.Gauge + // kafka metrics kafkaAppends *prometheus.CounterVec kafkaWriteBytesTotal prometheus.Counter @@ -391,6 +396,12 @@ func New( partitionRing: partitionRing, ingestLimits: ingestLimits, numMetadataPartitions: numMetadataPartitions, + inflightBytes: *atomic.NewUint64(0), + inflightBytesGauge: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Namespace: constants.Loki, + Name: "distributor_inflight_bytes", + Help: "The number of bytes currently inflight.", + }), } if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { @@ -539,6 +550,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // Push a set of streams. // The returned error is the last one seen. func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRequest, streamResolver *requestScopedStreamResolver, format string) (*logproto.PushResponse, error) { + requestSizeBytes := uint64(req.Size()) + d.inflightBytes.Add(requestSizeBytes) + d.inflightBytesGauge.Add(float64(requestSizeBytes)) + defer func() { + d.inflightBytes.Sub(requestSizeBytes) + d.inflightBytesGauge.Sub(float64(requestSizeBytes)) + }() + tenantID, err := tenant.TenantID(ctx) if err != nil { return nil, err