Skip to content

Commit 1906dc2

Browse files
fix(alloc): set a limit on preallocations (backport release-3.5.x) (#20919)
Co-authored-by: Ivan Kalita <ivan.kalita@grafana.com>
1 parent 9c102c0 commit 1906dc2

File tree

5 files changed

+76
-6
lines changed

5 files changed

+76
-6
lines changed

pkg/querier/querier.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1018,7 +1018,8 @@ func determineType(value string) logproto.DetectedFieldType {
10181018
}
10191019

10201020
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
1021-
detectedFields := make(map[string]*parsedFields, limit)
1021+
const maxDetectedFieldsPreAlloc = 1000
1022+
detectedFields := make(map[string]*parsedFields, min(maxDetectedFieldsPreAlloc, limit))
10221023
fieldCount := uint32(0)
10231024
emtpyparsers := []string{}
10241025

pkg/querier/queryrange/detected_fields.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ func determineType(value string) logproto.DetectedFieldType {
280280
}
281281

282282
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
283-
detectedFields := make(map[string]*parsedFields, limit)
283+
const maxDetectedFieldsPreAlloc = 1000
284+
detectedFields := make(map[string]*parsedFields, min(maxDetectedFieldsPreAlloc, limit))
284285
fieldCount := uint32(0)
285286
emtpyparsers := []string{}
286287

pkg/querier/queryrange/detected_fields_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"math"
7+
runtime "runtime"
78
"slices"
89
"testing"
910
"time"
@@ -117,6 +118,27 @@ func Test_parseDetectedFields(t *testing.T) {
117118
}
118119
})
119120

121+
t.Run("detects fields with huge limit doesn't explode memory", func(t *testing.T) {
122+
runtime.GC()
123+
var before runtime.MemStats
124+
runtime.ReadMemStats(&before)
125+
126+
df := parseDetectedFields(1000000, logqlmodel.Streams([]push.Stream{rulerStream}))
127+
require.True(t, len(df) > 0)
128+
129+
runtime.GC()
130+
var after runtime.MemStats
131+
runtime.ReadMemStats(&after)
132+
133+
delta := int64(after.TotalAlloc) - int64(before.TotalAlloc)
134+
// 10 MB
135+
if delta > 10*1024*1024 {
136+
t.Fatalf("heap grew too much: %d MB", delta/1024/1024)
137+
}
138+
139+
runtime.KeepAlive(df)
140+
})
141+
120142
t.Run("detects json fields", func(t *testing.T) {
121143
df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream}))
122144
for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} {

pkg/storage/detected/fields.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ func MergeFields(
5555
fields []*logproto.DetectedField,
5656
limit uint32,
5757
) ([]*logproto.DetectedField, error) {
58-
mergedFields := make(map[string]*UnmarshaledDetectedField, limit)
58+
const maxMergedFieldsPreAlloc = 1000
59+
mergedFields := make(map[string]*UnmarshaledDetectedField, min(maxMergedFieldsPreAlloc, limit))
5960
foundFields := uint32(0)
6061

6162
for _, field := range fields {
@@ -86,7 +87,7 @@ func MergeFields(
8687
}
8788
}
8889

89-
result := make([]*logproto.DetectedField, 0, limit)
90+
result := make([]*logproto.DetectedField, 0, len(mergedFields))
9091
for _, field := range mergedFields {
9192
detectedField := &logproto.DetectedField{
9293
Label: field.Label,
@@ -105,7 +106,8 @@ func MergeValues(
105106
values []string,
106107
limit uint32,
107108
) ([]string, error) {
108-
mergedValues := make(map[string]struct{}, limit)
109+
const maxMergedValuesPreAlloc = 1000
110+
mergedValues := make(map[string]struct{}, min(maxMergedValuesPreAlloc, limit))
109111

110112
for _, value := range values {
111113
if value == "" {
@@ -119,7 +121,7 @@ func MergeValues(
119121
mergedValues[value] = struct{}{}
120122
}
121123

122-
result := make([]string, 0, limit)
124+
result := make([]string, 0, len(mergedValues))
123125
for value := range mergedValues {
124126
result = append(result, value)
125127
}

pkg/storage/detected/fields_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package detected
22

33
import (
4+
"runtime"
45
"testing"
56

67
"github.com/axiomhq/hyperloglog"
@@ -88,6 +89,27 @@ func Test_MergeFields(t *testing.T) {
8889
assert.Equal(t, logproto.DetectedFieldString, baz.Type)
8990
})
9091

92+
t.Run("huge limit doesn't explode the heap", func(t *testing.T) {
93+
runtime.GC()
94+
var before runtime.MemStats
95+
runtime.ReadMemStats(&before)
96+
97+
result, err := MergeFields(fields, 10000000)
98+
require.NoError(t, err)
99+
100+
runtime.GC()
101+
var after runtime.MemStats
102+
runtime.ReadMemStats(&after)
103+
104+
delta := int64(after.TotalAlloc) - int64(before.TotalAlloc)
105+
// 10 MB
106+
if delta > 10*1024*1024 {
107+
t.Fatalf("heap grew too much: %d MB", delta/1024/1024)
108+
}
109+
110+
runtime.KeepAlive(result)
111+
})
112+
91113
t.Run("returns up to limit number of fields", func(t *testing.T) {
92114
lowLimit := uint32(1)
93115
result, err := MergeFields(fields, lowLimit)
@@ -125,6 +147,28 @@ func Test_MergeValues(t *testing.T) {
125147
assert.ElementsMatch(t, []string{"foo", "bar", "baz", "qux"}, result)
126148
})
127149

150+
t.Run("huge limit doesn't explode the heap", func(t *testing.T) {
151+
runtime.GC()
152+
var before runtime.MemStats
153+
runtime.ReadMemStats(&before)
154+
155+
values := []string{"foo", "bar", "baz", "qux"}
156+
result, err := MergeValues(values, 1000000)
157+
require.NoError(t, err)
158+
159+
runtime.GC()
160+
var after runtime.MemStats
161+
runtime.ReadMemStats(&after)
162+
163+
delta := int64(after.TotalAlloc) - int64(before.TotalAlloc)
164+
// 10 MB
165+
if delta > 10*1024*1024 {
166+
t.Fatalf("heap grew too much: %d MB", delta/1024/1024)
167+
}
168+
169+
runtime.KeepAlive(result)
170+
})
171+
128172
t.Run("merges repeating values", func(t *testing.T) {
129173
values := []string{"foo", "bar", "baz", "qux", "foo", "bar", "baz", "qux"}
130174
limit := uint32(50)

0 commit comments

Comments
 (0)