Skip to content

Commit 7d13c2f

Browse files
authored
Merge pull request #1576 from cortexproject/1522-chunkstorerandom-flake
Chunk time ranges are inclusive.
2 parents 53f2043 + b700996 commit 7d13c2f

File tree

5 files changed

+81
-94
lines changed

5 files changed

+81
-94
lines changed

pkg/chunk/chunk_store_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ func TestChunkStoreRandom(t *testing.T) {
592592
},
593593
chunks[0],
594594
ts,
595-
ts.Add(chunkLen*time.Second),
595+
ts.Add(chunkLen*time.Second).Add(-1*time.Second),
596596
)
597597
err := chunk.Encode()
598598
require.NoError(t, err)
@@ -602,8 +602,8 @@ func TestChunkStoreRandom(t *testing.T) {
602602

603603
// pick two random numbers and do a query
604604
for i := 0; i < 100; i++ {
605-
start := rand.Int63n(100 * chunkLen)
606-
end := start + rand.Int63n((100*chunkLen)-start)
605+
start := rand.Int63n(99 * chunkLen)
606+
end := start + 1 + rand.Int63n((99*chunkLen)-start)
607607
assert.True(t, start < end)
608608

609609
startTime := model.TimeFromUnix(start)

pkg/chunk/schema_caching.go

Lines changed: 39 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,80 +14,61 @@ type schemaCaching struct {
1414
}
1515

1616
func (s *schemaCaching) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
17-
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
18-
19-
cacheableQueries, err := s.Schema.GetReadQueriesForMetric(cFrom, cThrough, userID, metricName)
20-
if err != nil {
21-
return nil, err
22-
}
23-
24-
activeQueries, err := s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
25-
if err != nil {
26-
return nil, err
27-
}
28-
29-
return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
17+
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
18+
return s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
19+
})
3020
}
3121

3222
func (s *schemaCaching) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) {
33-
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
34-
35-
cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabel(cFrom, cThrough, userID, metricName, labelName)
36-
if err != nil {
37-
return nil, err
38-
}
39-
40-
activeQueries, err := s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
41-
if err != nil {
42-
return nil, err
43-
}
44-
45-
return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
23+
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
24+
return s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
25+
})
4626
}
4727

4828
func (s *schemaCaching) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) {
49-
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
50-
51-
cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(cFrom, cThrough, userID, metricName, labelName, labelValue)
52-
if err != nil {
53-
return nil, err
54-
}
55-
56-
activeQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
57-
if err != nil {
58-
return nil, err
59-
}
60-
61-
return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
29+
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
30+
return s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
31+
})
6232
}
6333

6434
// If the query resulted in series IDs, use this method to find chunks.
6535
func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
66-
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
67-
68-
cacheableQueries, err := s.Schema.GetChunksForSeries(cFrom, cThrough, userID, seriesID)
69-
if err != nil {
70-
return nil, err
71-
}
72-
73-
activeQueries, err := s.Schema.GetChunksForSeries(from, through, userID, seriesID)
74-
if err != nil {
75-
return nil, err
76-
}
77-
78-
return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
36+
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
37+
return s.Schema.GetChunksForSeries(from, through, userID, seriesID)
38+
})
7939
}
8040

