Skip to content

Commit d28362e

Browse files
JSticklergrobinson-grafana
authored andcommitted
docs: Edit deployment modes SSD (#20058)
1 parent 448cc74 commit d28362e

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

pkg/distributor/distributor.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ type Distributor struct {
209209
// are consumed.
210210
numMetadataPartitions int
211211

212+
// inflightBytes keeps track of the total number of bytes for all in-flight
213+
// push requests.
214+
inflightBytes atomic.Uint64
215+
inflightBytesGauge prometheus.Gauge
216+
212217
// kafka metrics
213218
kafkaAppends *prometheus.CounterVec
214219
kafkaWriteBytesTotal prometheus.Counter
@@ -391,6 +396,12 @@ func New(
391396
partitionRing: partitionRing,
392397
ingestLimits: ingestLimits,
393398
numMetadataPartitions: numMetadataPartitions,
399+
inflightBytes: *atomic.NewUint64(0),
400+
inflightBytesGauge: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
401+
Namespace: constants.Loki,
402+
Name: "distributor_inflight_bytes",
403+
Help: "The number of bytes currently inflight.",
404+
}),
394405
}
395406

396407
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
@@ -539,6 +550,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
539550
// Push a set of streams.
540551
// The returned error is the last one seen.
541552
func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRequest, streamResolver *requestScopedStreamResolver, format string) (*logproto.PushResponse, error) {
553+
requestSizeBytes := uint64(req.Size())
554+
d.inflightBytes.Add(requestSizeBytes)
555+
d.inflightBytesGauge.Add(float64(requestSizeBytes))
556+
defer func() {
557+
d.inflightBytes.Sub(requestSizeBytes)
558+
d.inflightBytesGauge.Sub(float64(requestSizeBytes))
559+
}()
560+
542561
tenantID, err := tenant.TenantID(ctx)
543562
if err != nil {
544563
return nil, err

0 commit comments

Comments
 (0)