Commit a56a88b
Add unwrapped aggregation rate_counter() (grafana#6412)

`rate_counter()` takes an unwrapped range as input, but unlike `rate()` it treats the values extracted from the log lines as a counter metric, like Prometheus' `rate()` function does. This is a replacement for the reverted change of grafana#5013.

Signed-off-by: Christian Haudum <[email protected]>
1 parent 35cb40a commit a56a88b

File tree

10 files changed: +455 -507 lines changed


CHANGELOG.md

Lines changed: 1 addition & 1 deletion

@@ -9,7 +9,6 @@
 * [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields
 * [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target
 * [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
-* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
 * [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
 * [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
 * [5971](https://github.com/grafana/loki/pull/5971) **kavirajk**: Record statistics about metadata queries such as labels and series queries in `metrics.go` as well
@@ -87,6 +86,7 @@
 #### Loki

 ##### Enhancements
+* [6361](https://github.com/grafana/loki/pull/6361) **chaudum**: Add new unwrapped range aggregation `rate_counter()` to LogQL
 * [6317](https://github.com/grafana/loki/pull/6317/files) **dannykoping**: General: add cache usage statistics

 ##### Fixes

docs/sources/logql/metric_queries.md

Lines changed: 1 addition & 0 deletions

@@ -69,6 +69,7 @@ We currently support the functions:
 Supported function for operating over unwrapped ranges are:

 - `rate(unwrapped-range)`: calculates per second rate of the sum of all values in the specified interval.
+- `rate_counter(unwrapped-range)`: calculates per second rate of the values in the specified interval and treating them as "counter metric"
 - `sum_over_time(unwrapped-range)`: the sum of all values in the specified interval.
 - `avg_over_time(unwrapped-range)`: the average value of all points in the specified interval.
 - `max_over_time(unwrapped-range)`: the maximum value of all points in the specified interval.

docs/sources/upgrading/_index.md

Lines changed: 7 additions & 1 deletion

@@ -31,9 +31,15 @@ The output is incredibly verbose as it shows the entire internal config struct u

 ## Main / Unreleased

-
 ### Loki

+#### Implementation of unwrapped `rate` aggregation changed
+
+The implementation of the `rate()` aggregation function changed back to the previous implementation prior to [#5013](https://github.com/grafana/loki/pulls/5013).
+This means that the rate per second is calculated based on the sum of the extracted values, instead of the average increase over time.
+
+If you want the extracted values to be treated as [Counter](https://prometheus.io/docs/concepts/metric_types/#counter) metric, you should use the new `rate_counter()` aggregation function, which calculates the per-second average rate of increase of the vector.
+
 #### Default value for `azure.container-name` changed

 This value now defaults to `loki`, it was previously set to `cortex`. If you are relying on this container name for your chunks or ruler storage, you will have to manually specify `-azure.container-name=cortex` or `-ruler.storage.azure.container-name=cortex` respectively.
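To make the behavioral difference concrete, here is a small standalone Go sketch with hypothetical helper names (not Loki's actual implementation; it ignores extrapolation and counter resets) contrasting the sum-based `rate()` with the counter-based `rate_counter()`:

```go
package main

import "fmt"

// sumRate mirrors the restored rate() behavior:
// sum of the extracted values divided by the range in seconds.
func sumRate(values []float64, rangeSeconds float64) float64 {
	var sum float64
	for _, v := range values {
		sum += v
	}
	return sum / rangeSeconds
}

// counterRate sketches the idea behind rate_counter():
// last minus first value divided by the range in seconds.
func counterRate(values []float64, rangeSeconds float64) float64 {
	if len(values) < 2 {
		return 0
	}
	return (values[len(values)-1] - values[0]) / rangeSeconds
}

func main() {
	// 15 samples of a counter increasing by 1 (47..61) over a 30s range,
	// as in the test cases of this commit.
	samples := make([]float64, 15)
	for i := range samples {
		samples[i] = float64(47 + i)
	}
	fmt.Println(sumRate(samples, 30))     // 810 / 30 = 27
	fmt.Println(counterRate(samples, 30)) // (61 - 47) / 30 ≈ 0.4667
}
```

The same series thus yields wildly different results under the two functions, which is why the upgrade note above matters.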

pkg/logql/engine_test.go

Lines changed: 39 additions & 1 deletion

@@ -53,13 +53,15 @@ func TestEngine_LogsRateUnwrap(t *testing.T) {
 			time.Unix(60, 0),
 			logproto.FORWARD,
 			10,
+			// create a stream {app="foo"} with 300 samples starting at 46s and ending at 345s with a constant value of 1
 			[][]logproto.Series{
 				// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
 				{newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)},
 			},
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
 			},
+			// there are 15 samples (from 47 to 61) matched from the generated series
 			// SUM(n=47, 61, 1) = 15
 			// 15 / 30 = 0.5
 			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.5}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
@@ -69,17 +71,53 @@
 			time.Unix(60, 0),
 			logproto.FORWARD,
 			10,
+			// create a stream {app="foo"} with 300 samples starting at 46s and ending at 345s with an increasing value by 1
 			[][]logproto.Series{
 				// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
 				{newSeries(testSize, offset(46, incValue(1)), `{app="foo"}`)},
 			},
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
 			},
-			// SUM(n=47, 61, n) = 810
+			// there are 15 samples (from 47 to 61) matched from the generated series
+			// SUM(n=47, 61, n) = (47+48+...+61) = 810
 			// 810 / 30 = 27
 			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 27}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
 		},
+		{
+			`rate_counter({app="foo"} | unwrap foo [30s])`,
+			time.Unix(60, 0),
+			logproto.FORWARD,
+			10,
+			// create a stream {app="foo"} with 300 samples starting at 46s and ending at 345s with a constant value of 1
+			[][]logproto.Series{
+				// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
+				{newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)},
+			},
+			[]SelectSampleParams{
+				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate_counter({app="foo"} | unwrap foo[30s])`}},
+			},
+			// there are 15 samples (from 47 to 61) matched from the generated series
+			// (1 - 1) / 30 = 0
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+		},
+		{
+			`rate_counter({app="foo"} | unwrap foo [30s])`,
+			time.Unix(60, 0),
+			logproto.FORWARD,
+			10,
+			// create a stream {app="foo"} with 300 samples starting at 46s and ending at 345s with an increasing value by 1
+			[][]logproto.Series{
+				// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
+				{newSeries(testSize, offset(46, incValue(1)), `{app="foo"}`)},
+			},
+			[]SelectSampleParams{
+				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate_counter({app="foo"} | unwrap foo[30s])`}},
+			},
+			// there are 15 samples (from 47 to 61) matched from the generated series
+			// (61 - 47) / 30 = 0.4666
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.46666766666666665}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+		},
 	} {
 		test := test
 		t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
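The expected values in the test comments above can be double-checked with a tiny standalone Go calculation (independent of the test harness):

```go
package main

import "fmt"

func main() {
	// SUM(n=47, 61, n): the 15 increasing sample values matched in the 30s range.
	sum := 0.0
	for n := 47; n <= 61; n++ {
		sum += float64(n)
	}
	fmt.Println(sum)      // 810
	fmt.Println(sum / 30) // 27, the expected rate() result

	// rate_counter() instead looks at the increase over the range:
	// (61 - 47) / 30, before boundary extrapolation nudges it to the
	// 0.46666766666666665 the test expects.
	fmt.Println((61.0 - 47.0) / 30.0) // ≈ 0.4667
}
```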

pkg/logql/range_vector.go

Lines changed: 88 additions & 0 deletions

@@ -189,6 +189,8 @@ func aggregator(r *syntax.RangeAggregationExpr) (RangeVectorAggregator, error) {
 	switch r.Operation {
 	case syntax.OpRangeTypeRate:
 		return rateLogs(r.Left.Interval, r.Left.Unwrap != nil), nil
+	case syntax.OpRangeTypeRateCounter:
+		return rateCounter(r.Left.Interval), nil
 	case syntax.OpRangeTypeCount:
 		return countOverTime, nil
 	case syntax.OpRangeTypeBytesRate:
@@ -233,6 +235,92 @@ func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.
 	}
 }

+// rateCounter calculates the per-second rate of values extracted from log lines
+// and treat them like a "counter" metric.
+func rateCounter(selRange time.Duration) func(samples []promql.Point) float64 {
+	return func(samples []promql.Point) float64 {
+		return extrapolatedRate(samples, selRange, true, true)
+	}
+}
+
+// extrapolatedRate function is taken from prometheus code promql/functions.go:59
+// extrapolatedRate is a utility function for rate/increase/delta.
+// It calculates the rate (allowing for counter resets if isCounter is true),
+// extrapolates if the first/last sample is close to the boundary, and returns
+// the result as either per-second (if isRate is true) or overall.
+func extrapolatedRate(samples []promql.Point, selRange time.Duration, isCounter, isRate bool) float64 {
+	// No sense in trying to compute a rate without at least two points. Drop
+	// this Vector element.
+	if len(samples) < 2 {
+		return 0
+	}
+	var (
+		rangeStart = samples[0].T - durationMilliseconds(selRange)
+		rangeEnd   = samples[len(samples)-1].T
+	)
+
+	resultValue := samples[len(samples)-1].V - samples[0].V
+	if isCounter {
+		var lastValue float64
+		for _, sample := range samples {
+			if sample.V < lastValue {
+				resultValue += lastValue
+			}
+			lastValue = sample.V
+		}
+	}
+
+	// Duration between first/last samples and boundary of range.
+	durationToStart := float64(samples[0].T-rangeStart) / 1000
+	durationToEnd := float64(rangeEnd-samples[len(samples)-1].T) / 1000
+
+	sampledInterval := float64(samples[len(samples)-1].T-samples[0].T) / 1000
+	averageDurationBetweenSamples := sampledInterval / float64(len(samples)-1)
+
+	if isCounter && resultValue > 0 && samples[0].V >= 0 {
+		// Counters cannot be negative. If we have any slope at
+		// all (i.e. resultValue went up), we can extrapolate
+		// the zero point of the counter. If the duration to the
+		// zero point is shorter than the durationToStart, we
+		// take the zero point as the start of the series,
+		// thereby avoiding extrapolation to negative counter
+		// values.
+		durationToZero := sampledInterval * (samples[0].V / resultValue)
+		if durationToZero < durationToStart {
+			durationToStart = durationToZero
+		}
+	}
+
+	// If the first/last samples are close to the boundaries of the range,
+	// extrapolate the result. This is as we expect that another sample
+	// will exist given the spacing between samples we've seen thus far,
+	// with an allowance for noise.
+	extrapolationThreshold := averageDurationBetweenSamples * 1.1
+	extrapolateToInterval := sampledInterval
+
+	if durationToStart < extrapolationThreshold {
+		extrapolateToInterval += durationToStart
+	} else {
+		extrapolateToInterval += averageDurationBetweenSamples / 2
+	}
+	if durationToEnd < extrapolationThreshold {
+		extrapolateToInterval += durationToEnd
+	} else {
+		extrapolateToInterval += averageDurationBetweenSamples / 2
+	}
+	resultValue = resultValue * (extrapolateToInterval / sampledInterval)
+	if isRate {
+		seconds := selRange.Seconds()
+		resultValue = resultValue / seconds
+	}
+
+	return resultValue
+}
+
+func durationMilliseconds(d time.Duration) int64 {
+	return int64(d / (time.Millisecond / time.Nanosecond))
+}
+
 // rateLogBytes calculates the per-second rate of log bytes.
 func rateLogBytes(selRange time.Duration) func(samples []promql.Point) float64 {
 	return func(samples []promql.Point) float64 {
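One subtlety worth calling out in `extrapolatedRate` above is the counter-reset handling: whenever a sample is smaller than its predecessor, the value observed just before the reset is added back so the computed increase survives the restart. A minimal standalone sketch of just that loop (hypothetical function name, plain `float64` slices instead of `promql.Point`):

```go
package main

import "fmt"

// counterIncrease mirrors the isCounter loop in extrapolatedRate:
// it computes last-minus-first, then compensates for counter resets
// by adding back the value observed just before each reset.
func counterIncrease(values []float64) float64 {
	if len(values) < 2 {
		return 0
	}
	result := values[len(values)-1] - values[0]
	var last float64
	for _, v := range values {
		if v < last { // counter reset detected
			result += last
		}
		last = v
	}
	return result
}

func main() {
	// 5 -> 10 (+5), reset, restart at 2 (+2), 2 -> 7 (+5): total increase 12.
	fmt.Println(counterIncrease([]float64{5, 10, 2, 7})) // 12
	// Without a reset it is simply last - first.
	fmt.Println(counterIncrease([]float64{3, 4, 9})) // 6
}
```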

pkg/logql/syntax/ast.go

Lines changed: 18 additions & 15 deletions

@@ -616,20 +616,21 @@
 	OpTypeTopK = "topk"

 	// range vector ops
-	OpRangeTypeCount     = "count_over_time"
-	OpRangeTypeRate      = "rate"
-	OpRangeTypeBytes     = "bytes_over_time"
-	OpRangeTypeBytesRate = "bytes_rate"
-	OpRangeTypeAvg       = "avg_over_time"
-	OpRangeTypeSum       = "sum_over_time"
-	OpRangeTypeMin       = "min_over_time"
-	OpRangeTypeMax       = "max_over_time"
-	OpRangeTypeStdvar    = "stdvar_over_time"
-	OpRangeTypeStddev    = "stddev_over_time"
-	OpRangeTypeQuantile  = "quantile_over_time"
-	OpRangeTypeFirst     = "first_over_time"
-	OpRangeTypeLast      = "last_over_time"
-	OpRangeTypeAbsent    = "absent_over_time"
+	OpRangeTypeCount       = "count_over_time"
+	OpRangeTypeRate        = "rate"
+	OpRangeTypeRateCounter = "rate_counter"
+	OpRangeTypeBytes       = "bytes_over_time"
+	OpRangeTypeBytesRate   = "bytes_rate"
+	OpRangeTypeAvg         = "avg_over_time"
+	OpRangeTypeSum         = "sum_over_time"
+	OpRangeTypeMin         = "min_over_time"
+	OpRangeTypeMax         = "max_over_time"
+	OpRangeTypeStdvar      = "stdvar_over_time"
+	OpRangeTypeStddev      = "stddev_over_time"
+	OpRangeTypeQuantile    = "quantile_over_time"
+	OpRangeTypeFirst       = "first_over_time"
+	OpRangeTypeLast        = "last_over_time"
+	OpRangeTypeAbsent      = "absent_over_time"

 	// binops - logical/set
 	OpTypeOr = "or"
@@ -778,7 +779,9 @@ func (e RangeAggregationExpr) validate() error {
 	}
 	if e.Left.Unwrap != nil {
 		switch e.Operation {
-		case OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeRate, OpRangeTypeAbsent, OpRangeTypeFirst, OpRangeTypeLast:
+		case OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev,
+			OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeRate, OpRangeTypeRateCounter,
+			OpRangeTypeAbsent, OpRangeTypeFirst, OpRangeTypeLast:
 			return nil
 		default:
 			return fmt.Errorf("invalid aggregation %s with unwrap", e.Operation)
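The `validate()` change above means `rate_counter` joins the set of aggregations accepted over an unwrapped range. A toy standalone sketch of that rule (hypothetical names and a map instead of the actual AST switch):

```go
package main

import (
	"errors"
	"fmt"
)

// allowedWithUnwrap mirrors the case list in the validate() switch above,
// using the string values of the Op constants.
var allowedWithUnwrap = map[string]bool{
	"avg_over_time": true, "sum_over_time": true, "max_over_time": true,
	"min_over_time": true, "stddev_over_time": true, "stdvar_over_time": true,
	"quantile_over_time": true, "rate": true, "rate_counter": true,
	"absent_over_time": true, "first_over_time": true, "last_over_time": true,
}

// validateUnwrapOp returns an error for operations that may not be
// combined with an unwrap expression, e.g. count_over_time.
func validateUnwrapOp(op string) error {
	if !allowedWithUnwrap[op] {
		return errors.New("invalid aggregation " + op + " with unwrap")
	}
	return nil
}

func main() {
	fmt.Println(validateUnwrapOp("rate_counter"))    // <nil>
	fmt.Println(validateUnwrapOp("count_over_time")) // invalid aggregation count_over_time with unwrap
}
```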

pkg/logql/syntax/expr.y

Lines changed: 2 additions & 1 deletion

@@ -107,7 +107,7 @@ import (
 %token <str> IDENTIFIER STRING NUMBER
 %token <duration> DURATION RANGE
 %token <val> MATCHERS LABELS EQ RE NRE OPEN_BRACE CLOSE_BRACE OPEN_BRACKET CLOSE_BRACKET COMMA DOT PIPE_MATCH PIPE_EXACT
-  OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
+  OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE RATE_COUNTER SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
   BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
   MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
   FIRST_OVER_TIME LAST_OVER_TIME ABSENT_OVER_TIME LABEL_REPLACE UNPACK OFFSET PATTERN IP ON IGNORING GROUP_LEFT GROUP_RIGHT
@@ -457,6 +457,7 @@ vectorOp:
 rangeOp:
       COUNT_OVER_TIME { $$ = OpRangeTypeCount }
     | RATE            { $$ = OpRangeTypeRate }
+    | RATE_COUNTER    { $$ = OpRangeTypeRateCounter }
     | BYTES_OVER_TIME { $$ = OpRangeTypeBytes }
     | BYTES_RATE      { $$ = OpRangeTypeBytesRate }
     | AVG_OVER_TIME   { $$ = OpRangeTypeAvg }

0 commit comments