81-
func splitTimesByCacheability(from, through model.Time, cacheBefore model.Time) (model.Time, model.Time, model.Time, model.Time) {
41+
func (s *schemaCaching) splitTimesByCacheability(from, through model.Time, f func(from, through model.Time) ([]IndexQuery, error)) ([]IndexQuery, error) {
42+
var (
43+
cacheableQueries []IndexQuery
44+
activeQueries []IndexQuery
45+
err error
46+
cacheBefore = model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
47+
)
48+
8249
if from.After(cacheBefore) {
83-
return 0, 0, from, through
84-
}
50+
activeQueries, err = f(from, through)
51+
if err != nil {
52+
return nil, err
53+
}
54+
} else if through.Before(cacheBefore) {
55+
cacheableQueries, err = f(from, through)
56+
if err != nil {
57+
return nil, err
58+
}
59+
} else {
60+
cacheableQueries, err = f(from, cacheBefore)
61+
if err != nil {
62+
return nil, err
63+
}
8564

86-
if through.Before(cacheBefore) {
87-
return from, through, 0, 0
65+
activeQueries, err = f(cacheBefore, through)
66+
if err != nil {
67+
return nil, err
68+
}
8869
}
8970

90-
return from, cacheBefore, cacheBefore, through
71+
return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
9172
}
9273

9374
func mergeCacheableAndActiveQueries(cacheableQueries []IndexQuery, activeQueries []IndexQuery) []IndexQuery {

pkg/chunk/schema_caching_test.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package chunk
22

33
import (
4+
"strconv"
45
"testing"
56
"time"
67

78
"github.com/prometheus/common/model"
9+
"github.com/stretchr/testify/assert"
810
"github.com/stretchr/testify/require"
911
"github.com/weaveworks/common/mtime"
1012
)
@@ -26,7 +28,7 @@ func TestCachingSchema(t *testing.T) {
2628

2729
mtime.NowForce(baseTime)
2830

29-
for _, tc := range []struct {
31+
for i, tc := range []struct {
3032
from, through time.Time
3133

3234
cacheableIdx int
@@ -56,20 +58,16 @@ func TestCachingSchema(t *testing.T) {
5658
0,
5759
},
5860
} {
59-
have, err := schema.GetReadQueriesForMetric(
60-
model.TimeFromUnix(tc.from.Unix()), model.TimeFromUnix(tc.through.Unix()),
61-
userID, "foo",
62-
)
63-
if err != nil {
64-
t.Fatal(err)
65-
}
61+
t.Run(strconv.Itoa(i), func(t *testing.T) {
62+
have, err := schema.GetReadQueriesForMetric(
63+
model.TimeFromUnix(tc.from.Unix()), model.TimeFromUnix(tc.through.Unix()),
64+
userID, "foo",
65+
)
66+
require.NoError(t, err)
6667

67-
for i := range have {
68-
if i <= tc.cacheableIdx {
69-
require.True(t, have[i].Immutable)
70-
} else {
71-
require.False(t, have[i].Immutable)
68+
for i := range have {
69+
assert.Equal(t, have[i].Immutable, i <= tc.cacheableIdx, i)
7270
}
73-
}
71+
})
7472
}
7573
}

pkg/chunk/schema_config.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,6 @@ func (cfg *PeriodConfig) hourlyBuckets(from, through model.Time, userID string)
259259
result = []Bucket{}
260260
)
261261

262-
// If through ends on the hour, don't include the upcoming hour
263-
if through.Unix()%secondsInHour == 0 {
264-
throughHour--
265-
}
266-
267262
for i := fromHour; i <= throughHour; i++ {
268263
relativeFrom := util.Max64(0, int64(from)-(i*millisecondsInHour))
269264
relativeThrough := util.Min64(millisecondsInHour, int64(through)-(i*millisecondsInHour))
@@ -284,11 +279,6 @@ func (cfg *PeriodConfig) dailyBuckets(from, through model.Time, userID string) [
284279
result = []Bucket{}
285280
)
286281

287-
// If through ends on 00:00 of the day, don't include the upcoming day
288-
if through.Unix()%secondsInDay == 0 {
289-
throughDay--
290-
}
291-
292282
for i := fromDay; i <= throughDay; i++ {
293283
// The idea here is that the hash key contains the bucket start time (rounded to
294284
// the nearest day). The range key can contain the offset from that, to the

pkg/chunk/schema_config_test.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package chunk
22

33
import (
4-
"reflect"
54
"testing"
65
"time"
76

87
"github.com/prometheus/common/model"
8+
"github.com/stretchr/testify/assert"
99
"github.com/stretchr/testify/require"
1010
)
1111

@@ -34,7 +34,12 @@ func TestHourlyBuckets(t *testing.T) {
3434
from: model.TimeFromUnix(0),
3535
through: model.TimeFromUnix(0),
3636
},
37-
[]Bucket{},
37+
[]Bucket{{
38+
from: 0,
39+
through: 0,
40+
tableName: "table",
41+
hashKey: "0:0",
42+
}},
3843
},
3944
{
4045
"30 minute window",
@@ -60,6 +65,11 @@ func TestHourlyBuckets(t *testing.T) {
6065
through: 3600 * 1000, // ms
6166
tableName: "table",
6267
hashKey: "0:0",
68+
}, {
69+
from: 0,
70+
through: 0, // ms
71+
tableName: "table",
72+
hashKey: "0:1",
6373
}},
6474
},
6575
{
@@ -88,9 +98,8 @@ func TestHourlyBuckets(t *testing.T) {
8898
}
8999
for _, tt := range tests {
90100
t.Run(tt.name, func(t *testing.T) {
91-
if got := cfg.hourlyBuckets(tt.args.from, tt.args.through, userID); !reflect.DeepEqual(got, tt.want) {
92-
t.Errorf("SchemaConfig.dailyBuckets() = %v, want %v", got, tt.want)
93-
}
101+
got := cfg.hourlyBuckets(tt.args.from, tt.args.through, userID)
102+
assert.Equal(t, tt.want, got)
94103
})
95104
}
96105
}
@@ -120,7 +129,12 @@ func TestDailyBuckets(t *testing.T) {
120129
from: model.TimeFromUnix(0),
121130
through: model.TimeFromUnix(0),
122131
},
123-
[]Bucket{},
132+
[]Bucket{{
133+
from: 0,
134+
through: 0,
135+
tableName: "table",
136+
hashKey: "0:d0",
137+
}},
124138
},
125139
{
126140
"6 hour window",
@@ -146,6 +160,11 @@ func TestDailyBuckets(t *testing.T) {
146160
through: (24 * 3600) * 1000, // ms
147161
tableName: "table",
148162
hashKey: "0:d0",
163+
}, {
164+
from: 0,
165+
through: 0,
166+
tableName: "table",
167+
hashKey: "0:d1",
149168
}},
150169
},
151170
{
@@ -174,9 +193,8 @@ func TestDailyBuckets(t *testing.T) {
174193
}
175194
for _, tt := range tests {
176195
t.Run(tt.name, func(t *testing.T) {
177-
if got := cfg.dailyBuckets(tt.args.from, tt.args.through, userID); !reflect.DeepEqual(got, tt.want) {
178-
t.Errorf("SchemaConfig.dailyBuckets() = %v, want %v", got, tt.want)
179-
}
196+
got := cfg.dailyBuckets(tt.args.from, tt.args.through, userID)
197+
assert.Equal(t, tt.want, got)
180198
})
181199
}
182200
}

0 commit comments

Comments (0)