Commit 03f926b

Adding Test for getShardedRules call (#4449)
Signed-off-by: Alan Protasio <[email protected]>
1 parent 32b1b40 commit 03f926b

5 files changed: +259 -14 lines

pkg/ruler/client_pool.go

Lines changed: 25 additions & 2 deletions
@@ -3,6 +3,8 @@ package ruler
 import (
 	"time"
 
+	"github.com/grafana/dskit/services"
+
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -14,7 +16,26 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/grpcclient"
 )
 
-func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) *client.Pool {
+// ClientsPool is the interface used to get the client from the pool for a specified address.
+type ClientsPool interface {
+	services.Service
+	// GetClientFor returns the ruler client for the given address.
+	GetClientFor(addr string) (RulerClient, error)
+}
+
+type rulerClientsPool struct {
+	*client.Pool
+}
+
+func (p *rulerClientsPool) GetClientFor(addr string) (RulerClient, error) {
+	c, err := p.Pool.GetClientFor(addr)
+	if err != nil {
+		return nil, err
+	}
+	return c.(RulerClient), nil
+}
+
+func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) ClientsPool {
 	// We prefer sane defaults instead of exposing further config options.
 	poolCfg := client.PoolConfig{
 		CheckInterval: time.Minute,
@@ -27,7 +48,9 @@ func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prom
 		Help: "The current number of ruler clients in the pool.",
 	})
 
-	return client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger)
+	return &rulerClientsPool{
+		client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger),
+	}
 }
 
 func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
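The extracted ClientsPool interface is the seam this commit introduces: production code keeps the embedded *client.Pool, while callers only see a typed GetClientFor. A minimal usage sketch within package ruler (illustrative only; the config, registry, address, and the ctx/logger variables here are assumptions, not part of this commit):

// Illustrative sketch: start the pool service, fetch a typed client, call Rules.
pool := newRulerClientPool(grpcclient.Config{}, logger, prometheus.NewRegistry())
if err := services.StartAndAwaitRunning(ctx, pool); err != nil {
	return err
}
defer services.StopAndAwaitTerminated(ctx, pool) // error ignored in this sketch

rulerClient, err := pool.GetClientFor("ruler-2:9095") // hypothetical replica address
if err != nil {
	return err
}
resp, err := rulerClient.Rules(ctx, &RulesRequest{})
// resp.Groups now holds the remote ruler's rule group states.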

pkg/ruler/lifecycle_test.go

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ func TestRulerShutdown(t *testing.T) {
 	config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
 	defer cleanup()
 
-	r, rcleanup := newRuler(t, config)
+	r, rcleanup := buildRuler(t, config, nil)
 	defer rcleanup()
 
 	r.cfg.EnableSharding = true
@@ -59,7 +59,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
 	ctx := context.Background()
 	config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
 	defer cleanup()
-	r, rcleanup := newRuler(t, config)
+	r, rcleanup := buildRuler(t, config, nil)
 	defer rcleanup()
 	r.cfg.EnableSharding = true
 	r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond

pkg/ruler/ruler.go

Lines changed: 8 additions & 5 deletions
@@ -29,7 +29,6 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/cortexpb"
 	"github.com/cortexproject/cortex/pkg/ring"
-	ring_client "github.com/cortexproject/cortex/pkg/ring/client"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore"
 	"github.com/cortexproject/cortex/pkg/tenant"
@@ -237,7 +236,7 @@ type Ruler struct {
 	subservicesWatcher *services.FailureWatcher
 
 	// Pool of clients used to connect to other ruler replicas.
-	clientsPool *ring_client.Pool
+	clientsPool ClientsPool
 
 	ringCheckErrors prometheus.Counter
 	rulerSync       *prometheus.CounterVec
@@ -250,14 +249,18 @@ type Ruler struct {
 
 // NewRuler creates a new ruler from a distributor and chunk store.
 func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
+	return newRuler(cfg, manager, reg, logger, ruleStore, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
+}
+
+func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
 	ruler := &Ruler{
 		cfg:      cfg,
 		store:    ruleStore,
 		manager:  manager,
 		registry: reg,
 		logger:   logger,
 		limits:   limits,
-		clientsPool:    newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+		clientsPool:    clientPool,
 		allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
 
 		ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -763,12 +766,12 @@ func (r *Ruler) getShardedRules(ctx context.Context) ([]*GroupStateDesc, error)
 	err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
 		addr := job.(string)
 
-		grpcClient, err := r.clientsPool.GetClientFor(addr)
+		rulerClient, err := r.clientsPool.GetClientFor(addr)
 		if err != nil {
 			return errors.Wrapf(err, "unable to get client for ruler %s", addr)
 		}
 
-		newGrps, err := grpcClient.(RulerClient).Rules(ctx, &RulesRequest{})
+		newGrps, err := rulerClient.Rules(ctx, &RulesRequest{})
 		if err != nil {
 			return errors.Wrapf(err, "unable to retrieve rules from ruler %s", addr)
 		}
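The NewRuler/newRuler split is the other half of the testing seam: the exported constructor still wires the real gRPC pool, while tests can hand newRuler any ClientsPool. A simplified stub illustrating the pattern (names here are illustrative; the commit's actual test double, mockRulerClientsPool, appears in ruler_test.go below):

// Illustrative stub, not part of this commit: embed the real pool so the
// services.Service behaviour is preserved, and override only GetClientFor.
type stubClientsPool struct {
	ClientsPool
	client RulerClient
}

func (s *stubClientsPool) GetClientFor(addr string) (RulerClient, error) {
	return s.client, nil // always hand back the canned client
}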

pkg/ruler/ruler_test.go

Lines changed: 222 additions & 5 deletions
@@ -15,6 +15,10 @@ import (
 	"testing"
 	"time"
 
+	"go.uber.org/atomic"
+
+	"google.golang.org/grpc"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gorilla/mux"
@@ -135,7 +139,45 @@ func newManager(t *testing.T, cfg Config) (*DefaultMultiTenantManager, func()) {
 	return manager, cleanup
 }
 
-func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
+type mockRulerClientsPool struct {
+	ClientsPool
+	cfg           Config
+	rulerAddrMap  map[string]*Ruler
+	numberOfCalls atomic.Int32
+}
+
+type mockRulerClient struct {
+	ruler         *Ruler
+	numberOfCalls *atomic.Int32
+}
+
+func (c *mockRulerClient) Rules(ctx context.Context, in *RulesRequest, _ ...grpc.CallOption) (*RulesResponse, error) {
+	c.numberOfCalls.Inc()
+	return c.ruler.Rules(ctx, in)
+}
+
+func (p *mockRulerClientsPool) GetClientFor(addr string) (RulerClient, error) {
+	for _, r := range p.rulerAddrMap {
+		if r.lifecycler.GetInstanceAddr() == addr {
+			return &mockRulerClient{
+				ruler:         r,
+				numberOfCalls: &p.numberOfCalls,
+			}, nil
+		}
+	}
+
+	return nil, fmt.Errorf("unable to find ruler for add %s", addr)
+}
+
+func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer, rulerAddrMap map[string]*Ruler) *mockRulerClientsPool {
+	return &mockRulerClientsPool{
+		ClientsPool:  newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+		cfg:          cfg,
+		rulerAddrMap: rulerAddrMap,
+	}
+}
+
+func buildRuler(t *testing.T, cfg Config, rulerAddrMap map[string]*Ruler) (*Ruler, func()) {
 	engine, noopQueryable, pusher, logger, overrides, cleanup := testSetup(t, cfg)
 	storage, err := NewLegacyRuleStore(cfg.StoreConfig, promRules.FileLoader{}, log.NewNopLogger())
 	require.NoError(t, err)
@@ -145,21 +187,21 @@ func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
 	manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, reg, log.NewNopLogger())
 	require.NoError(t, err)
 
-	ruler, err := NewRuler(
+	ruler, err := newRuler(
 		cfg,
 		manager,
 		reg,
 		logger,
 		storage,
 		overrides,
+		newMockClientsPool(cfg, logger, reg, rulerAddrMap),
 	)
 	require.NoError(t, err)
-
 	return ruler, cleanup
 }
 
 func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) {
-	ruler, cleanup := newRuler(t, cfg)
+	ruler, cleanup := buildRuler(t, cfg, nil)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))
 
 	// Ensure all rules are loaded before usage
@@ -252,6 +294,171 @@ func compareRuleGroupDescToStateDesc(t *testing.T, expected *rulespb.RuleGroupDe
 	}
 }
 
+func TestGetRules(t *testing.T) {
+	// ruler ID -> (user ID -> list of groups).
+	type expectedRulesMap map[string]map[string]rulespb.RuleGroupList
+
+	type testCase struct {
+		sharding         bool
+		shardingStrategy string
+		shuffleShardSize int
+	}
+
+	expectedRules := expectedRulesMap{
+		"ruler1": map[string]rulespb.RuleGroupList{
+			"user1": {
+				&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "first", Interval: 10 * time.Second},
+				&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "second", Interval: 10 * time.Second},
+			},
+			"user2": {
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
+			},
+		},
+		"ruler2": map[string]rulespb.RuleGroupList{
+			"user1": {
+				&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
+			},
+			"user2": {
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "first", Interval: 10 * time.Second},
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "second", Interval: 10 * time.Second},
+			},
+		},
+		"ruler3": map[string]rulespb.RuleGroupList{
+			"user3": {
+				&rulespb.RuleGroupDesc{User: "user3", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
+			},
+			"user2": {
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "forth", Interval: 10 * time.Second},
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "fifty", Interval: 10 * time.Second},
+			},
+		},
+	}
+
+	testCases := map[string]testCase{
+		"No Sharding": {
+			sharding: false,
+		},
+		"Default Sharding": {
+			sharding:         true,
+			shardingStrategy: util.ShardingStrategyDefault,
+		},
+		"Shuffle Sharding and ShardSize = 2": {
+			sharding:         true,
+			shuffleShardSize: 2,
+			shardingStrategy: util.ShardingStrategyShuffle,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
+			t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) })
+			allRulesByUser := map[string]rulespb.RuleGroupList{}
+			allRulesByRuler := map[string]rulespb.RuleGroupList{}
+			allTokensByRuler := map[string][]uint32{}
+			rulerAddrMap := map[string]*Ruler{}
+
+			createRuler := func(id string) *Ruler {
+				cfg, cleanUp := defaultRulerConfig(t, newMockRuleStore(allRulesByUser))
+				t.Cleanup(cleanUp)
+
+				cfg.ShardingStrategy = tc.shardingStrategy
+				cfg.EnableSharding = tc.sharding
+
+				cfg.Ring = RingConfig{
+					InstanceID:   id,
+					InstanceAddr: id,
+					KVStore: kv.Config{
+						Mock: kvStore,
+					},
+				}
+
+				r, cleanUp := buildRuler(t, cfg, rulerAddrMap)
+				r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
+				t.Cleanup(cleanUp)
+				rulerAddrMap[id] = r
+				if r.ring != nil {
+					require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring))
+					t.Cleanup(r.ring.StopAsync)
+				}
+				return r
+			}
+
+			for rID, r := range expectedRules {
+				createRuler(rID)
+				for user, rules := range r {
+					allRulesByUser[user] = append(allRulesByUser[user], rules...)
+					allRulesByRuler[rID] = append(allRulesByRuler[rID], rules...)
+					allTokensByRuler[rID] = generateTokenForGroups(rules, 1)
+				}
+			}
+
+			if tc.sharding {
+				err := kvStore.CAS(context.Background(), ring.RulerRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
+					d, _ := in.(*ring.Desc)
+					if d == nil {
+						d = ring.NewDesc()
+					}
+					for rID, tokens := range allTokensByRuler {
+						d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now())
+					}
+					return d, true, nil
+				})
+				require.NoError(t, err)
+				// Wait a bit to make sure ruler's ring is updated.
+				time.Sleep(100 * time.Millisecond)
+			}
+
+			forEachRuler := func(f func(rID string, r *Ruler)) {
+				for rID, r := range rulerAddrMap {
+					f(rID, r)
+				}
+			}
+
+			// Sync Rules
+			forEachRuler(func(_ string, r *Ruler) {
+				r.syncRules(context.Background(), rulerSyncReasonInitial)
+			})
+
+			for u := range allRulesByUser {
+				ctx := user.InjectOrgID(context.Background(), u)
+				forEachRuler(func(_ string, r *Ruler) {
+					rules, err := r.GetRules(ctx)
+					require.NoError(t, err)
+					require.Equal(t, len(allRulesByUser[u]), len(rules))
+					if tc.sharding {
+						mockPoolLClient := r.clientsPool.(*mockRulerClientsPool)
+
+						// Right now we are calling all rules even with shuffle sharding
+						require.Equal(t, int32(len(rulerAddrMap)), mockPoolLClient.numberOfCalls.Load())
+						mockPoolLClient.numberOfCalls.Store(0)
+					}
+				})
+			}
+
+			totalLoadedRules := 0
+			totalConfiguredRules := 0
+
+			forEachRuler(func(rID string, r *Ruler) {
+				localRules, err := r.listRules(context.Background())
+				require.NoError(t, err)
+				for _, rules := range localRules {
+					totalLoadedRules += len(rules)
+				}
+				totalConfiguredRules += len(allRulesByRuler[rID])
+			})
+
+			if tc.sharding {
+				require.Equal(t, totalConfiguredRules, totalLoadedRules)
+			} else {
+				// Not sharding means that all rules will be loaded on all rulers
+				numberOfRulers := len(rulerAddrMap)
+				require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules)
+			}
+		})
+	}
+}
+
 func TestSharding(t *testing.T) {
 	const (
 		user1 = "user1"
@@ -666,7 +873,7 @@ func TestSharding(t *testing.T) {
 			DisabledTenants: tc.disabledUsers,
 		}
 
-		r, cleanup := newRuler(t, cfg)
+		r, cleanup := buildRuler(t, cfg, nil)
 		r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
 		t.Cleanup(cleanup)
 
@@ -814,6 +1021,16 @@ func TestDeleteTenantRuleGroups(t *testing.T) {
 	}
 }
 
+func generateTokenForGroups(groups []*rulespb.RuleGroupDesc, offset uint32) []uint32 {
+	var tokens []uint32
+
+	for _, g := range groups {
+		tokens = append(tokens, tokenForGroup(g)+offset)
+	}
+
+	return tokens
+}
+
 func callDeleteTenantAPI(t *testing.T, api *Ruler, userID string) {
 	ctx := user.InjectOrgID(context.Background(), userID)

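A note on generateTokenForGroups above: the test pins each rule group to its intended ruler by registering, for every expected group, a ring token equal to tokenForGroup(g) plus a small offset. Assuming the ring assigns a key to the instance owning the first token greater than the key's hash (an assumption about ring semantics, not stated in this diff), the offset of 1 makes each ruler own exactly the groups listed for it in expectedRules, barring token collisions. A sketch of the relationship within package ruler (exampleTokenPinning is a hypothetical helper, not part of this commit):

// Illustrative only: how a registered token relates to a group's ring hash.
func exampleTokenPinning() {
	g := &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "first"}
	key := tokenForGroup(g) // where the group hashes on the ring
	tokens := generateTokenForGroups([]*rulespb.RuleGroupDesc{g}, 1)
	// tokens[0] == key+1, so the ruler that registers these tokens owns the
	// token range containing key.
	fmt.Println(key, tokens[0])
}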