Skip to content

Commit 457616e

Browse files
authored
measure pre-aggregated metrics write latency (#104)
2 parents 6cf9daa + 8b4e854 commit 457616e

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

pkg/receive/handler.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ const (
6969
// Labels for metrics.
7070
labelSuccess = "success"
7171
labelError = "error"
72+
labelPreAgg = "__rollup__"
7273
)
7374

7475
type ReplicationProtocol string
@@ -226,9 +227,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
226227
Namespace: "thanos",
227228
Subsystem: "receive",
228229
Name: "write_e2e_latency_seconds",
229-
Help: "The end-to-end latency of the oldest sample in write requests.",
230+
Help: "The end-to-end latency of write requests.",
230231
Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
231-
}, []string{"code", "tenant"},
232+
}, []string{"code", "tenant", "rollup"},
232233
),
233234
}
234235

@@ -484,16 +485,23 @@ func newWriteResponse(seriesIDs []int, err error, er endpointReplica) writeRespo
484485
}
485486
}
486487

487-
func secondsSinceOldestSample(toMS int64, ts prompb.TimeSeries) float64 {
488+
func secondsSinceFirstSample(toMS int64, ts prompb.TimeSeries) float64 {
488489
fromMS := toMS
489-
for _, s := range ts.Samples {
490-
if s.Timestamp < fromMS {
491-
fromMS = s.Timestamp
492-
}
490+
if len(ts.Samples) > 0 {
491+
fromMS = ts.Samples[0].Timestamp
493492
}
494493
return float64(toMS-fromMS) / 1000
495494
}
496495

496+
func isPreAgged(ts prompb.TimeSeries) bool {
497+
for _, l := range ts.Labels {
498+
if l.Name == labelPreAgg && l.Value == "true" {
499+
return true
500+
}
501+
}
502+
return false
503+
}
504+
497505
func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
498506
var err error
499507
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
@@ -640,8 +648,8 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
640648
}
641649
nowMS := time.Now().UnixNano() / int64(time.Millisecond)
642650
for _, ts := range wreq.Timeseries {
643-
if lat := secondsSinceOldestSample(nowMS, ts); lat > 0 {
644-
h.writeE2eLatency.WithLabelValues(strconv.Itoa(responseStatusCode), tenantHTTP).Observe(lat)
651+
if lat := secondsSinceFirstSample(nowMS, ts); lat > 0 {
652+
h.writeE2eLatency.WithLabelValues(strconv.Itoa(responseStatusCode), tenantHTTP, strconv.FormatBool(isPreAgged(ts))).Observe(lat)
645653
}
646654
}
647655
}

0 commit comments

Comments
 (0)