Skip to content

Commit dbcc245

Browse files
committed
update store limit
Signed-off-by: okjiang <[email protected]>
1 parent 6173d50 commit dbcc245

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

server/cluster/cluster.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2423,6 +2423,7 @@ func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePer
24232423
log.Error("persist store limit meet error", errs.ZapError(err))
24242424
return err
24252425
}
2426+
c.refreshStoreRateLimit(storeID, typ)
24262427
log.Info("store limit changed", zap.Uint64("store-id", storeID), zap.String("type", typ.String()), zap.Float64("rate-per-min", ratePerMin))
24272428
return nil
24282429
}
@@ -2441,10 +2442,35 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64)
24412442
log.Error("persist store limit meet error", errs.ZapError(err))
24422443
return err
24432444
}
2445+
for storeID := range c.opt.GetAllStoresLimit() {
2446+
c.refreshStoreRateLimit(storeID, typ)
2447+
}
24442448
log.Info("all store limit changed", zap.String("type", typ.String()), zap.Float64("rate-per-min", ratePerMin))
24452449
return nil
24462450
}
24472451

2452+
// refreshStoreRateLimit applies the schedule config's store limit to the in-memory store limiter.
2453+
func (c *RaftCluster) refreshStoreRateLimit(storeID uint64, limitType storelimit.Type) {
2454+
// Only v1 uses StoreRateLimit for AddPeer/RemovePeer.
2455+
if c.opt.GetStoreLimitVersion() != storelimit.VersionV1 {
2456+
return
2457+
}
2458+
store := c.GetStore(storeID)
2459+
if store == nil {
2460+
return
2461+
}
2462+
limit, ok := store.GetStoreLimit().(*storelimit.StoreRateLimit)
2463+
if !ok {
2464+
return
2465+
}
2466+
// Schedule config stores the unit in rate-per-minute, but limiter uses rate-per-second.
2467+
const storeBalanceBaseTime = float64(60)
2468+
ratePerSec := c.opt.GetStoreLimitByType(storeID, limitType) / storeBalanceBaseTime
2469+
if limit.Rate(limitType) != ratePerSec {
2470+
c.ResetStoreLimit(storeID, limitType, ratePerSec)
2471+
}
2472+
}
2473+
24482474
// SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl.
24492475
func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) error {
24502476
return c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl)

server/cluster/cluster_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2923,6 +2923,40 @@ func TestCheckCache(t *testing.T) {
29232923
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
29242924
}
29252925

2926+
func TestStoreLimitChangeRefreshLimiter(t *testing.T) {
2927+
re := require.New(t)
2928+
2929+
ctx, cancel := context.WithCancel(context.Background())
2930+
defer cancel()
2931+
2932+
_, opt, err := newTestScheduleConfig()
2933+
re.NoError(err)
2934+
// StoreRateLimit (v1) is used for AddPeer/RemovePeer and participates in scheduler filters.
2935+
opt.GetScheduleConfig().StoreLimitVersion = storelimit.VersionV1
2936+
2937+
rc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
2938+
2939+
storeID := uint64(1)
2940+
store := core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))
2941+
rc.PutStore(store)
2942+
2943+
// Simulate that the limiter has been set to an extremely low rate and already consumed,
2944+
// which would make scheduler filters throttle this store.
2945+
lowRatePerSec := float64(0.0001) / 60.0
2946+
rc.ResetStoreLimit(storeID, storelimit.AddPeer, lowRatePerSec)
2947+
store = rc.GetStore(storeID)
2948+
re.NotNil(store)
2949+
re.True(store.GetStoreLimit().Take(storelimit.RegionInfluence[storelimit.AddPeer], storelimit.AddPeer, constant.Low))
2950+
re.False(store.IsAvailable(storelimit.AddPeer, constant.Low))
2951+
2952+
// Increase store limit via config API. Without refreshing the in-memory limiter,
2953+
// the store would remain throttled and be filtered out.
2954+
re.NoError(rc.SetStoreLimit(storeID, storelimit.AddPeer, 30))
2955+
store = rc.GetStore(storeID)
2956+
re.NotNil(store)
2957+
re.True(store.IsAvailable(storelimit.AddPeer, constant.Low))
2958+
}
2959+
29262960
func TestPatrolRegionConcurrency(t *testing.T) {
29272961
re := require.New(t)
29282962

0 commit comments

Comments
 (0)