Skip to content

Commit 3071814

Browse files
authored
[close #421] [br-release-1.0] br: Split for smaller batch of keys (#422) (#423)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent 1d8847b commit 3071814

File tree

14 files changed

+201
-76
lines changed

14 files changed

+201
-76
lines changed

.github/config/br_pd.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,8 @@
22
[replication]
33
enable-placement-rules = true
44
max-replicas = 1
5+
6+
[schedule]
7+
max-merge-region-size = 2
8+
max-merge-region-keys = 40000
9+
split-merge-interval = "10s"

.github/config/br_rawkv.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,18 @@ pd-heartbeat-tick-interval = "2s"
77
pd-store-heartbeat-tick-interval = "5s"
88
split-region-check-tick-interval = "1s"
99

10+
# 10000000 keys & ~480MB data size in integration tests
11+
[coprocessor]
12+
region-split-size = "10MiB"
13+
region-split-keys = 200000
14+
batch-split-limit = 100
15+
1016
[rocksdb]
1117
max-open-files = 10000
1218

1319
[raftdb]
1420
max-open-files = 10000
1521

1622
[storage]
23+
reserve-space = "0MiB"
1724
enable-ttl = true

.github/workflows/ci-br.yml

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
name: TiKV-BR
22
on:
33
push:
4-
branches: main
4+
branches:
5+
- main
6+
- br-release-*
57
paths: br/**
68
pull_request:
7-
branches: main
9+
branches:
10+
- main
11+
- br-release-*
812
paths: br/**
913

1014
permissions:
@@ -64,7 +68,7 @@ jobs:
6468
runs-on: ubuntu-latest
6569
strategy:
6670
matrix:
67-
tikv_version: [nightly]
71+
tikv_version: [v6.5.3]
6872
api_version: [1, 2]
6973
steps:
7074
- uses: actions/checkout@v2
@@ -77,11 +81,14 @@ jobs:
7781
run: |
7882
# start tikv
7983
echo -e "\napi-version = ${{ matrix.api_version }}\n" >> /home/runner/work/migration/migration/.github/config/br_rawkv.toml
80-
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/migration/migration/.github/config/br_rawkv.toml --pd.config /home/runner/work/migration/migration/.github/config/br_pd.toml &> raw.out 2>&1 &
84+
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/migration/migration/.github/config/br_rawkv.toml --pd.port 2379 --pd.config /home/runner/work/migration/migration/.github/config/br_pd.toml &> raw.out 2>&1 &
8185
# The first run of `tiup` has to download all components so it'll take longer.
82-
sleep 1m 30s
83-
# Parse PD address from `tiup` output
84-
echo "PD_ADDR=$(cat raw.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV
86+
timeout 180 tail -f raw.out | grep -q 'PD Endpoints'
87+
if [ $? -ne 0 ]; then
88+
echo "Failed to start TiKV cluster"
89+
exit 1
90+
fi
91+
echo "PD_ADDR=127.0.0.1:2379" >> $GITHUB_ENV
8592
# Output the api version
8693
echo "API_VERSION=${{ matrix.api_version }}" >> $GITHUB_ENV
8794
# Log the output

br/Makefile

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ PACKAGES := go list ./...
2222
DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||'
2323

2424
# build & test
25-
BR_BIN_PATH ?= bin/tikv-br
26-
TEST_BIN_PATH ?= bin/tikv-br.test
27-
COVERAGE_DIR ?= build
28-
TEST_PARALLEL ?= 8
29-
PD_ADDR ?= 127.0.0.1:2379
30-
BR_LOCAL_STORE ?= /tmp/backup_restore_test
31-
API_VERSION ?= 1
25+
BR_BIN_PATH ?= bin/tikv-br
26+
TEST_BIN_PATH ?= bin/tikv-br.test
27+
COVERAGE_DIR ?= build
28+
TEST_PARALLEL ?= 8
29+
PD_ADDR ?= 127.0.0.1:2379
30+
SPLIT_REGION_MAX_KEYS ?= 4
31+
BR_LOCAL_STORE ?= /tmp/backup_restore_test
32+
API_VERSION ?= 1
3233

3334
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.ReleaseVersion=$(shell git describe --tags --dirty --always)"
3435
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
@@ -61,6 +62,7 @@ test: tools/bin/gocov tools/bin/gocov-xml
6162
test/integration: build/br-test build/rawkv-integration-test
6263
./bin/rawkv_test --pd=${PD_ADDR} \
6364
--br='${TEST_BIN_PATH}' \
65+
--split-region-max-keys=${SPLIT_REGION_MAX_KEYS} \
6466
--br-storage=${BR_LOCAL_STORE} \
6567
--api-version=${API_VERSION}
6668

br/pkg/restore/client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Client struct {
4343
workerPool *utils.WorkerPool
4444
tlsConf *tls.Config
4545
keepaliveConf keepalive.ClientParameters
46+
spliterConf SpliterConfig
4647

4748
backupMeta *backuppb.BackupMeta
4849
dstAPIVersion kvrpcpb.APIVersion
@@ -64,6 +65,7 @@ func NewRestoreClient(
6465
pdClient pd.Client,
6566
tlsConf *tls.Config,
6667
keepaliveConf keepalive.ClientParameters,
68+
spliterConf SpliterConfig,
6769
isRawKv bool,
6870
) (*Client, error) {
6971
apiVerion, err := conn.GetTiKVApiVersion(context.Background(), pdClient, tlsConf)
@@ -72,9 +74,10 @@ func NewRestoreClient(
7274
}
7375
return &Client{
7476
pdClient: pdClient,
75-
toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
77+
toolClient: NewSplitClient(pdClient, tlsConf, spliterConf, isRawKv),
7678
tlsConf: tlsConf,
7779
keepaliveConf: keepaliveConf,
80+
spliterConf: spliterConf,
7881
switchCh: make(chan struct{}),
7982
dstAPIVersion: apiVerion,
8083
}, nil
@@ -132,7 +135,7 @@ func (rc *Client) InitBackupMeta(
132135
}
133136
rc.backupMeta = backupMeta
134137

135-
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
138+
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.spliterConf, rc.backupMeta.IsRawKv)
136139
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
137140
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv,
138141
rc.backupMeta.ApiVersion, rc.rateLimit)

br/pkg/restore/split.go

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,23 @@ var (
5353
ScanRegionAttemptTimes = 30
5454
)
5555

56+
// SpliterConfig is the config for region split.
57+
type SpliterConfig struct {
58+
GRPCMaxRecvMsgSize int
59+
SplitRegionMaxKeys int
60+
}
61+
5662
// RegionSplitter is a executor of region split by rules.
5763
type RegionSplitter struct {
5864
client SplitClient
65+
conf SpliterConfig
5966
}
6067

6168
// NewRegionSplitter returns a new RegionSplitter.
62-
func NewRegionSplitter(client SplitClient) *RegionSplitter {
69+
func NewRegionSplitter(client SplitClient, conf SpliterConfig) *RegionSplitter {
6370
return &RegionSplitter{
6471
client: client,
72+
conf: conf,
6573
}
6674
}
6775

@@ -122,46 +130,68 @@ SplitRegions:
122130
for _, region := range regions {
123131
regionMap[region.Region.GetId()] = region
124132
}
125-
for regionID, keys := range splitKeyMap {
126-
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
127-
var newRegions []*RegionInfo
128-
region := regionMap[regionID]
129-
log.Info("split regions",
130-
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
131-
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
132-
if errSplit != nil {
133-
if strings.Contains(errSplit.Error(), "no valid key") {
134-
for _, key := range keys {
135-
// Region start/end keys are encoded. split_region RPC
136-
// requires raw keys (without encoding).
137-
log.Error("split regions no valid key",
138-
logutil.Key("startKey", region.Region.StartKey),
139-
logutil.Key("endKey", region.Region.EndKey),
140-
logutil.Key("key", codec.EncodeBytes(nil, key)),
141-
rtree.ZapRanges(ranges))
133+
for regionID, regionKeys := range splitKeyMap {
134+
log.Info("get split keys for region", zap.Int("len", len(regionKeys)), zap.Uint64("region", regionID))
135+
136+
for i := 0; i < len(regionKeys); i += rs.conf.SplitRegionMaxKeys {
137+
end := i + rs.conf.SplitRegionMaxKeys
138+
if end > len(regionKeys) {
139+
end = len(regionKeys)
140+
}
141+
keys := regionKeys[i:end]
142+
143+
var region *RegionInfo
144+
if i == 0 {
145+
region = regionMap[regionID]
146+
} else {
147+
encodedKey := codec.EncodeBytes(nil, keys[0])
148+
var errGetRegion error
149+
region, errGetRegion = rs.client.GetRegion(ctx, encodedKey)
150+
if errGetRegion != nil {
151+
time.Sleep(interval)
152+
log.Warn("get region failed, retry", logutil.Key("encodedKey", encodedKey), zap.Error(errGetRegion))
153+
continue SplitRegions
142154
}
143-
return errors.Trace(errSplit)
144155
}
145-
interval = 2 * interval
146-
if interval > SplitMaxRetryInterval {
147-
interval = SplitMaxRetryInterval
156+
157+
log.Info("split regions",
158+
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
159+
var newRegions []*RegionInfo
160+
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
161+
if errSplit != nil {
162+
if strings.Contains(errSplit.Error(), "no valid key") {
163+
for _, key := range keys {
164+
// Region start/end keys are encoded. split_region RPC
165+
// requires raw keys (without encoding).
166+
log.Error("split regions no valid key",
167+
logutil.Key("startKey", region.Region.StartKey),
168+
logutil.Key("endKey", region.Region.EndKey),
169+
logutil.Key("key", codec.EncodeBytes(nil, key)),
170+
rtree.ZapRanges(ranges))
171+
}
172+
return errors.Trace(errSplit)
173+
}
174+
interval = 2 * interval
175+
if interval > SplitMaxRetryInterval {
176+
interval = SplitMaxRetryInterval
177+
}
178+
time.Sleep(interval)
179+
log.Warn("split regions failed, retry",
180+
zap.Error(errSplit),
181+
logutil.Region(region.Region),
182+
logutil.Leader(region.Leader),
183+
logutil.Keys(keys), rtree.ZapRanges(ranges))
184+
continue SplitRegions
148185
}
149-
time.Sleep(interval)
150-
log.Warn("split regions failed, retry",
151-
zap.Error(errSplit),
152-
logutil.Region(region.Region),
153-
logutil.Leader(region.Leader),
154-
logutil.Keys(keys), rtree.ZapRanges(ranges))
155-
continue SplitRegions
156-
}
157-
log.Info("scattered regions", zap.Int("count", len(newRegions)))
158-
if len(newRegions) != len(keys) {
159-
log.Warn("split key count and new region count mismatch",
160-
zap.Int("new region count", len(newRegions)),
161-
zap.Int("split key count", len(keys)))
186+
log.Info("scattered regions", zap.Int("count", len(newRegions)))
187+
if len(newRegions) != len(keys) {
188+
log.Warn("split key count and new region count mismatch",
189+
zap.Int("new region count", len(newRegions)),
190+
zap.Int("split key count", len(keys)))
191+
}
192+
scatterRegions = append(scatterRegions, newRegions...)
193+
onSplit(keys)
162194
}
163-
scatterRegions = append(scatterRegions, newRegions...)
164-
onSplit(keys)
165195
}
166196
break
167197
}

br/pkg/restore/split_client.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,11 @@ type SplitClient interface {
7979

8080
// pdClient is a wrapper of pd client, can be used by RegionSplitter.
8181
type pdClient struct {
82-
mu sync.Mutex
83-
client pd.Client
84-
tlsConf *tls.Config
85-
storeCache map[uint64]*metapb.Store
82+
mu sync.Mutex
83+
client pd.Client
84+
tlsConf *tls.Config
85+
spliterConf SpliterConfig
86+
storeCache map[uint64]*metapb.Store
8687

8788
// FIXME when config changed during the lifetime of pdClient,
8889
// this may mislead the scatter.
@@ -93,12 +94,13 @@ type pdClient struct {
9394
}
9495

9596
// NewSplitClient returns a client used by RegionSplitter.
96-
func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
97+
func NewSplitClient(client pd.Client, tlsConf *tls.Config, spliterConf SpliterConfig, isRawKv bool) SplitClient {
9798
cli := &pdClient{
98-
client: client,
99-
tlsConf: tlsConf,
100-
storeCache: make(map[uint64]*metapb.Store),
101-
isRawKv: isRawKv,
99+
client: client,
100+
tlsConf: tlsConf,
101+
spliterConf: spliterConf,
102+
storeCache: make(map[uint64]*metapb.Store),
103+
isRawKv: isRawKv,
102104
}
103105
return cli
104106
}
@@ -194,7 +196,12 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
194196
if err != nil {
195197
return nil, errors.Trace(err)
196198
}
197-
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
199+
opt := grpc.WithInsecure()
200+
if c.tlsConf != nil {
201+
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
202+
}
203+
conn, err := grpc.Dial(store.GetAddress(), opt,
204+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
198205
if err != nil {
199206
return nil, errors.Trace(err)
200207
}
@@ -318,7 +325,8 @@ func (c *pdClient) sendSplitRegionRequest(
318325
if c.tlsConf != nil {
319326
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
320327
}
321-
conn, err := grpc.Dial(store.GetAddress(), opt)
328+
conn, err := grpc.Dial(store.GetAddress(), opt,
329+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
322330
if err != nil {
323331
return nil, multierr.Append(splitErrors, err)
324332
}

br/pkg/restore/split_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ import (
2424
"google.golang.org/grpc/status"
2525
)
2626

27+
var defaultSpliterCfg = restore.SpliterConfig{
28+
GRPCMaxRecvMsgSize: 1024 * 1024,
29+
SplitRegionMaxKeys: 16,
30+
}
31+
2732
type TestClient struct {
2833
mu sync.RWMutex
2934
stores map[uint64]*metapb.Store
@@ -276,7 +281,7 @@ func ScatterFinishInTimeImpl(t *testing.T, needEncodeKey bool) {
276281
client := initTestClient()
277282
ranges := initRanges()
278283
rewriteRules := initRewriteRules()
279-
regionSplitter := restore.NewRegionSplitter(client)
284+
regionSplitter := restore.NewRegionSplitter(client, defaultSpliterCfg)
280285

281286
ctx := context.Background()
282287
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
@@ -335,7 +340,7 @@ func TestSplitAndScatter(t *testing.T) {
335340
func runTestSplitAndScatterWith(t *testing.T, client *TestClient, needEncodeKey bool) {
336341
ranges := initRanges()
337342
rewriteRules := initRewriteRules()
338-
regionSplitter := restore.NewRegionSplitter(client)
343+
regionSplitter := restore.NewRegionSplitter(client, defaultSpliterCfg)
339344

340345
ctx := context.Background()
341346
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"

br/pkg/restore/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,14 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
110110
func SplitRanges(
111111
ctx context.Context,
112112
client *Client,
113+
spliterConf SpliterConfig,
113114
ranges []rtree.Range,
114115
rewriteRules *RewriteRules,
115116
updateCh glue.Progress,
116117
isRawKv bool,
117118
needEncodeKey bool,
118119
) error {
119-
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv))
120+
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), spliterConf, isRawKv), spliterConf)
120121

121122
return splitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(keys [][]byte) {
122123
updateCh.Inc()

0 commit comments

Comments
 (0)