Skip to content

Commit 9dd952b

Browse files
committed
[close #421] br: Split for smaller batch of keys (#422)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent 1d8847b commit 9dd952b

File tree

13 files changed

+182
-69
lines changed

13 files changed

+182
-69
lines changed

.github/config/br_pd.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,8 @@
22
[replication]
33
enable-placement-rules = true
44
max-replicas = 1
5+
6+
[schedule]
7+
max-merge-region-size = 2
8+
max-merge-region-keys = 40000
9+
split-merge-interval = "10s"

.github/config/br_rawkv.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,18 @@ pd-heartbeat-tick-interval = "2s"
77
pd-store-heartbeat-tick-interval = "5s"
88
split-region-check-tick-interval = "1s"
99

10+
# 10000000 keys & ~480MB data size in integration tests
11+
[coprocessor]
12+
region-split-size = "10MiB"
13+
region-split-keys = 200000
14+
batch-split-limit = 100
15+
1016
[rocksdb]
1117
max-open-files = 10000
1218

1319
[raftdb]
1420
max-open-files = 10000
1521

1622
[storage]
23+
reserve-space = "0MiB"
1724
enable-ttl = true

br/Makefile

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ PACKAGES := go list ./...
2222
DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||'
2323

2424
# build & test
25-
BR_BIN_PATH ?= bin/tikv-br
26-
TEST_BIN_PATH ?= bin/tikv-br.test
27-
COVERAGE_DIR ?= build
28-
TEST_PARALLEL ?= 8
29-
PD_ADDR ?= 127.0.0.1:2379
30-
BR_LOCAL_STORE ?= /tmp/backup_restore_test
31-
API_VERSION ?= 1
25+
BR_BIN_PATH ?= bin/tikv-br
26+
TEST_BIN_PATH ?= bin/tikv-br.test
27+
COVERAGE_DIR ?= build
28+
TEST_PARALLEL ?= 8
29+
PD_ADDR ?= 127.0.0.1:2379
30+
SPLIT_REGION_MAX_KEYS ?= 4
31+
BR_LOCAL_STORE ?= /tmp/backup_restore_test
32+
API_VERSION ?= 1
3233

3334
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.ReleaseVersion=$(shell git describe --tags --dirty --always)"
3435
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
@@ -61,6 +62,7 @@ test: tools/bin/gocov tools/bin/gocov-xml
6162
test/integration: build/br-test build/rawkv-integration-test
6263
./bin/rawkv_test --pd=${PD_ADDR} \
6364
--br='${TEST_BIN_PATH}' \
65+
--split-region-max-keys=${SPLIT_REGION_MAX_KEYS} \
6466
--br-storage=${BR_LOCAL_STORE} \
6567
--api-version=${API_VERSION}
6668

br/pkg/restore/client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Client struct {
4343
workerPool *utils.WorkerPool
4444
tlsConf *tls.Config
4545
keepaliveConf keepalive.ClientParameters
46+
spliterConf SpliterConfig
4647

4748
backupMeta *backuppb.BackupMeta
4849
dstAPIVersion kvrpcpb.APIVersion
@@ -64,6 +65,7 @@ func NewRestoreClient(
6465
pdClient pd.Client,
6566
tlsConf *tls.Config,
6667
keepaliveConf keepalive.ClientParameters,
68+
spliterConf SpliterConfig,
6769
isRawKv bool,
6870
) (*Client, error) {
6971
apiVerion, err := conn.GetTiKVApiVersion(context.Background(), pdClient, tlsConf)
@@ -72,9 +74,10 @@ func NewRestoreClient(
7274
}
7375
return &Client{
7476
pdClient: pdClient,
75-
toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
77+
toolClient: NewSplitClient(pdClient, tlsConf, spliterConf, isRawKv),
7678
tlsConf: tlsConf,
7779
keepaliveConf: keepaliveConf,
80+
spliterConf: spliterConf,
7881
switchCh: make(chan struct{}),
7982
dstAPIVersion: apiVerion,
8083
}, nil
@@ -132,7 +135,7 @@ func (rc *Client) InitBackupMeta(
132135
}
133136
rc.backupMeta = backupMeta
134137

135-
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
138+
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.spliterConf, rc.backupMeta.IsRawKv)
136139
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
137140
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv,
138141
rc.backupMeta.ApiVersion, rc.rateLimit)

br/pkg/restore/split.go

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,23 @@ var (
5353
ScanRegionAttemptTimes = 30
5454
)
5555

56+
// SpliterConfig is the config for region split.
57+
type SpliterConfig struct {
58+
GRPCMaxRecvMsgSize int
59+
SplitRegionMaxKeys int
60+
}
61+
5662
// RegionSplitter is an executor of region split by rules.
5763
type RegionSplitter struct {
5864
client SplitClient
65+
conf SpliterConfig
5966
}
6067

6168
// NewRegionSplitter returns a new RegionSplitter.
62-
func NewRegionSplitter(client SplitClient) *RegionSplitter {
69+
func NewRegionSplitter(client SplitClient, conf SpliterConfig) *RegionSplitter {
6370
return &RegionSplitter{
6471
client: client,
72+
conf: conf,
6573
}
6674
}
6775

@@ -122,46 +130,68 @@ SplitRegions:
122130
for _, region := range regions {
123131
regionMap[region.Region.GetId()] = region
124132
}
125-
for regionID, keys := range splitKeyMap {
126-
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
127-
var newRegions []*RegionInfo
128-
region := regionMap[regionID]
129-
log.Info("split regions",
130-
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
131-
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
132-
if errSplit != nil {
133-
if strings.Contains(errSplit.Error(), "no valid key") {
134-
for _, key := range keys {
135-
// Region start/end keys are encoded. split_region RPC
136-
// requires raw keys (without encoding).
137-
log.Error("split regions no valid key",
138-
logutil.Key("startKey", region.Region.StartKey),
139-
logutil.Key("endKey", region.Region.EndKey),
140-
logutil.Key("key", codec.EncodeBytes(nil, key)),
141-
rtree.ZapRanges(ranges))
133+
for regionID, regionKeys := range splitKeyMap {
134+
log.Info("get split keys for region", zap.Int("len", len(regionKeys)), zap.Uint64("region", regionID))
135+
136+
for i := 0; i < len(regionKeys); i += rs.conf.SplitRegionMaxKeys {
137+
end := i + rs.conf.SplitRegionMaxKeys
138+
if end > len(regionKeys) {
139+
end = len(regionKeys)
140+
}
141+
keys := regionKeys[i:end]
142+
143+
var region *RegionInfo
144+
if i == 0 {
145+
region = regionMap[regionID]
146+
} else {
147+
encodedKey := codec.EncodeBytes(nil, keys[0])
148+
var errGetRegion error
149+
region, errGetRegion = rs.client.GetRegion(ctx, encodedKey)
150+
if errGetRegion != nil {
151+
time.Sleep(interval)
152+
log.Warn("get region failed, retry", logutil.Key("encodedKey", encodedKey), zap.Error(errGetRegion))
153+
continue SplitRegions
142154
}
143-
return errors.Trace(errSplit)
144155
}
145-
interval = 2 * interval
146-
if interval > SplitMaxRetryInterval {
147-
interval = SplitMaxRetryInterval
156+
157+
log.Info("split regions",
158+
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
159+
var newRegions []*RegionInfo
160+
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
161+
if errSplit != nil {
162+
if strings.Contains(errSplit.Error(), "no valid key") {
163+
for _, key := range keys {
164+
// Region start/end keys are encoded. split_region RPC
165+
// requires raw keys (without encoding).
166+
log.Error("split regions no valid key",
167+
logutil.Key("startKey", region.Region.StartKey),
168+
logutil.Key("endKey", region.Region.EndKey),
169+
logutil.Key("key", codec.EncodeBytes(nil, key)),
170+
rtree.ZapRanges(ranges))
171+
}
172+
return errors.Trace(errSplit)
173+
}
174+
interval = 2 * interval
175+
if interval > SplitMaxRetryInterval {
176+
interval = SplitMaxRetryInterval
177+
}
178+
time.Sleep(interval)
179+
log.Warn("split regions failed, retry",
180+
zap.Error(errSplit),
181+
logutil.Region(region.Region),
182+
logutil.Leader(region.Leader),
183+
logutil.Keys(keys), rtree.ZapRanges(ranges))
184+
continue SplitRegions
148185
}
149-
time.Sleep(interval)
150-
log.Warn("split regions failed, retry",
151-
zap.Error(errSplit),
152-
logutil.Region(region.Region),
153-
logutil.Leader(region.Leader),
154-
logutil.Keys(keys), rtree.ZapRanges(ranges))
155-
continue SplitRegions
156-
}
157-
log.Info("scattered regions", zap.Int("count", len(newRegions)))
158-
if len(newRegions) != len(keys) {
159-
log.Warn("split key count and new region count mismatch",
160-
zap.Int("new region count", len(newRegions)),
161-
zap.Int("split key count", len(keys)))
186+
log.Info("scattered regions", zap.Int("count", len(newRegions)))
187+
if len(newRegions) != len(keys) {
188+
log.Warn("split key count and new region count mismatch",
189+
zap.Int("new region count", len(newRegions)),
190+
zap.Int("split key count", len(keys)))
191+
}
192+
scatterRegions = append(scatterRegions, newRegions...)
193+
onSplit(keys)
162194
}
163-
scatterRegions = append(scatterRegions, newRegions...)
164-
onSplit(keys)
165195
}
166196
break
167197
}

br/pkg/restore/split_client.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,11 @@ type SplitClient interface {
7979

8080
// pdClient is a wrapper of pd client, can be used by RegionSplitter.
8181
type pdClient struct {
82-
mu sync.Mutex
83-
client pd.Client
84-
tlsConf *tls.Config
85-
storeCache map[uint64]*metapb.Store
82+
mu sync.Mutex
83+
client pd.Client
84+
tlsConf *tls.Config
85+
spliterConf SpliterConfig
86+
storeCache map[uint64]*metapb.Store
8687

8788
// FIXME: if the config changes during the lifetime of pdClient,
8889
// this may mislead the scatter.
@@ -93,12 +94,13 @@ type pdClient struct {
9394
}
9495

9596
// NewSplitClient returns a client used by RegionSplitter.
96-
func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
97+
func NewSplitClient(client pd.Client, tlsConf *tls.Config, spliterConf SpliterConfig, isRawKv bool) SplitClient {
9798
cli := &pdClient{
98-
client: client,
99-
tlsConf: tlsConf,
100-
storeCache: make(map[uint64]*metapb.Store),
101-
isRawKv: isRawKv,
99+
client: client,
100+
tlsConf: tlsConf,
101+
spliterConf: spliterConf,
102+
storeCache: make(map[uint64]*metapb.Store),
103+
isRawKv: isRawKv,
102104
}
103105
return cli
104106
}
@@ -194,7 +196,12 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
194196
if err != nil {
195197
return nil, errors.Trace(err)
196198
}
197-
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
199+
opt := grpc.WithInsecure()
200+
if c.tlsConf != nil {
201+
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
202+
}
203+
conn, err := grpc.Dial(store.GetAddress(), opt,
204+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
198205
if err != nil {
199206
return nil, errors.Trace(err)
200207
}
@@ -318,7 +325,8 @@ func (c *pdClient) sendSplitRegionRequest(
318325
if c.tlsConf != nil {
319326
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
320327
}
321-
conn, err := grpc.Dial(store.GetAddress(), opt)
328+
conn, err := grpc.Dial(store.GetAddress(), opt,
329+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
322330
if err != nil {
323331
return nil, multierr.Append(splitErrors, err)
324332
}

br/pkg/restore/split_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func ScatterFinishInTimeImpl(t *testing.T, needEncodeKey bool) {
276276
client := initTestClient()
277277
ranges := initRanges()
278278
rewriteRules := initRewriteRules()
279-
regionSplitter := restore.NewRegionSplitter(client)
279+
regionSplitter := restore.NewRegionSplitter(client, defaultSpliterCfg)
280280

281281
ctx := context.Background()
282282
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
@@ -335,7 +335,7 @@ func TestSplitAndScatter(t *testing.T) {
335335
func runTestSplitAndScatterWith(t *testing.T, client *TestClient, needEncodeKey bool) {
336336
ranges := initRanges()
337337
rewriteRules := initRewriteRules()
338-
regionSplitter := restore.NewRegionSplitter(client)
338+
regionSplitter := restore.NewRegionSplitter(client, defaultSpliterCfg)
339339

340340
ctx := context.Background()
341341
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"

br/pkg/restore/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,14 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
110110
func SplitRanges(
111111
ctx context.Context,
112112
client *Client,
113+
spliterConf SpliterConfig,
113114
ranges []rtree.Range,
114115
rewriteRules *RewriteRules,
115116
updateCh glue.Progress,
116117
isRawKv bool,
117118
needEncodeKey bool,
118119
) error {
119-
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv))
120+
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), spliterConf, isRawKv), spliterConf)
120121

121122
return splitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(keys [][]byte) {
122123
updateCh.Inc()

br/pkg/task/common.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
berrors "github.com/tikv/migration/br/pkg/errors"
2828
"github.com/tikv/migration/br/pkg/glue"
2929
"github.com/tikv/migration/br/pkg/metautil"
30+
"github.com/tikv/migration/br/pkg/restore"
3031
"github.com/tikv/migration/br/pkg/storage"
3132
pd "github.com/tikv/pd/client"
3233
"go.uber.org/zap"
@@ -60,14 +61,19 @@ const (
6061
flagGrpcKeepaliveTime = "grpc-keepalive-time"
6162
// flagGrpcKeepaliveTimeout is the max time a grpc conn can keep idle before being killed.
6263
flagGrpcKeepaliveTimeout = "grpc-keepalive-timeout"
64+
// flagGrpcMaxRecvMsgSize is the max allowed size of a message received from tikv.
65+
flagGrpcMaxRecvMsgSize = "grpc-max-recv-msg-size"
6366
// flagEnableOpenTracing is whether to enable opentracing
64-
flagEnableOpenTracing = "enable-opentracing"
65-
flagSkipCheckPath = "skip-check-path"
67+
flagEnableOpenTracing = "enable-opentracing"
68+
flagSkipCheckPath = "skip-check-path"
69+
flagSplitRegionMaxKeys = "split-region-max-keys"
6670

6771
defaultSwitchInterval = 5 * time.Minute
6872
defaultGRPCKeepaliveTime = 10 * time.Second
6973
defaultGRPCKeepaliveTimeout = 3 * time.Second
74+
defaultGRPCMaxRecvMsgSize = 32 * 1024 * 1024 // 32MB
7075
defaultChecksumConcurrency = 512
76+
defaultSplitRegionMaxKeys = 1024
7177

7278
flagCipherType = "crypter.method"
7379
flagCipherKey = "crypter.key"
@@ -115,8 +121,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
115121
"the interval of pinging gRPC peer, must keep the same value with TiKV and PD")
116122
flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout,
117123
"the max time a gRPC connection can keep idle before killed, must keep the same value with TiKV and PD")
124+
flags.Uint(flagGrpcMaxRecvMsgSize, defaultGRPCMaxRecvMsgSize,
125+
"the max allowed size of a message received from TiKV")
118126
_ = flags.MarkHidden(flagGrpcKeepaliveTime)
119127
_ = flags.MarkHidden(flagGrpcKeepaliveTimeout)
128+
_ = flags.MarkHidden(flagGrpcMaxRecvMsgSize)
120129

121130
flags.Bool(flagEnableOpenTracing, false,
122131
"Set whether to enable opentracing during the backup/restore process")
@@ -125,6 +134,9 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
125134
_ = flags.MarkHidden(flagNoCreds)
126135
flags.BoolP(flagSkipCheckPath, "", false, "Skip path verification")
127136
_ = flags.MarkHidden(flagSkipCheckPath)
137+
flags.Uint(flagSplitRegionMaxKeys, defaultSplitRegionMaxKeys,
138+
"The max number of keys in a single split region request")
139+
_ = flags.MarkHidden(flagSplitRegionMaxKeys)
128140

129141
flags.String(flagCipherType, "plaintext", "Encrypt/decrypt method, "+
130142
"be one of plaintext|aes128-ctr|aes192-ctr|aes256-ctr case-insensitively, "+
@@ -343,6 +355,14 @@ func GetKeepalive(cfg *Config) keepalive.ClientParameters {
343355
}
344356
}
345357

358+
// GetSpliterConfig returns the spliter config built from cfg.
359+
func GetSpliterConfig(cfg *Config) restore.SpliterConfig {
360+
return restore.SpliterConfig{
361+
GRPCMaxRecvMsgSize: int(cfg.GRPCMaxRecvMsgSize),
362+
SplitRegionMaxKeys: int(cfg.SplitRegionMaxKeys),
363+
}
364+
}
365+
346366
func normalizePDURL(pd string, useTLS bool) (string, error) {
347367
if strings.HasPrefix(pd, "http://") {
348368
if useTLS {

0 commit comments

Comments
 (0)