Skip to content

Commit ad1f072

Browse files
init commit
Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com>
1 parent 9d6bbb8 commit ad1f072

File tree

8 files changed

+359
-4
lines changed

8 files changed

+359
-4
lines changed

pkg/keyspace/gc.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Copyright 2023 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package keyspace
16+
17+
import (
18+
"context"
19+
"sync"
20+
"time"
21+
22+
"github.com/pingcap/failpoint"
23+
"github.com/pingcap/kvproto/pkg/keyspacepb"
24+
"github.com/pingcap/log"
25+
"github.com/tikv/pd/pkg/member"
26+
"github.com/tikv/pd/pkg/storage/kv"
27+
"go.uber.org/zap"
28+
)
29+
30+
// Settings shared by every gc round.
const (
	// gcPerLoopTimeout bounds how long a single gc round may run.
	// It must stay below minKeyspaceGCInterval (10m).
	gcPerLoopTimeout = 5 * time.Minute
	// gcKeyspaceBatch is the batch size used when fetching keyspaces.
	gcKeyspaceBatch = 100
)
37+
38+
// gcWorker is used to clean up keyspace related information.
39+
type gcWorker struct {
40+
sync.RWMutex
41+
manager *Manager
42+
member *member.EmbeddedEtcdMember
43+
44+
nextKeyspaceID uint32
45+
ticker *time.Ticker
46+
// gcLifeTime specifies how long should we keep archived keyspace.
47+
gcLifeTime time.Duration
48+
}
49+
50+
// newGCWorker returns a newGCWorker.
51+
func (manager *Manager) newGCWorker() *gcWorker {
52+
dummyTicker := time.NewTicker(time.Hour)
53+
dummyTicker.Stop()
54+
55+
worker := &gcWorker{
56+
manager: manager,
57+
member: manager.member,
58+
ticker: dummyTicker, // A dummy ticker, real ticker will be setup by reload.
59+
gcLifeTime: manager.config.GetGCLifeTime(),
60+
}
61+
return worker
62+
}
63+
64+
// run starts the main loop of the gc worker.
65+
func (worker *gcWorker) run() {
66+
for {
67+
select {
68+
// If manager's context done, stop the loop completely.
69+
case <-worker.manager.ctx.Done():
70+
worker.ticker.Stop()
71+
log.Info("[keyspace] gc loop stopped due to context cancel")
72+
return
73+
case now := <-worker.ticker.C:
74+
if !worker.member.IsLeader() {
75+
// If server currently not leader, stop the ticker and don't do gc,
76+
// reload will be called and reset the ticker when this server is elected.
77+
worker.ticker.Stop()
78+
log.Info("[keyspace] gc loop skipped, server is not leader")
79+
continue
80+
}
81+
// If a keyspace archived before safePoint, we should clean up the keyspace.
82+
worker.RLock()
83+
safePoint := now.Add(-worker.gcLifeTime).Unix()
84+
worker.RUnlock()
85+
log.Info("[keyspace] starting gc")
86+
worker.nextKeyspaceID = worker.scanKeyspacesAndDoGC(worker.manager.ctx, safePoint)
87+
}
88+
}
89+
}
90+
91+
func (worker *gcWorker) reload(cfg Config) {
92+
worker.ticker.Stop()
93+
worker.Lock()
94+
defer worker.Unlock()
95+
96+
if !cfg.ToGCKeyspace() {
97+
log.Info("[keyspace] gc disabled")
98+
return
99+
}
100+
// Set the worker's gc lifetime and run duration
101+
worker.gcLifeTime = cfg.GetGCLifeTime()
102+
worker.ticker.Reset(cfg.GetGCRunInterval())
103+
log.Info("[keyspace] gc config reloaded",
104+
zap.Duration("gc lifetime", worker.gcLifeTime),
105+
zap.Duration("run interval", cfg.GetGCRunInterval()),
106+
)
107+
}
108+
109+
// scanKeyspacesAndDoGC scans all current keyspaces and attempts to do one round of gc within gcPerLoopTimeout.
110+
// It starts with nextKeyspaceID, and return the last garbage collected keyspace's id + 1 as the starting id
111+
// for the next round.
112+
func (worker *gcWorker) scanKeyspacesAndDoGC(ctx context.Context, safePoint int64) uint32 {
113+
ctx, cancel := context.WithTimeout(ctx, gcPerLoopTimeout)
114+
defer cancel()
115+
nextID := worker.nextKeyspaceID
116+
manager := worker.manager
117+
var (
118+
batch []*keyspacepb.KeyspaceMeta
119+
err error
120+
)
121+
// Scan all keyspaces.
122+
for {
123+
select {
124+
case <-ctx.Done():
125+
log.Info("[keyspace] stopping gc loop due to context cancel")
126+
return nextID
127+
default:
128+
}
129+
130+
err = manager.store.RunInTxn(ctx, func(txn kv.Txn) error {
131+
if batch, err = manager.store.LoadRangeKeyspace(txn, nextID, gcKeyspaceBatch); err != nil {
132+
return err
133+
}
134+
return nil
135+
})
136+
if err != nil {
137+
log.Error("[keyspace] stopping gc loop, failed to fetch keyspace meta", zap.Error(err))
138+
return nextID
139+
}
140+
if len(batch) == 0 {
141+
// Return here directly and wait for the next tick instead of redo scanning from the beginning,
142+
// this could prevent scanning keyspace at high frequency when keyspace count is low.
143+
return 0
144+
}
145+
for i, meta := range batch {
146+
if canGC(meta, safePoint) {
147+
worker.gcKeyspace(ctx, meta)
148+
}
149+
if i == len(batch)-1 {
150+
nextID = meta.GetId() + 1
151+
}
152+
}
153+
}
154+
}
155+
156+
// gcKeyspace gc one keyspace related information.
157+
// It will be tried again in the next scan if it fails.
158+
func (worker *gcWorker) gcKeyspace(ctx context.Context, meta *keyspacepb.KeyspaceMeta) {
159+
select {
160+
case <-ctx.Done():
161+
log.Info("[keyspace] skipping gc due to context cancel",
162+
zap.Uint32("ID", meta.GetId()), zap.String("name", meta.GetName()))
163+
return
164+
default:
165+
}
166+
167+
log.Info("[keyspace] start cleaning keyspace meta data",
168+
zap.String("name", meta.GetName()),
169+
zap.Uint32("ID", meta.GetId()),
170+
zap.String("state", meta.GetState().String()),
171+
zap.Int64("last state change", meta.GetStateChangedAt()),
172+
)
173+
174+
// Following section should be idempotent:
175+
// TODO: Clean TiKV range.
176+
// TODO: Clean TiFlash placement rules.
177+
// TODO: Clean Region Label rules.
178+
// TODO: Clean keyspace related etcd paths.
179+
// And only when all of the above succeeded:
180+
// TODO: Set keyspace state to TOMBSTONE
181+
182+
// Inject a failpoint to test cleaning framework during test.
183+
failpoint.Inject("doGC", func() {
184+
_, err := worker.manager.UpdateKeyspaceState(meta.GetName(), keyspacepb.KeyspaceState_TOMBSTONE, time.Now().Unix())
185+
if err != nil {
186+
log.Warn("[keyspace] fail to tombstone keyspace when doing gc")
187+
}
188+
})
189+
}

