Skip to content

Commit b315ed0

Browse files
authored
Sum values in unwrapped rate aggregation instead of treating them as counter (#6361)
* Revert unwrapped rate aggregation to previous implementation.
  This PR reverts the implementation done in #5013 to the original implementation, which sums the extracted values from the log lines instead of treating them like a Prometheus counter metric.
  Signed-off-by: Christian Haudum <[email protected]>
* Move changelog entry. Signed-off-by: Christian Haudum <[email protected]>
* Remove unused/dead code. Signed-off-by: Christian Haudum <[email protected]>
* Clean changelog. Signed-off-by: Christian Haudum <[email protected]>
1 parent 21f6964 commit b315ed0

File tree

4 files changed

+40
-86
lines changed

4 files changed

+40
-86
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -92,6 +92,7 @@
9292
* [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values
9393
* [6375](https://github.com/grafana/loki/pull/6375) **dannykopping**: Fix bug that prevented users from using the `json` parser after a `line_format` pipeline stage.
9494
##### Changes
95+
* [6361](https://github.com/grafana/loki/pull/6361) **chaudum**: Sum values in unwrapped rate aggregation instead of treating them as counter.
9596
* [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
9697
* [6120](https://github.com/grafana/loki/pull/6120) **KMiller-Grafana**: Rename configuration parameter fudge_duplicate_timestamp to be increment_duplicate_timestamp.
9798
* [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable

docs/sources/logql/metric_queries.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ We currently support the functions:
6868
6969
Supported function for operating over unwrapped ranges are:
7070
71-
- `rate(unwrapped-range)`: calculates per second rate of all values in the specified interval.
71+
- `rate(unwrapped-range)`: calculates per second rate of the sum of all values in the specified interval.
7272
- `sum_over_time(unwrapped-range)`: the sum of all values in the specified interval.
7373
- `avg_over_time(unwrapped-range)`: the average value of all points in the specified interval.
7474
- `max_over_time(unwrapped-range)`: the maximum value of all points in the specified interval.

pkg/logql/engine_test.go

Lines changed: 33 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -49,15 +49,36 @@ func TestEngine_LogsRateUnwrap(t *testing.T) {
4949
expected interface{}
5050
}{
5151
{
52-
`rate({app="foo"} | unwrap foo [30s])`, time.Unix(60, 0), logproto.FORWARD, 10,
52+
`rate({app="foo"} | unwrap foo [30s])`,
53+
time.Unix(60, 0),
54+
logproto.FORWARD,
55+
10,
56+
[][]logproto.Series{
57+
// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
58+
{newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)},
59+
},
60+
[]SelectSampleParams{
61+
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
62+
},
63+
// SUM(n=47, 61, 1) = 15
64+
// 15 / 30 = 0.5
65+
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.5}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
66+
},
67+
{
68+
`rate({app="foo"} | unwrap foo [30s])`,
69+
time.Unix(60, 0),
70+
logproto.FORWARD,
71+
10,
5372
[][]logproto.Series{
5473
// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
55-
{newSeries(testSize, offset(46, incValue(10)), `{app="foo"}`)},
74+
{newSeries(testSize, offset(46, incValue(1)), `{app="foo"}`)},
5675
},
5776
[]SelectSampleParams{
5877
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
5978
},
60-
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.46666766666666665}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
79+
// SUM(n=47, 61, n) = 810
80+
// 810 / 30 = 27
81+
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 27}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
6182
},
6283
} {
6384
test := test
@@ -150,7 +171,9 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
150171
[]SelectSampleParams{
151172
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
152173
},
153-
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
174+
// SUM(n=46, 61, 2) = 30
175+
// 30 / 30 = 1
176+
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 1.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
154177
},
155178
{
156179
`count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10,
@@ -1287,19 +1310,22 @@ func TestEngine_RangeQuery(t *testing.T) {
12871310
{
12881311
`rate(({app=~"foo|bar"} |~".+bar" | unwrap bar)[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
12891312
[][]logproto.Series{
1290-
{newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`), newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`)},
1313+
{
1314+
newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`),
1315+
newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`),
1316+
},
12911317
},
12921318
[]SelectSampleParams{
12931319
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"|unwrap bar[1m])`}},
12941320
},
12951321
promql.Matrix{
12961322
promql.Series{
12971323
Metric: labels.Labels{{Name: "app", Value: "bar"}},
1298-
Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
1324+
Points: []promql.Point{{T: 60 * 1000, V: 0.4}, {T: 90 * 1000, V: 0.4}, {T: 120 * 1000, V: 0.4}, {T: 150 * 1000, V: 0.4}, {T: 180 * 1000, V: 0.4}},
12991325
},
13001326
promql.Series{
13011327
Metric: labels.Labels{{Name: "app", Value: "foo"}},
1302-
Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
1328+
Points: []promql.Point{{T: 60 * 1000, V: 0.2}, {T: 90 * 1000, V: 0.2}, {T: 120 * 1000, V: 0.2}, {T: 150 * 1000, V: 0.2}, {T: 180 * 1000, V: 0.2}},
13031329
},
13041330
},
13051331
},

pkg/logql/range_vector.go

Lines changed: 5 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -218,92 +218,19 @@ func aggregator(r *syntax.RangeAggregationExpr) (RangeVectorAggregator, error) {
218218
}
219219
}
220220

221-
// rateLogs calculates the per-second rate of log lines.
221+
// rateLogs calculates the per-second rate of log lines or values extracted
222+
// from log lines
222223
func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.Point) float64 {
223224
return func(samples []promql.Point) float64 {
224225
if !computeValues {
225226
return float64(len(samples)) / selRange.Seconds()
226227
}
227-
return extrapolatedRate(samples, selRange, true, true)
228-
}
229-
}
230-
231-
// extrapolatedRate function is taken from prometheus code promql/functions.go:59
232-
// extrapolatedRate is a utility function for rate/increase/delta.
233-
// It calculates the rate (allowing for counter resets if isCounter is true),
234-
// extrapolates if the first/last sample is close to the boundary, and returns
235-
// the result as either per-second (if isRate is true) or overall.
236-
func extrapolatedRate(samples []promql.Point, selRange time.Duration, isCounter, isRate bool) float64 {
237-
// No sense in trying to compute a rate without at least two points. Drop
238-
// this Vector element.
239-
if len(samples) < 2 {
240-
return 0
241-
}
242-
var (
243-
rangeStart = samples[0].T - durationMilliseconds(selRange)
244-
rangeEnd = samples[len(samples)-1].T
245-
)
246-
247-
resultValue := samples[len(samples)-1].V - samples[0].V
248-
if isCounter {
249-
var lastValue float64
228+
var result float64
250229
for _, sample := range samples {
251-
if sample.V < lastValue {
252-
resultValue += lastValue
253-
}
254-
lastValue = sample.V
255-
}
256-
}
257-
258-
// Duration between first/last samples and boundary of range.
259-
durationToStart := float64(samples[0].T-rangeStart) / 1000
260-
durationToEnd := float64(rangeEnd-samples[len(samples)-1].T) / 1000
261-
262-
sampledInterval := float64(samples[len(samples)-1].T-samples[0].T) / 1000
263-
averageDurationBetweenSamples := sampledInterval / float64(len(samples)-1)
264-
265-
if isCounter && resultValue > 0 && samples[0].V >= 0 {
266-
// Counters cannot be negative. If we have any slope at
267-
// all (i.e. resultValue went up), we can extrapolate
268-
// the zero point of the counter. If the duration to the
269-
// zero point is shorter than the durationToStart, we
270-
// take the zero point as the start of the series,
271-
// thereby avoiding extrapolation to negative counter
272-
// values.
273-
durationToZero := sampledInterval * (samples[0].V / resultValue)
274-
if durationToZero < durationToStart {
275-
durationToStart = durationToZero
230+
result += sample.V
276231
}
232+
return result / selRange.Seconds()
277233
}
278-
279-
// If the first/last samples are close to the boundaries of the range,
280-
// extrapolate the result. This is as we expect that another sample
281-
// will exist given the spacing between samples we've seen thus far,
282-
// with an allowance for noise.
283-
extrapolationThreshold := averageDurationBetweenSamples * 1.1
284-
extrapolateToInterval := sampledInterval
285-
286-
if durationToStart < extrapolationThreshold {
287-
extrapolateToInterval += durationToStart
288-
} else {
289-
extrapolateToInterval += averageDurationBetweenSamples / 2
290-
}
291-
if durationToEnd < extrapolationThreshold {
292-
extrapolateToInterval += durationToEnd
293-
} else {
294-
extrapolateToInterval += averageDurationBetweenSamples / 2
295-
}
296-
resultValue = resultValue * (extrapolateToInterval / sampledInterval)
297-
if isRate {
298-
seconds := selRange.Seconds()
299-
resultValue = resultValue / seconds
300-
}
301-
302-
return resultValue
303-
}
304-
305-
func durationMilliseconds(d time.Duration) int64 {
306-
return int64(d / (time.Millisecond / time.Nanosecond))
307234
}
308235

309236
// rateLogBytes calculates the per-second rate of log bytes.

0 commit comments

Comments (0)