Skip to content

Commit 7377e0e

Browse files
committed
distributor: add new metrics (#4367)
1 parent 76a7e73 commit 7377e0e

File tree

3 files changed

+44
-9
lines changed

3 files changed

+44
-9
lines changed

pkg/distributor/distributor.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,11 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
352352
}
353353
sort.Sort(phlaremodel.Labels(req.Labels))
354354

355-
d.calculateRequestSize(req)
355+
req.TotalProfiles = 1
356+
req.TotalBytesUncompressed = calculateRequestSize(req)
357+
req.TotalBytesUncompressedProcessed = req.TotalBytesUncompressed
358+
359+
d.metrics.receivedDecompressedBytesTotal.WithLabelValues(tenantID).Observe(float64(req.TotalBytesUncompressed))
356360

357361
if err := d.checkIngestLimit(req); err != nil {
358362
level.Debug(logger).Log("msg", "rejecting push request due to global ingest limit", "tenant", tenantID)
@@ -391,6 +395,9 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
391395
}
392396

393397
profLanguage := d.GetProfileLanguage(req)
398+
defer func() { // defer to allow re-calculate the size of the profile after normalization
399+
d.metrics.processedDecompressedBytes.WithLabelValues(tenantID).Observe(float64(req.TotalBytesUncompressedProcessed))
400+
}()
394401

395402
usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1)
396403
d.profileReceivedStats.Inc(1, profLanguage)
@@ -428,6 +435,8 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
428435
req.Profile.Normalize()
429436
sp.Finish()
430437

438+
req.TotalBytesUncompressedProcessed = calculateRequestSize(req)
439+
431440
if len(req.Profile.Sample) == 0 {
432441
// TODO(kolesnikovae):
433442
// Normalization may cause all profiles and series to be empty.
@@ -847,14 +856,16 @@ func (d *Distributor) rateLimit(tenantID string, req *distributormodel.ProfileSe
847856
return nil
848857
}
849858

850-
func (d *Distributor) calculateRequestSize(req *distributormodel.ProfileSeries) {
859+
func calculateRequestSize(req *distributormodel.ProfileSeries) int64 {
851860
// include the labels in the size calculation
861+
bs := int64(0)
852862
for _, lbs := range req.Labels {
853-
req.TotalBytesUncompressed += int64(len(lbs.Name))
854-
req.TotalBytesUncompressed += int64(len(lbs.Value))
863+
bs += int64(len(lbs.Name))
864+
bs += int64(len(lbs.Value))
855865
}
856-
req.TotalProfiles += 1
857-
req.TotalBytesUncompressed += int64(req.Profile.SizeVT())
866+
867+
bs += int64(req.Profile.SizeVT())
868+
return bs
858869
}
859870

860871
func (d *Distributor) checkIngestLimit(req *distributormodel.ProfileSeries) error {

pkg/distributor/metrics.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type metrics struct {
1717
receivedSamplesBytes *prometheus.HistogramVec
1818
receivedSymbolsBytes *prometheus.HistogramVec
1919
replicationFactor prometheus.Gauge
20+
21+
receivedDecompressedBytesTotal *prometheus.HistogramVec
22+
processedDecompressedBytes *prometheus.HistogramVec
2023
}
2124

2225
func newMetrics(reg prometheus.Registerer) *metrics {
@@ -39,7 +42,7 @@ func newMetrics(reg prometheus.Registerer) *metrics {
3942
prometheus.HistogramOpts{
4043
Namespace: "pyroscope",
4144
Name: "distributor_received_decompressed_bytes",
42-
Help: "The number of decompressed bytes per profiles received by the distributor.",
45+
Help: "The number of decompressed bytes per profiles received by the distributor after limits/sampling checks.",
4346
Buckets: prometheus.ExponentialBucketsRange(minBytes, maxBytes, bucketsCount),
4447
},
4548
[]string{"type", "tenant"},
@@ -71,6 +74,24 @@ func newMetrics(reg prometheus.Registerer) *metrics {
7174
},
7275
[]string{"type", "tenant"},
7376
),
77+
receivedDecompressedBytesTotal: prometheus.NewHistogramVec(
78+
prometheus.HistogramOpts{
79+
Namespace: "pyroscope",
80+
Name: "distributor_received_decompressed_bytes_total",
81+
Help: "The total number of decompressed bytes per profile received by the distributor before limits/sampling checks.",
82+
Buckets: prometheus.ExponentialBucketsRange(minBytes, maxBytes, bucketsCount),
83+
},
84+
[]string{"tenant"},
85+
),
86+
processedDecompressedBytes: prometheus.NewHistogramVec(
87+
prometheus.HistogramOpts{
88+
Namespace: "pyroscope",
89+
Name: "distributor_processed_decompressed_bytes",
90+
Help: "The number of decompressed bytes per profile received (processed) by the distributor after limits/sampling checks and normalization.",
91+
Buckets: prometheus.ExponentialBucketsRange(minBytes, maxBytes, bucketsCount),
92+
},
93+
[]string{"tenant"},
94+
),
7495
}
7596
if reg != nil {
7697
reg.MustRegister(
@@ -80,6 +101,8 @@ func newMetrics(reg prometheus.Registerer) *metrics {
80101
m.receivedSamplesBytes,
81102
m.receivedSymbolsBytes,
82103
m.replicationFactor,
104+
m.receivedDecompressedBytesTotal,
105+
m.processedDecompressedBytes,
83106
)
84107
}
85108
return m

pkg/distributor/model/push.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ type ProfileSeries struct {
3535

3636
Annotations []*v1.ProfileAnnotation
3737

38-
// always 1
38+
// always 1 todo delete
3939
TotalProfiles int64
4040
TotalBytesUncompressed int64
4141

42+
TotalBytesUncompressedProcessed int64 // after normalization and other size-reducing manipulation
43+
4244
DiscardedProfilesRelabeling int64
4345
DiscardedBytesRelabeling int64
4446
}
@@ -80,7 +82,6 @@ func getProfileLanguageFromSpy(spyName string) string {
8082
func (req *ProfileSeries) Clone() *ProfileSeries {
8183
c := &ProfileSeries{
8284
TenantID: req.TenantID,
83-
TotalProfiles: req.TotalProfiles,
8485
TotalBytesUncompressed: req.TotalBytesUncompressed,
8586
Labels: phlaremodel.Labels(req.Labels).Clone(),
8687
Profile: &pprof.Profile{Profile: req.Profile.CloneVT()},

0 commit comments

Comments
 (0)