pkg/keyspace/keyspace.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strconv"
2121
"time"
2222

23+
"github.com/tikv/pd/pkg/member"
2324
"go.uber.org/zap"
2425

2526
"github.com/pingcap/errors"
@@ -70,6 +71,9 @@ type Config interface {
7071
ToWaitRegionSplit() bool
7172
GetWaitRegionSplitTimeout() time.Duration
7273
GetCheckRegionSplitInterval() time.Duration
74+
ToGCKeyspace() bool
75+
GetGCRunInterval() time.Duration
76+
GetGCLifeTime() time.Duration
7377
}
7478

7579
// Manager manages keyspace related data.
@@ -85,12 +89,16 @@ type Manager struct {
8589
store endpoint.KeyspaceStorage
8690
// rc is the raft cluster of the server.
8791
cluster core.ClusterInformer
92+
// member is the current pd's member information, used to check if server is leader.
93+
member *member.EmbeddedEtcdMember
8894
// config is the configurations of the manager.
8995
config Config
9096
// kgm is the keyspace group manager of the server.
9197
kgm *GroupManager
9298
// nextPatrolStartID is the next start id of keyspace assignment patrol.
9399
nextPatrolStartID uint32
100+
// gcWorker is used to clean up archived keyspace.
101+
gcWorker *gcWorker
94102
}
95103

