
Commit 97187a7

feat(tikv): try to evict leaders before deleting stores (#6239)
1 parent 9430103 commit 97187a7

File tree (4 files changed: +106 −28 lines)

  pkg/apis/label/label.go
  pkg/manager/member/tikv_scaler.go
  pkg/manager/member/tikv_scaler_test.go
  pkg/manager/member/utils.go

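In short: before a TiKV store is deleted during scale-in, the scaler now records the scale-in start time in a pod annotation, asks PD to begin evicting the store's region leaders, and only calls DeleteStore once the store reports zero leaders or a five-minute timeout has elapsed. A minimal, self-contained sketch of that decision (simplified names; the real logic lives in scaleInOne in pkg/manager/member/tikv_scaler.go below):

package main

import (
	"fmt"
	"time"
)

// evictLeaderTimeout mirrors defaultEvictLeaderTimeoutWhenScaleIn introduced below.
const evictLeaderTimeout = 5 * time.Minute

// okToDeleteStore is a simplified stand-in for the new check in scaleInOne:
// the store may be deleted once all of its leaders are gone, or once leader
// eviction has been running longer than the timeout.
func okToDeleteStore(leaderCount int, evictionStart, now time.Time) bool {
	if leaderCount == 0 {
		return true
	}
	return evictionStart.Add(evictLeaderTimeout).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(okToDeleteStore(3, now.Add(-6*time.Minute), now)) // true: eviction timed out
	fmt.Println(okToDeleteStore(0, now, now))                     // true: all leaders evicted
	fmt.Println(okToDeleteStore(3, now, now))                     // false: keep evicting, requeue
}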

pkg/apis/label/label.go

Lines changed: 4 additions & 1 deletion
@@ -111,7 +111,10 @@ const (
 	AnnoOwnerGeneration = "tidb.pingcap.com/owner-generation"
 
 	// AnnPVCScaleInTime is pvc scaled in time key used in PVC for e2e test only
-	AnnPVCScaleInTime = "tidb.pingcap.com/scale-in-time"
+	AnnPVCScaleInTime = AnnoScaleInTime
+
+	// AnnoScaleInTime is scaled in time
+	AnnoScaleInTime = "tidb.pingcap.com/scale-in-time"
 
 	// AnnForceUpgradeVal is tc annotation value to indicate whether force upgrade should be done
 	AnnForceUpgradeVal = "true"
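Note that the old e2e-only constant is kept as an alias of the new key, so both names resolve to the same annotation string. A tiny illustrative check (hypothetical snippet, assuming the usual github.com/pingcap/tidb-operator/pkg/apis/label import path; not part of the commit):

package main

import (
	"fmt"

	"github.com/pingcap/tidb-operator/pkg/apis/label"
)

func main() {
	// After this change both constants point at the same annotation key.
	fmt.Println(label.AnnPVCScaleInTime == label.AnnoScaleInTime) // true
	fmt.Println(label.AnnoScaleInTime)                            // tidb.pingcap.com/scale-in-time
}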

pkg/manager/member/tikv_scaler.go

Lines changed: 49 additions & 7 deletions
@@ -32,6 +32,10 @@ import (
 	"k8s.io/klog/v2"
 )
 
+const (
+	defaultEvictLeaderTimeoutWhenScaleIn = time.Minute * 5
+)
+
 type tikvScaler struct {
 	generalScaler
 }
@@ -102,7 +106,7 @@ func (s *tikvScaler) scaleOutOne(tc *v1alpha1.TidbCluster, ordinal int32) error
 }
 
 func (s *tikvScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
-	scaleInTime := time.Now().Format(time.RFC3339)
+	scaleInTime := time.Now()
 	tc, ok := meta.(*v1alpha1.TidbCluster)
 	if !ok {
 		klog.Errorf("tikvScaler.ScaleIn: failed to convert cluster %s/%s, scale in will do nothing", meta.GetNamespace(), meta.GetName())
@@ -114,7 +118,7 @@ func (s *tikvScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newSe
 	_, ordinals, replicas, deleteSlots := scaleMulti(oldSet, newSet, scaleInParallelism)
 
 	klog.Infof("scaling in tikv statefulset %s/%s, ordinals: %v (replicas: %d, delete slots: %v), scaleInParallelism: %v, scaleInTime: %v",
-		oldSet.Namespace, oldSet.Name, ordinals, replicas, deleteSlots.List(), scaleInParallelism, scaleInTime)
+		oldSet.Namespace, oldSet.Name, ordinals, replicas, deleteSlots.List(), scaleInParallelism, scaleInTime.Format(time.RFC3339))
 
 	var (
 		upTikvStoreCount int
@@ -174,7 +178,7 @@ func (s *tikvScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newSe
 	return errorutils.NewAggregate(errs)
 }
 
-func (s *tikvScaler) scaleInOne(tc *v1alpha1.TidbCluster, skipPreCheck bool, upTikvStoreCount, deletedUpStoreCount, maxReplicas int, ordinal int32, scaleInTime string) (deletedUpStore int, err error) {
+func (s *tikvScaler) scaleInOne(tc *v1alpha1.TidbCluster, skipPreCheck bool, upTikvStoreCount, deletedUpStoreCount, maxReplicas int, ordinal int32, currentTime time.Time) (deletedUpStore int, err error) {
 	tcName := tc.GetName()
 	ns := tc.GetNamespace()
 	podName := ordinalPodName(v1alpha1.TiKVMemberType, tcName, ordinal)
@@ -206,8 +210,47 @@ func (s *tikvScaler) scaleInOne(tc *v1alpha1.TidbCluster, skipPreCheck bool, upT
 			if err != nil {
 				return deletedUpStore, err
 			}
-			if state != v1alpha1.TiKVStateOffline {
-				if err := controller.GetPDClient(s.deps.PDControl, tc).DeleteStore(id); err != nil {
+			pdc := controller.GetPDClient(s.deps.PDControl, tc)
+
+			var startTime *time.Time
+			startStr, ok := pod.Annotations[label.AnnoScaleInTime]
+			if ok {
+				t, err := time.Parse(time.RFC3339, startStr)
+				if err != nil {
+					klog.Warningf("tikvScaler.ScaleIn: cannot parse annotation %s in pod %s, cluster %s/%s: %v", label.AnnoScaleInTime, podName, ns, tcName, err)
+					// use current time as startInTime
+					startTime = &currentTime
+				} else {
+					startTime = &t
+				}
+			} else {
+				startTime = &currentTime
+			}
+			if err := ensureScaleInTimeAnnoInPod(tc, pod, s.deps.PodControl, startTime.Format(time.RFC3339)); err != nil {
+				return deletedUpStore, fmt.Errorf("cannot add annotation to pod: %w", err)
+			}
+
+			var leaderEvictedOrTimeout bool
+			if startTime.Add(defaultEvictLeaderTimeoutWhenScaleIn).Before(currentTime) {
+				leaderEvictedOrTimeout = true
+			}
+			if store.LeaderCount == 0 {
+				leaderEvictedOrTimeout = true
+			}
+			if !leaderEvictedOrTimeout {
+				schedulerMap, err := pdc.GetEvictLeaderSchedulersForStores(id)
+				if err != nil {
+					return deletedUpStore, fmt.Errorf("cannot get scheduler of store %v: %w", id, err)
+				}
+				if _, ok := schedulerMap[id]; !ok {
+					if err := pdc.BeginEvictLeader(id); err != nil {
+						return deletedUpStore, fmt.Errorf("cannot evict leaders of store %v: %w", id, err)
+					}
+				}
+			}
+
+			if state != v1alpha1.TiKVStateOffline && leaderEvictedOrTimeout {
+				if err := pdc.DeleteStore(id); err != nil {
 					klog.Errorf("tikvScaler.ScaleIn: failed to delete store %d, %v", id, err)
 					return deletedUpStore, err
 				}
@@ -217,7 +260,6 @@ func (s *tikvScaler) scaleInOne(tc *v1alpha1.TidbCluster, skipPreCheck bool, upT
 				}
 			}
 			return deletedUpStore, controller.RequeueErrorf("TiKV %s/%s store %d is still in cluster, state: %s", ns, podName, id, state)
-
 		}
 	}
 
@@ -245,7 +287,7 @@ func (s *tikvScaler) scaleInOne(tc *v1alpha1.TidbCluster, skipPreCheck bool, upT
 		return deletedUpStore, fmt.Errorf("tikvScaler.ScaleIn: failed to get pvcs for pod %s/%s in tc %s/%s, error: %s", ns, pod.Name, ns, tcName, err)
 	}
 	for _, pvc := range pvcs {
-		if err := addDeferDeletingAnnoToPVC(tc, pvc, s.deps.PVCControl, scaleInTime); err != nil {
+		if err := addDeferDeletingAnnoToPVC(tc, pvc, s.deps.PVCControl, currentTime.Format(time.RFC3339)); err != nil {
 			return deletedUpStore, err
 		}
 	}

pkg/manager/member/tikv_scaler_test.go

Lines changed: 32 additions & 15 deletions
@@ -32,6 +32,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	errorutils "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/cache"
@@ -1073,7 +1074,6 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 			pod.Labels[label.StoreIDLabelKey] = s.storeIdLabel
 		}
 		podIndexer.Add(pod)
-
 	}
 
 	for _, s := range test.pods {
@@ -1189,7 +1189,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 1,
-		}, {
+		},
+		{
 			name: "1 scaleInParallelism, store state is tombstone, update pvc failed",
 			tikvUpgrading: false,
 			storeFun: multiTombstoneStoreFun,
@@ -1213,7 +1214,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 1,
-		}, {
+		},
+		{
 			name: "1 scaleInParallelism, store state is tombstone",
 			tikvUpgrading: false,
 			storeFun: multiTombstoneStoreFun,
@@ -1237,7 +1239,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 1,
-		}, {
+		},
+		{
 			name: "2 scaleInParallelism, store is up, delete store failed",
 			tikvUpgrading: false,
 			storeFun: normalStoreFun,
@@ -1261,7 +1264,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 2,
-		}, {
+		},
+		{
 			name: "2 scaleInParallelism, store is up",
 			tikvUpgrading: false,
 			storeFun: normalStoreFun,
@@ -1285,7 +1289,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 2,
-		}, {
+		},
+		{
 			name: "2 scaleInParallelism, store state is tombstone",
 			tikvUpgrading: false,
 			storeFun: multiTombstoneStoreFun,
@@ -1309,7 +1314,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 2,
-		}, {
+		},
+		{
 			name: "3 scaleInParallelism, store state is tombstone, scaleInParallelism is bigger than needed",
 			tikvUpgrading: false,
 			storeFun: multiTombstoneStoreFun,
@@ -1353,7 +1359,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				}
 				g.Expect(len(tc.Status.TiKV.TombstoneStores)).To(Equal(2))
 			},
-		}, {
+		},
+		{
 			name: "2 scaleInParallelism, store state is tombstone, scaleInParallelism is smaller than needed",
 			tikvUpgrading: false,
 			storeFun: func(tc *v1alpha1.TidbCluster) {
@@ -1411,7 +1418,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				errExpectNil(g, err)
 				g.Expect(int(*newSet.Spec.Replicas)).To(Equal(3))
 			},
-		}, {
+		},
+		{
 			name: "2 scaleInParallelism, able to scale in simultaneously while is upgrading",
 			tikvUpgrading: true,
 			storeFun: multiTombstoneStoreFun,
@@ -1435,7 +1443,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 2,
-		}, {
+		},
+		{
 			name: "2 maxScaleInReplica, tikv pod is not ready now, not sure if the status has been synced",
 			tikvUpgrading: false,
 			storeFun: func(tc *v1alpha1.TidbCluster) {
@@ -1462,7 +1471,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 2,
-		}, {
+		},
+		{
 			name: "2 maxScaleInReplica, tikv pod is not ready now, make sure if the status has been synced",
 			tikvUpgrading: false,
 			storeFun: func(tc *v1alpha1.TidbCluster) {
@@ -1489,7 +1499,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 2,
-		}, {
+		},
+		{
 			name: "2 maxScaleInReplica, store state is tombstone, don't have pvc",
 			tikvUpgrading: false,
 			storeFun: multiTombstoneStoreFun,
@@ -1513,7 +1524,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				storeIdLabel: "13",
 			}},
 			scaleInParallelism: 2,
-		}, {
+		},
+		{
 			name: "2 maxScaleInReplica, 4 up stores, scale in TiKV simultaneously works but only scales one",
 			tikvUpgrading: false,
 			storeFun: func(tc *v1alpha1.TidbCluster) {
@@ -1578,7 +1590,8 @@ func TestTiKVScalerScaleInSimultaneously(t *testing.T) {
 				}
 				g.Expect(len(tc.Status.TiKV.TombstoneStores)).To(Equal(1))
 			},
-		}, {
+		},
+		{
 			name: "2 maxScaleInReplica, 5 up stores with tiflash store, scale in TiKV simultaneously works but only scales one",
 			tikvUpgrading: false,
 			storeFun: func(tc *v1alpha1.TidbCluster) {
@@ -2060,7 +2073,11 @@ func newFakeTiKVScaler(resyncDuration ...time.Duration) (*tikvScaler, *pdapi.Fak
 	if len(resyncDuration) > 0 {
 		fakeDeps.CLIConfig.ResyncDuration = resyncDuration[0]
 	}
-	fakeDeps.PodControl = &podCtlMock{} // So that UpdateMetaInfo is no-op instead of changing labels.
+	fakeDeps.PodControl = &podCtlMock{
+		updatePod: func(_ runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) {
+			return pod, nil
+		},
+	} // So that UpdateMetaInfo is no-op instead of changing labels.
 	pvcIndexer := fakeDeps.KubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer()
 	podIndexer := fakeDeps.KubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
 	pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl)
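The test helper change above is needed because the scale-in path now calls PodControl.UpdatePod to persist the scale-in-time annotation, so the zero-value &podCtlMock{} is no longer enough. A simplified sketch of the function-field mock pattern the test relies on (names and shape assumed from the diff, not the actual helper):

package member

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

// fakePodControl shows the idea: a test injects only the behaviour it needs.
type fakePodControl struct {
	updatePod func(owner runtime.Object, pod *corev1.Pod) (*corev1.Pod, error)
}

func (f *fakePodControl) UpdatePod(owner runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) {
	if f.updatePod != nil {
		return f.updatePod(owner, pod)
	}
	// Default: pretend the update succeeded and return the pod unchanged.
	return pod, nil
}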

pkg/manager/member/utils.go

Lines changed: 21 additions & 5 deletions
@@ -53,11 +53,9 @@ const (
 	ErrImagePull = "ErrImagePull"
 )
 
-var (
-	// The first version that moves the rocksdb info and raft info log to store and rotate as the TiKV log is v5.0.0
-	// https://github.com/tikv/tikv/pull/7358
-	tikvLessThanV500, _ = semver.NewConstraint("<v5.0.0-0")
-)
+// The first version that moves the rocksdb info and raft info log to store and rotate as the TiKV log is v5.0.0
+// https://github.com/tikv/tikv/pull/7358
+var tikvLessThanV500, _ = semver.NewConstraint("<v5.0.0-0")
 
 func annotationsMountVolume() (corev1.VolumeMount, corev1.Volume) {
 	m := corev1.VolumeMount{Name: "annotations", ReadOnly: true, MountPath: "/etc/podinfo"}
@@ -460,6 +458,24 @@ func addDeferDeletingAnnoToPVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentV
 	return nil
 }
 
+// ensureScaleInTimeAnnoInPod set scale in time to pod annotation
+func ensureScaleInTimeAnnoInPod(tc *v1alpha1.TidbCluster, pod *corev1.Pod, podControl controller.PodControlInterface, scaleInTime string) error {
+	val, ok := pod.Annotations[label.AnnoScaleInTime]
+	if ok && val == scaleInTime {
+		return nil
+	}
+	if pod.Annotations == nil {
+		pod.Annotations = map[string]string{}
+	}
+	pod.Annotations[label.AnnoScaleInTime] = scaleInTime
+	if _, err := podControl.UpdatePod(tc, pod); err != nil {
+		klog.Errorf("failed to set pod %s/%s annotation %q to %q", tc.Namespace, pod.Name, label.AnnoScaleInTime, scaleInTime)
+		return err
+	}
+	klog.Infof("set pod %s/%s annotation %q to %q successfully", tc.Namespace, pod.Name, label.AnnoScaleInTime, scaleInTime)
+	return nil
+}
+
 // GetPVCSelectorForPod compose a PVC selector from a tc/dm-cluster member pod at ordinal position
 func GetPVCSelectorForPod(controller runtime.Object, memberType v1alpha1.MemberType, ordinal int32) (labels.Selector, error) {
 	meta := controller.(metav1.Object)
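The annotation written by ensureScaleInTimeAnnoInPod is a plain RFC3339 timestamp, so the parse-and-compare done on the next reconcile reduces to standard-library calls. A self-contained sketch of that round trip (simplified, not the operator's code):

package main

import (
	"fmt"
	"time"
)

const scaleInTimeout = 5 * time.Minute

func main() {
	// What ensureScaleInTimeAnnoInPod stores on the pod: an RFC3339 timestamp.
	stored := time.Now().Add(-10 * time.Minute).Format(time.RFC3339)

	// What scaleInOne does on a later reconcile: parse it back and check the timeout.
	start, err := time.Parse(time.RFC3339, stored)
	if err != nil {
		// The scaler falls back to the current time if the value cannot be parsed.
		start = time.Now()
	}
	timedOut := start.Add(scaleInTimeout).Before(time.Now())
	fmt.Printf("stored=%s timedOut=%v\n", stored, timedOut) // timedOut=true in this example
}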
