diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 8fe9a7b095..27a55b707c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -352,7 +352,11 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof } sort.Sort(phlaremodel.Labels(req.Labels)) - d.calculateRequestSize(req) + req.TotalProfiles = 1 + req.TotalBytesUncompressed = calculateRequestSize(req) + req.TotalBytesUncompressedProcessed = req.TotalBytesUncompressed + + d.metrics.receivedDecompressedBytesTotal.WithLabelValues(tenantID).Observe(float64(req.TotalBytesUncompressed)) if err := d.checkIngestLimit(req); err != nil { level.Debug(logger).Log("msg", "rejecting push request due to global ingest limit", "tenant", tenantID) @@ -391,6 +395,9 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof } profLanguage := d.GetProfileLanguage(req) + defer func() { // deferred so that the observed value reflects the profile size re-calculated after normalization + d.metrics.processedDecompressedBytes.WithLabelValues(tenantID).Observe(float64(req.TotalBytesUncompressedProcessed)) + }() usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1) d.profileReceivedStats.Inc(1, profLanguage) @@ -428,6 +435,8 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof req.Profile.Normalize() sp.Finish() + req.TotalBytesUncompressedProcessed = calculateRequestSize(req) + if len(req.Profile.Sample) == 0 { // TODO(kolesnikovae): // Normalization may cause all profiles and series to be empty. 
@@ -847,14 +856,16 @@ func (d *Distributor) rateLimit(tenantID string, req *distributormodel.ProfileSe return nil } -func (d *Distributor) calculateRequestSize(req *distributormodel.ProfileSeries) { +func calculateRequestSize(req *distributormodel.ProfileSeries) int64 { // include the labels in the size calculation + bs := int64(0) for _, lbs := range req.Labels { - req.TotalBytesUncompressed += int64(len(lbs.Name)) - req.TotalBytesUncompressed += int64(len(lbs.Value)) + bs += int64(len(lbs.Name)) + bs += int64(len(lbs.Value)) } - req.TotalProfiles += 1 - req.TotalBytesUncompressed += int64(req.Profile.SizeVT()) + + bs += int64(req.Profile.SizeVT()) + return bs } func (d *Distributor) checkIngestLimit(req *distributormodel.ProfileSeries) error { diff --git a/pkg/distributor/metrics.go b/pkg/distributor/metrics.go index 9f3b2b03f6..79ec9c846a 100644 --- a/pkg/distributor/metrics.go +++ b/pkg/distributor/metrics.go @@ -17,6 +17,9 @@ type metrics struct { receivedSamplesBytes *prometheus.HistogramVec receivedSymbolsBytes *prometheus.HistogramVec replicationFactor prometheus.Gauge + + receivedDecompressedBytesTotal *prometheus.HistogramVec + processedDecompressedBytes *prometheus.HistogramVec } func newMetrics(reg prometheus.Registerer) *metrics { @@ -39,7 +42,7 @@ func newMetrics(reg prometheus.Registerer) *metrics { prometheus.HistogramOpts{ Namespace: "pyroscope", Name: "distributor_received_decompressed_bytes", - Help: "The number of decompressed bytes per profiles received by the distributor.", + Help: "The number of decompressed bytes per profiles received by the distributor after limits/sampling checks.", Buckets: prometheus.ExponentialBucketsRange(minBytes, maxBytes, bucketsCount), }, []string{"type", "tenant"}, @@ -71,6 +74,24 @@ func newMetrics(reg prometheus.Registerer) *metrics { }, []string{"type", "tenant"}, ), + receivedDecompressedBytesTotal: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pyroscope", + Name: 
"distributor_received_decompressed_bytes_total", + Help: "The total number of decompressed bytes per profile received by the distributor before limits/sampling checks.", + Buckets: prometheus.ExponentialBucketsRange(minBytes, maxBytes, bucketsCount), + }, + []string{"tenant"}, + ), + processedDecompressedBytes: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pyroscope", + Name: "distributor_processed_decompressed_bytes", + Help: "The number of decompressed bytes per profile received (processed) by the distributor after limits/sampling checks and normalization.", + Buckets: prometheus.ExponentialBucketsRange(minBytes, maxBytes, bucketsCount), + }, + []string{"tenant"}, + ), } if reg != nil { reg.MustRegister( @@ -80,6 +101,8 @@ func newMetrics(reg prometheus.Registerer) *metrics { m.receivedSamplesBytes, m.receivedSymbolsBytes, m.replicationFactor, + m.receivedDecompressedBytesTotal, + m.processedDecompressedBytes, ) } return m diff --git a/pkg/distributor/model/push.go b/pkg/distributor/model/push.go index 7eca85c809..1b1a63efae 100644 --- a/pkg/distributor/model/push.go +++ b/pkg/distributor/model/push.go @@ -35,10 +35,12 @@ type ProfileSeries struct { Annotations []*v1.ProfileAnnotation - // always 1 + // always 1 todo delete TotalProfiles int64 TotalBytesUncompressed int64 + TotalBytesUncompressedProcessed int64 // after normalization and other size-reducing manipulation + DiscardedProfilesRelabeling int64 DiscardedBytesRelabeling int64 } @@ -80,7 +82,6 @@ func getProfileLanguageFromSpy(spyName string) string { func (req *ProfileSeries) Clone() *ProfileSeries { c := &ProfileSeries{ TenantID: req.TenantID, - TotalProfiles: req.TotalProfiles, TotalBytesUncompressed: req.TotalBytesUncompressed, Labels: phlaremodel.Labels(req.Labels).Clone(), Profile: &pprof.Profile{Profile: req.Profile.CloneVT()},