Skip to content

Commit 49dcd72

Browse files
authored
[close #421] br: Split for smaller batch of keys (#422)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent fa80bee commit 49dcd72

File tree

14 files changed

+198
-80
lines changed

14 files changed

+198
-80
lines changed

.github/config/br_pd.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,8 @@
22
[replication]
33
enable-placement-rules = true
44
max-replicas = 1
5+
6+
[schedule]
7+
max-merge-region-size = 2
8+
max-merge-region-keys = 40000
9+
split-merge-interval = "10s"

.github/config/br_rawkv.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,18 @@ pd-heartbeat-tick-interval = "2s"
77
pd-store-heartbeat-tick-interval = "5s"
88
split-region-check-tick-interval = "1s"
99

10+
# 10000000 keys & ~480MB data size in integration tests
11+
[coprocessor]
12+
region-split-size = "10MiB"
13+
region-split-keys = 200000
14+
batch-split-limit = 100
15+
1016
[rocksdb]
1117
max-open-files = 10000
1218

1319
[raftdb]
1420
max-open-files = 10000
1521

1622
[storage]
23+
reserve-space = "0MiB"
1724
enable-ttl = true

br/Makefile

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,18 @@ PACKAGES := go list ./...
2424
DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||'
2525

2626
# build & test
27-
BR_BIN_PATH ?= bin/tikv-br
28-
TEST_BIN_PATH ?= bin/tikv-br.test
29-
COVERAGE_DIR ?= build
30-
TEST_PARALLEL ?= 8
31-
PD_ADDR ?= 127.0.0.1:2379
32-
BR_LOCAL_STORE ?= /tmp/backup_restore_test
33-
API_VERSION ?= 1
34-
CLUSTER_VERSION ?= nightly
35-
TLS_CA ?=
36-
TLS_CERT ?=
37-
TLS_KEY ?=
27+
BR_BIN_PATH ?= bin/tikv-br
28+
TEST_BIN_PATH ?= bin/tikv-br.test
29+
COVERAGE_DIR ?= build
30+
TEST_PARALLEL ?= 8
31+
PD_ADDR ?= 127.0.0.1:2379
32+
SPLIT_REGION_MAX_KEYS ?= 4
33+
BR_LOCAL_STORE ?= /tmp/backup_restore_test
34+
API_VERSION ?= 1
35+
CLUSTER_VERSION ?= nightly
36+
TLS_CA ?=
37+
TLS_CERT ?=
38+
TLS_KEY ?=
3839

3940
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.ReleaseVersion=$(shell git describe --tags --dirty --always)"
4041
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
@@ -71,6 +72,7 @@ test/integration: build/br-test build/rawkv-integration-test
7172
mkdir -p $(COVERAGE_DIR)
7273
./bin/rawkv_test --pd=${PD_ADDR} \
7374
--br='${TEST_BIN_PATH}' \
75+
--split-region-max-keys=${SPLIT_REGION_MAX_KEYS} \
7476
--br-storage=${BR_LOCAL_STORE} \
7577
--api-version=${API_VERSION} \
7678
--cluster-version=${CLUSTER_VERSION} \

br/pkg/restore/client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type Client struct {
4444
workerPool *utils.WorkerPool
4545
tlsConf *tls.Config
4646
keepaliveConf keepalive.ClientParameters
47+
spliterConf SpliterConfig
4748

4849
backupMeta *backuppb.BackupMeta
4950
dstAPIVersion kvrpcpb.APIVersion
@@ -64,6 +65,7 @@ func NewRestoreClient(
6465
pdClient pd.Client,
6566
tlsConf *tls.Config,
6667
keepaliveConf keepalive.ClientParameters,
68+
spliterConf SpliterConfig,
6769
isRawKv bool,
6870
) (*Client, error) {
6971
apiVerion, err := conn.GetTiKVApiVersion(context.Background(), pdClient, tlsConf)
@@ -72,9 +74,10 @@ func NewRestoreClient(
7274
}
7375
return &Client{
7476
pdClient: pdClient,
75-
toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
77+
toolClient: NewSplitClient(pdClient, tlsConf, spliterConf, isRawKv),
7678
tlsConf: tlsConf,
7779
keepaliveConf: keepaliveConf,
80+
spliterConf: spliterConf,
7881
switchCh: make(chan struct{}),
7982
dstAPIVersion: apiVerion,
8083
}, nil
@@ -132,7 +135,7 @@ func (rc *Client) InitBackupMeta(
132135
}
133136
rc.backupMeta = backupMeta
134137

135-
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
138+
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.spliterConf, rc.backupMeta.IsRawKv)
136139
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
137140
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv,
138141
rc.backupMeta.ApiVersion)

br/pkg/restore/client_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{
2323
Timeout: 10 * time.Second,
2424
}
2525

26+
var defaultSpliterCfg = SpliterConfig{
27+
GRPCMaxRecvMsgSize: 1024 * 1024,
28+
SplitRegionMaxKeys: 16,
29+
}
30+
2631
type fakePDClient struct {
2732
pd.Client
2833
stores []*metapb.Store
@@ -106,7 +111,7 @@ func TestSetSpeedLimit(t *testing.T) {
106111
// 1. The cost of concurrent communication is expected to be less than the cost of serial communication.
107112
client, err := NewRestoreClient(fakePDClient{
108113
stores: mockStores,
109-
}, nil, defaultKeepaliveCfg, true)
114+
}, nil, defaultKeepaliveCfg, defaultSpliterCfg, true)
110115
require.NoError(t, err)
111116
client.fileImporter = NewFileImporter(nil, FakeImporterClient{}, nil, true, kvrpcpb.APIVersion_V2)
112117
ctx := context.Background()

br/pkg/restore/split.go

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,23 @@ var (
5353
ScanRegionAttemptTimes = 30
5454
)
5555

56+
// SpliterConfig is the config for region split.
57+
type SpliterConfig struct {
58+
GRPCMaxRecvMsgSize int
59+
SplitRegionMaxKeys int
60+
}
61+
5662
// RegionSplitter is a executor of region split by rules.
5763
type RegionSplitter struct {
5864
client SplitClient
65+
conf SpliterConfig
5966
}
6067

6168
// NewRegionSplitter returns a new RegionSplitter.
62-
func NewRegionSplitter(client SplitClient) *RegionSplitter {
69+
func NewRegionSplitter(client SplitClient, conf SpliterConfig) *RegionSplitter {
6370
return &RegionSplitter{
6471
client: client,
72+
conf: conf,
6573
}
6674
}
6775

@@ -122,46 +130,68 @@ SplitRegions:
122130
for _, region := range regions {
123131
regionMap[region.Region.GetId()] = region
124132
}
125-
for regionID, keys := range splitKeyMap {
126-
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
127-
var newRegions []*RegionInfo
128-
region := regionMap[regionID]
129-
log.Info("split regions",
130-
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
131-
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
132-
if errSplit != nil {
133-
if strings.Contains(errSplit.Error(), "no valid key") {
134-
for _, key := range keys {
135-
// Region start/end keys are encoded. split_region RPC
136-
// requires raw keys (without encoding).
137-
log.Error("split regions no valid key",
138-
logutil.Key("startKey", region.Region.StartKey),
139-
logutil.Key("endKey", region.Region.EndKey),
140-
logutil.Key("key", codec.EncodeBytes(nil, key)),
141-
rtree.ZapRanges(ranges))
133+
for regionID, regionKeys := range splitKeyMap {
134+
log.Info("get split keys for region", zap.Int("len", len(regionKeys)), zap.Uint64("region", regionID))
135+
136+
for i := 0; i < len(regionKeys); i += rs.conf.SplitRegionMaxKeys {
137+
end := i + rs.conf.SplitRegionMaxKeys
138+
if end > len(regionKeys) {
139+
end = len(regionKeys)
140+
}
141+
keys := regionKeys[i:end]
142+
143+
var region *RegionInfo
144+
if i == 0 {
145+
region = regionMap[regionID]
146+
} else {
147+
encodedKey := codec.EncodeBytes(nil, keys[0])
148+
var errGetRegion error
149+
region, errGetRegion = rs.client.GetRegion(ctx, encodedKey)
150+
if errGetRegion != nil {
151+
time.Sleep(interval)
152+
log.Warn("get region failed, retry", logutil.Key("encodedKey", encodedKey), zap.Error(errGetRegion))
153+
continue SplitRegions
142154
}
143-
return errors.Trace(errSplit)
144155
}
145-
interval = 2 * interval
146-
if interval > SplitMaxRetryInterval {
147-
interval = SplitMaxRetryInterval
156+
157+
log.Info("split regions",
158+
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
159+
var newRegions []*RegionInfo
160+
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
161+
if errSplit != nil {
162+
if strings.Contains(errSplit.Error(), "no valid key") {
163+
for _, key := range keys {
164+
// Region start/end keys are encoded. split_region RPC
165+
// requires raw keys (without encoding).
166+
log.Error("split regions no valid key",
167+
logutil.Key("startKey", region.Region.StartKey),
168+
logutil.Key("endKey", region.Region.EndKey),
169+
logutil.Key("key", codec.EncodeBytes(nil, key)),
170+
rtree.ZapRanges(ranges))
171+
}
172+
return errors.Trace(errSplit)
173+
}
174+
interval = 2 * interval
175+
if interval > SplitMaxRetryInterval {
176+
interval = SplitMaxRetryInterval
177+
}
178+
time.Sleep(interval)
179+
log.Warn("split regions failed, retry",
180+
zap.Error(errSplit),
181+
logutil.Region(region.Region),
182+
logutil.Leader(region.Leader),
183+
logutil.Keys(keys), rtree.ZapRanges(ranges))
184+
continue SplitRegions
148185
}
149-
time.Sleep(interval)
150-
log.Warn("split regions failed, retry",
151-
zap.Error(errSplit),
152-
logutil.Region(region.Region),
153-
logutil.Leader(region.Leader),
154-
logutil.Keys(keys), rtree.ZapRanges(ranges))
155-
continue SplitRegions
156-
}
157-
log.Info("scattered regions", zap.Int("count", len(newRegions)))
158-
if len(newRegions) != len(keys) {
159-
log.Warn("split key count and new region count mismatch",
160-
zap.Int("new region count", len(newRegions)),
161-
zap.Int("split key count", len(keys)))
186+
log.Info("scattered regions", zap.Int("count", len(newRegions)))
187+
if len(newRegions) != len(keys) {
188+
log.Warn("split key count and new region count mismatch",
189+
zap.Int("new region count", len(newRegions)),
190+
zap.Int("split key count", len(keys)))
191+
}
192+
scatterRegions = append(scatterRegions, newRegions...)
193+
onSplit(keys)
162194
}
163-
scatterRegions = append(scatterRegions, newRegions...)
164-
onSplit(keys)
165195
}
166196
break
167197
}

br/pkg/restore/split_client.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,11 @@ type SplitClient interface {
6666

6767
// pdClient is a wrapper of pd client, can be used by RegionSplitter.
6868
type pdClient struct {
69-
mu sync.Mutex
70-
client pd.Client
71-
tlsConf *tls.Config
72-
storeCache map[uint64]*metapb.Store
69+
mu sync.Mutex
70+
client pd.Client
71+
tlsConf *tls.Config
72+
spliterConf SpliterConfig
73+
storeCache map[uint64]*metapb.Store
7374

7475
// FIXME when config changed during the lifetime of pdClient,
7576
// this may mislead the scatter.
@@ -80,12 +81,13 @@ type pdClient struct {
8081
}
8182

8283
// NewSplitClient returns a client used by RegionSplitter.
83-
func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
84+
func NewSplitClient(client pd.Client, tlsConf *tls.Config, spliterConf SpliterConfig, isRawKv bool) SplitClient {
8485
cli := &pdClient{
85-
client: client,
86-
tlsConf: tlsConf,
87-
storeCache: make(map[uint64]*metapb.Store),
88-
isRawKv: isRawKv,
86+
client: client,
87+
tlsConf: tlsConf,
88+
spliterConf: spliterConf,
89+
storeCache: make(map[uint64]*metapb.Store),
90+
isRawKv: isRawKv,
8991
}
9092
return cli
9193
}
@@ -181,7 +183,12 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
181183
if err != nil {
182184
return nil, errors.Trace(err)
183185
}
184-
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
186+
opt := grpc.WithInsecure()
187+
if c.tlsConf != nil {
188+
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
189+
}
190+
conn, err := grpc.Dial(store.GetAddress(), opt,
191+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
185192
if err != nil {
186193
return nil, errors.Trace(err)
187194
}
@@ -305,7 +312,8 @@ func (c *pdClient) sendSplitRegionRequest(
305312
if c.tlsConf != nil {
306313
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
307314
}
308-
conn, err := grpc.Dial(store.GetAddress(), opt)
315+
conn, err := grpc.Dial(store.GetAddress(), opt,
316+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
309317
if err != nil {
310318
return nil, multierr.Append(splitErrors, err)
311319
}

br/pkg/restore/split_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func ScatterFinishInTimeImpl(t *testing.T, needEncodeKey bool) {
275275
client := initTestClient()
276276
ranges := initRanges()
277277
rewriteRules := initRewriteRules()
278-
regionSplitter := NewRegionSplitter(client)
278+
regionSplitter := NewRegionSplitter(client, defaultSpliterCfg)
279279

280280
ctx := context.Background()
281281
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
@@ -335,7 +335,7 @@ func TestSplitAndScatter(t *testing.T) {
335335
func runTestSplitAndScatterWith(t *testing.T, client *TestClient, needEncodeKey bool) {
336336
ranges := initRanges()
337337
rewriteRules := initRewriteRules()
338-
regionSplitter := NewRegionSplitter(client)
338+
regionSplitter := NewRegionSplitter(client, defaultSpliterCfg)
339339

340340
ctx := context.Background()
341341
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"

br/pkg/restore/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,14 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
109109
func SplitRanges(
110110
ctx context.Context,
111111
client *Client,
112+
spliterConf SpliterConfig,
112113
ranges []rtree.Range,
113114
rewriteRules *RewriteRules,
114115
updateCh glue.Progress,
115116
isRawKv bool,
116117
needEncodeKey bool,
117118
) error {
118-
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv))
119+
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), spliterConf, isRawKv), spliterConf)
119120

120121
return splitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(keys [][]byte) {
121122
updateCh.Inc()

0 commit comments

Comments
 (0)