Skip to content

Commit a89d2cd

Browse files
committed
Adding cache isolation test
Signed-off-by: alanprot <[email protected]>
1 parent 7474d0c commit a89d2cd

File tree

2 files changed

+66
-3
lines changed

2 files changed

+66
-3
lines changed

pkg/ingester/ingester_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5080,6 +5080,64 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) {
50805080
`), "cortex_ingester_instance_limits"))
50815081
}
50825082

5083+
func TestExpendedPostingsCacheIsolation(t *testing.T) {
5084+
cfg := defaultIngesterTestConfig(t)
5085+
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
5086+
cfg.LifecyclerConfig.JoinAfter = 0
5087+
cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{
5088+
SeedSize: 1, // lets make sure all metric names collide
5089+
Head: cortex_tsdb.PostingsCacheConfig{
5090+
Enabled: true,
5091+
},
5092+
Blocks: cortex_tsdb.PostingsCacheConfig{
5093+
Enabled: true,
5094+
},
5095+
}
5096+
5097+
r := prometheus.NewRegistry()
5098+
i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
5099+
require.NoError(t, err)
5100+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
5101+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
5102+
5103+
numberOfTenants := 100
5104+
wg := sync.WaitGroup{}
5105+
wg.Add(numberOfTenants)
5106+
5107+
for j := 0; j < numberOfTenants; j++ {
5108+
go func() {
5109+
defer wg.Done()
5110+
userId := fmt.Sprintf("user%v", j)
5111+
ctx := user.InjectOrgID(context.Background(), userId)
5112+
_, err = i.Push(ctx, cortexpb.ToWriteRequest(
5113+
[]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId)}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API))
5114+
require.NoError(t, err)
5115+
}()
5116+
}
5117+
5118+
wg.Wait()
5119+
5120+
wg.Add(numberOfTenants)
5121+
for j := 0; j < numberOfTenants; j++ {
5122+
go func() {
5123+
defer wg.Done()
5124+
userId := fmt.Sprintf("user%v", j)
5125+
ctx := user.InjectOrgID(context.Background(), userId)
5126+
s := &mockQueryStreamServer{ctx: ctx}
5127+
5128+
err := i.QueryStream(&client.QueryRequest{
5129+
StartTimestampMs: 0,
5130+
EndTimestampMs: math.MaxInt64,
5131+
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
5132+
}, s)
5133+
require.NoError(t, err)
5134+
require.Len(t, s.series, 1)
5135+
require.Len(t, s.series[0].Labels, 2)
5136+
}()
5137+
}
5138+
wg.Wait()
5139+
}
5140+
50835141
func TestExpendedPostingsCache(t *testing.T) {
50845142
cfg := defaultIngesterTestConfig(t)
50855143
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}

pkg/storage/tsdb/expanded_postings_cache.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ type TSDBPostingsCacheConfig struct {
6868
Head PostingsCacheConfig `yaml:"head" doc:"description=If enabled, ingesters will cache expanded postings for the head block. Only queries with with an equal matcher for metric __name__ are cached."`
6969
Blocks PostingsCacheConfig `yaml:"blocks" doc:"description=If enabled, ingesters will cache expanded postings for the compacted blocks. The cache is shared between all blocks."`
7070

71+
// The configurations below are used only for testing purpose
7172
PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"`
73+
SeedSize int `yaml:"-"`
7274
timeNow func() time.Time `yaml:"-"`
7375
}
7476

@@ -97,10 +99,13 @@ type ExpandedPostingsCacheFactory struct {
9799

98100
func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory {
99101
if cfg.Head.Enabled || cfg.Blocks.Enabled {
102+
if cfg.SeedSize == 0 {
103+
cfg.SeedSize = seedArraySize
104+
}
100105
logutil.WarnExperimentalUse("expanded postings cache")
101106
return &ExpandedPostingsCacheFactory{
102107
cfg: cfg,
103-
seedByHash: newSeedByHash(),
108+
seedByHash: newSeedByHash(cfg.SeedSize),
104109
}
105110
}
106111

@@ -287,9 +292,9 @@ type seedByHash struct {
287292
seedByHash []int
288293
}
289294

290-
func newSeedByHash() *seedByHash {
295+
func newSeedByHash(size int) *seedByHash {
291296
return &seedByHash{
292-
seedByHash: make([]int, seedArraySize),
297+
seedByHash: make([]int, size),
293298
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
294299
}
295300
}

0 commit comments

Comments
 (0)