Skip to content

Commit 9fe5653

Browse files
authored
scatter:check operator exist before scatter (#10082)
close #10084 Signed-off-by: tongjian <1045931706@qq.com>
1 parent 553cee0 commit 9fe5653

File tree

5 files changed

+93
-20
lines changed

5 files changed

+93
-20
lines changed

pkg/schedule/operator/status_tracker.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,12 @@ func (o *Operator) SetAdditionalInfo(key string, value string) {
155155
o.additionalInfos.value[key] = value
156156
}
157157

158-
// GetAdditionalInfo returns additional info with key.
159-
func (o *Operator) GetAdditionalInfo(key string) string {
158+
// GetAdditionalInfo returns the additional info value stored under key and whether the key exists.
159+
func (o *Operator) GetAdditionalInfo(key string) (string, bool) {
160160
o.additionalInfos.RLock()
161161
defer o.additionalInfos.RUnlock()
162-
return o.additionalInfos.value[key]
162+
val, exist := o.additionalInfos.value[key]
163+
return val, exist
163164
}
164165

165166
// LogAdditionalInfo returns additional info with string
@@ -178,18 +179,23 @@ func (o *Operator) LogAdditionalInfo() string {
178179
// HasRelatedMergeRegion checks if the operator has a related merge region.
179180
// All merge operators (OpMerge and OpAffinity) have this info set.
180181
func (o *Operator) HasRelatedMergeRegion() bool {
182+
val, exist := o.GetAdditionalInfo(string(RelatedMergeRegion))
181183
if o == nil {
182184
return false
183185
}
184-
return o.GetAdditionalInfo(string(RelatedMergeRegion)) != ""
186+
return exist && val != ""
185187
}
186188

187189
// GetRelatedMergeRegion returns the related merge region ID.
188190
func (o *Operator) GetRelatedMergeRegion() uint64 {
189191
if !o.HasRelatedMergeRegion() {
190192
return 0
191193
}
192-
str := o.GetAdditionalInfo(string(RelatedMergeRegion))
194+
str, exist := o.GetAdditionalInfo(string(RelatedMergeRegion))
195+
if !exist {
196+
log.Debug("not found related merge region ID")
197+
return 0
198+
}
193199
relatedID, err := strconv.ParseUint(str, 10, 64)
194200
if err != nil {
195201
log.Warn("invalid related merge region ID",

pkg/schedule/operator/status_tracker_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ func TestAdditionalInfoConcurrent(t *testing.T) {
192192
key := fmt.Sprintf("key%d", i)
193193
value := fmt.Sprintf("value%d", i)
194194
op.SetAdditionalInfo(key, value)
195-
if op.GetAdditionalInfo(key) != value {
195+
val, ok := op.GetAdditionalInfo(key)
196+
if ok && val != value {
196197
t.Errorf("unexpected value for key %s", key)
197198
}
198199
}(i)

pkg/schedule/scatter/region_scatterer.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"math"
2121
"strconv"
22+
"strings"
2223
"sync"
2324
"time"
2425

@@ -44,8 +45,10 @@ import (
4445
const regionScatterName = "region-scatter"
4546

4647
var (
47-
gcInterval = time.Minute
48-
gcTTL = time.Minute * 3
48+
gcInterval = time.Minute
49+
gcTTL = time.Minute * 3
50+
operatorPriorityLevel = constant.High
51+
4952
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
5053
scatterSkipEmptyRegionCounter = scatterCounter.WithLabelValues("skip", "empty-region")
5154
scatterSkipNoRegionCounter = scatterCounter.WithLabelValues("skip", "no-region")
@@ -56,12 +59,15 @@ var (
5659
scatterUnnecessaryCounter = scatterCounter.WithLabelValues("unnecessary", "")
5760
scatterFailCounter = scatterCounter.WithLabelValues("fail", "")
5861
scatterSuccessCounter = scatterCounter.WithLabelValues("success", "")
62+
scatterOperatorRunningCounter = scatterCounter.WithLabelValues("skip", "running")
63+
scatterOperatorExistedCounter = scatterCounter.WithLabelValues("fail", "other-existed")
5964
)
6065

6166
const (
6267
maxSleepDuration = time.Minute
6368
initialSleepDuration = 100 * time.Millisecond
6469
maxRetryLimit = 30
70+
scatterOperatorDesc = "scatter-region"
6571
)
6672

6773
type selectedStores struct {
@@ -156,7 +162,7 @@ type engineContext struct {
156162

157163
func newEngineContext(ctx context.Context, filterFuncs ...filterFunc) engineContext {
158164
filterFuncs = append(filterFuncs, func() filter.Filter {
159-
return &filter.StoreStateFilter{ActionScope: regionScatterName, MoveRegion: true, ScatterRegion: true, OperatorLevel: constant.High}
165+
return &filter.StoreStateFilter{ActionScope: regionScatterName, MoveRegion: true, ScatterRegion: true, OperatorLevel: operatorPriorityLevel}
160166
})
161167
return engineContext{
162168
filterFuncs: filterFuncs,
@@ -287,6 +293,28 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipSto
287293
return nil, errors.Errorf("region %d is not fully replicated", region.GetID())
288294
}
289295

296+
// Check if there is any existing operator for the region.
297+
// If the existing operator's priority level is not lower than the scatter operator's level, give up creating a new scatter operator.
298+
// Otherwise, create a new scatter operator to replace the existing one.
299+
if op := r.opController.GetOperator(region.GetID()); op != nil && op.GetPriorityLevel() >= operatorPriorityLevel {
300+
val, exist := op.GetAdditionalInfo("group")
301+
// If the existing operator is created by the same group scatterer, just skip creating a new one.
302+
if strings.Contains(op.Desc(), scatterOperatorDesc) && exist && val == group {
303+
scatterOperatorRunningCounter.Inc()
304+
log.Debug("scatter operator is already running",
305+
zap.Uint64("region-id", region.GetID()))
306+
return nil, nil
307+
}
308+
scatterOperatorExistedCounter.Inc()
309+
log.Debug("the operator exist, but it does not meet requirement",
310+
zap.Uint64("region-id", region.GetID()),
311+
zap.String("additional-info-group", val),
312+
zap.String("operator-des", op.Desc()),
313+
zap.Bool("group-exist", exist),
314+
)
315+
return nil, errors.Errorf("the operator of region %d already exist", region.GetID())
316+
}
317+
290318
if region.GetLeader() == nil {
291319
scatterSkipNoLeaderCounter.Inc()
292320
log.Warn("region no leader during scatter", zap.Uint64("region-id", region.GetID()))
@@ -405,7 +433,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
405433
r.Put(targetPeers, targetLeader, group)
406434
return nil, nil
407435
}
408-
op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader, skipStoreLimit)
436+
op, err := operator.CreateScatterRegionOperator(scatterOperatorDesc, r.cluster, region, targetPeers, targetLeader, skipStoreLimit)
409437
if err != nil {
410438
scatterFailCounter.Inc()
411439
for _, peer := range region.GetPeers() {
@@ -420,7 +448,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
420448
r.Put(targetPeers, targetLeader, group)
421449
op.SetAdditionalInfo("group", group)
422450
op.SetAdditionalInfo("leader-picked-count", strconv.FormatUint(leaderStorePickedCount, 10))
423-
op.SetPriorityLevel(constant.High)
451+
op.SetPriorityLevel(operatorPriorityLevel)
424452
}
425453
return op, nil
426454
}

pkg/schedule/scatter/region_scatterer_test.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/pingcap/kvproto/pkg/metapb"
3131

3232
"github.com/tikv/pd/pkg/core"
33+
"github.com/tikv/pd/pkg/core/constant"
3334
"github.com/tikv/pd/pkg/core/storelimit"
3435
"github.com/tikv/pd/pkg/mock/mockcluster"
3536
"github.com/tikv/pd/pkg/mock/mockconfig"
@@ -701,7 +702,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
701702
re.NoError(err)
702703
re.False(isPeerCountChanged(op))
703704
if op != nil {
704-
re.Equal(group, op.GetAdditionalInfo("group"))
705+
val, exist := op.GetAdditionalInfo("group")
706+
re.True(exist)
707+
re.Equal(group, val)
705708
}
706709
}
707710
}
@@ -859,6 +862,31 @@ func TestRemoveStoreLimit(t *testing.T) {
859862
re.True(oc.AddOperator(op))
860863
}
861864
}
865+
866+
// same scatter operator should be skipped
867+
region := tc.GetRegion(2)
868+
op, err := scatterer.Scatter(region, "", true)
869+
re.NoError(err)
870+
re.Nil(op)
871+
872+
// scattering with a different group while an operator already exists should fail
873+
region = tc.GetRegion(3)
874+
op, err = scatterer.Scatter(region, "test", true)
875+
re.Error(err)
876+
re.Nil(op)
877+
878+
// an existing operator with a lower priority level should not block creating a new scatter operator
879+
regionID := uint64(5)
880+
op = oc.GetOperator(regionID)
881+
re.NotNil(op)
882+
re.True(oc.RemoveOperator(op))
883+
region = tc.GetRegion(regionID)
884+
op = operator.NewTestOperator(region.GetID(), region.GetRegionEpoch(), operator.OpRegion)
885+
op.SetPriorityLevel(constant.Low)
886+
re.True(oc.AddOperator(op))
887+
op, err = scatterer.Scatter(region, "", true)
888+
re.NoError(err)
889+
re.NotNil(op)
862890
}
863891

864892
func TestScatterWithAffinity(t *testing.T) {

pkg/schedule/schedulers/balance_range_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ import (
3232
"github.com/tikv/pd/pkg/utils/keyutil"
3333
)
3434

35+
func checkOperator(re *require.Assertions, op *operator.Operator, sourceScore string, targetScore string) {
36+
val, exist := op.GetAdditionalInfo("sourceScore")
37+
re.True(exist)
38+
re.Equal(sourceScore, val)
39+
val, exist = op.GetAdditionalInfo("targetScore")
40+
re.True(exist)
41+
re.Equal(targetScore, val)
42+
}
43+
3544
func TestPlacementRule(t *testing.T) {
3645
re := require.New(t)
3746
cancel, _, tc, oc := prepareSchedulersTest()
@@ -221,8 +230,7 @@ func TestTIKVEngine(t *testing.T) {
221230
ops, _ = scheduler.Schedule(tc, true)
222231
re.NotEmpty(ops)
223232
op := ops[0]
224-
re.Equal("3.00", op.GetAdditionalInfo("sourceScore"))
225-
re.Equal("0.00", op.GetAdditionalInfo("targetScore"))
233+
checkOperator(re, op, "3.00", "0.00")
226234
re.Contains(op.Brief(), "transfer leader: store 1 to 3")
227235

228236
// case2: move leader from store 1 to store 4
@@ -232,8 +240,7 @@ func TestTIKVEngine(t *testing.T) {
232240
ops, _ = scheduler.Schedule(tc, true)
233241
re.NotEmpty(ops)
234242
op = ops[0]
235-
re.Equal("3.00", op.GetAdditionalInfo("sourceScore"))
236-
re.Equal("0.00", op.GetAdditionalInfo("targetScore"))
243+
checkOperator(re, op, "3.00", "0.00")
237244
re.Contains(op.Brief(), "mv peer: store [1] to [4]")
238245
re.Equal("transfer leader from store 1 to store 4", op.Step(2).String())
239246
}
@@ -372,10 +379,13 @@ func TestTIFLASHEngine(t *testing.T) {
372379
ops, _ = scheduler.Schedule(tc, false)
373380
re.NotEmpty(ops)
374381
op := ops[0]
375-
re.Equal("3.00", op.GetAdditionalInfo("sourceScore"))
376-
re.Equal("0.00", op.GetAdditionalInfo("targetScore"))
377-
re.Equal("1.00", op.GetAdditionalInfo("sourceExpectScore"))
378-
re.Equal("1.00", op.GetAdditionalInfo("targetExpectScore"))
382+
checkOperator(re, op, "3.00", "0.00")
383+
sourceExpectScore, exist := op.GetAdditionalInfo("sourceExpectScore")
384+
re.True(exist)
385+
re.Equal("1.00", sourceExpectScore)
386+
targetExpectScore, exist := op.GetAdditionalInfo("targetExpectScore")
387+
re.True(exist)
388+
re.Equal("1.00", targetExpectScore)
379389
re.Contains(op.Brief(), "mv peer: store [4] to")
380390
}
381391

0 commit comments

Comments
 (0)