@@ -209,6 +209,11 @@ type Distributor struct {
209209 // are consumed.
210210 numMetadataPartitions int
211211
212+ // inflightBytes keeps track of the total number of bytes for all in-flight
213+ // push requests.
214+ inflightBytes atomic.Uint64
215+ inflightBytesGauge prometheus.Gauge
216+
212217 // kafka metrics
213218 kafkaAppends * prometheus.CounterVec
214219 kafkaWriteBytesTotal prometheus.Counter
@@ -391,6 +396,12 @@ func New(
391396 partitionRing : partitionRing ,
392397 ingestLimits : ingestLimits ,
393398 numMetadataPartitions : numMetadataPartitions ,
399+ inflightBytes : * atomic .NewUint64 (0 ),
400+ inflightBytesGauge : promauto .With (registerer ).NewGauge (prometheus.GaugeOpts {
401+ Namespace : constants .Loki ,
402+ Name : "distributor_inflight_bytes" ,
403+ Help : "The number of bytes currently inflight." ,
404+ }),
394405 }
395406
396407 if overrides .IngestionRateStrategy () == validation .GlobalIngestionRateStrategy {
@@ -539,6 +550,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
539550// Push a set of streams.
540551// The returned error is the last one seen.
541552func (d * Distributor ) PushWithResolver (ctx context.Context , req * logproto.PushRequest , streamResolver * requestScopedStreamResolver , format string ) (* logproto.PushResponse , error ) {
553+ requestSizeBytes := uint64 (req .Size ())
554+ d .inflightBytes .Add (requestSizeBytes )
555+ d .inflightBytesGauge .Add (float64 (requestSizeBytes ))
556+ defer func () {
557+ d .inflightBytes .Sub (requestSizeBytes )
558+ d .inflightBytesGauge .Sub (float64 (requestSizeBytes ))
559+ }()
560+
542561 tenantID , err := tenant .TenantID (ctx )
543562 if err != nil {
544563 return nil , err
0 commit comments