96104
// CreateKeyspaceRequest represents necessary arguments to create a keyspace.
@@ -110,11 +118,12 @@ func NewKeyspaceManager(
110118
ctx context.Context,
111119
store endpoint.KeyspaceStorage,
112120
cluster core.ClusterInformer,
121+
member *member.EmbeddedEtcdMember,
113122
idAllocator id.Allocator,
114123
config Config,
115124
kgm *GroupManager,
116125
) *Manager {
117-
return &Manager{
126+
manager := &Manager{
118127
ctx: ctx,
119128
// Remove the lock of the given key from the lock group when unlock to
120129
// keep minimal working set, which is suited for low qps, non-time-critical
@@ -125,10 +134,13 @@ func NewKeyspaceManager(
125134
idAllocator: idAllocator,
126135
store: store,
127136
cluster: cluster,
137+
member: member,
128138
config: config,
129139
kgm: kgm,
130140
nextPatrolStartID: constant.DefaultKeyspaceID,
131141
}
142+
manager.gcWorker = manager.newGCWorker()
143+
return manager
132144
}
133145

134146
// Bootstrap saves default keyspace info.
@@ -182,12 +194,15 @@ func (manager *Manager) Bootstrap() error {
182194
return err
183195
}
184196
}
197+
// start gc loop.
198+
go manager.gcWorker.run()
185199
return nil
186200
}
187201

188202
// UpdateConfig replaces the keyspace manager's config with cfg and passes cfg on
// to the gc worker's reload.
189203
func (manager *Manager) UpdateConfig(cfg Config) {
190204
manager.config = cfg
205+
manager.gcWorker.reload(cfg)
191206
}
192207

193208
// CreateKeyspace create a keyspace meta with given config and save it to storage.

pkg/keyspace/keyspace_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ type mockConfig struct {
5959
WaitRegionSplit bool
6060
WaitRegionSplitTimeout typeutil.Duration
6161
CheckRegionSplitInterval typeutil.Duration
62+
GCKeyspace bool
63+
GCRunInterval time.Duration
64+
GCLifeTime time.Duration
6265
}
6366

6467
func (m *mockConfig) GetPreAlloc() []string {
@@ -77,13 +80,25 @@ func (m *mockConfig) GetCheckRegionSplitInterval() time.Duration {
7780
return m.CheckRegionSplitInterval.Duration
7881
}
7982

83+
// ToGCKeyspace returns the mock's GCKeyspace field.
func (m *mockConfig) ToGCKeyspace() bool {
84+
return m.GCKeyspace
85+
}
86+
87+
// GetGCRunInterval returns the mock's GCRunInterval field.
func (m *mockConfig) GetGCRunInterval() time.Duration {
88+
return m.GCRunInterval
89+
}
90+
91+
// GetGCLifeTime returns the mock's GCLifeTime field.
func (m *mockConfig) GetGCLifeTime() time.Duration {
92+
return m.GCLifeTime
93+
}
94+
8095
func (suite *keyspaceTestSuite) SetupTest() {
8196
re := suite.Require()
8297
suite.ctx, suite.cancel = context.WithCancel(context.Background())
8398
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
8499
allocator := mockid.NewIDAllocator()
85100
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil)
86-
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
101+
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, nil, allocator, &mockConfig{}, kgm)
87102
re.NoError(kgm.Bootstrap(suite.ctx))
88103
re.NoError(suite.manager.Bootstrap())
89104
}

pkg/keyspace/tso_keyspace_group_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
5252
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil)
5353
idAllocator := mockid.NewIDAllocator()
5454
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
55-
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
55+
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, nil, idAllocator, &mockConfig{}, suite.kgm)
5656
re.NoError(suite.kgm.Bootstrap(suite.ctx))
5757
}
5858

pkg/keyspace/util.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,3 +252,11 @@ func (hp *indexedHeap) Remove(id uint32) *endpoint.KeyspaceGroup {
252252
}
253253
return nil
254254
}
255+
256+
// canGC checks keyspace's state and stateChangedAt against safePoint to determine if we can safely gc it.
257+
func canGC(meta *keyspacepb.KeyspaceMeta, safePoint int64) bool {
258+
if meta.GetState() != keyspacepb.KeyspaceState_ARCHIVED {
259+
return false
260+
}
261+
return meta.GetStateChangedAt() < safePoint
262+
}

0 commit comments

Comments
 (0)