Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/config/br_pd.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
[replication]
enable-placement-rules = true
max-replicas = 1

[schedule]
max-merge-region-size = 2
max-merge-region-keys = 40000
split-merge-interval = "10s"
7 changes: 7 additions & 0 deletions .github/config/br_rawkv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"

# 10000000 keys & ~480MB data size in integration tests
[coprocessor]
region-split-size = "10MiB"
region-split-keys = 200000
batch-split-limit = 100

[rocksdb]
max-open-files = 10000

[raftdb]
max-open-files = 10000

[storage]
reserve-space = "0MiB"
enable-ttl = true
21 changes: 14 additions & 7 deletions .github/workflows/ci-br.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
name: TiKV-BR
on:
push:
branches: main
branches:
- main
- br-release-*
paths: br/**
pull_request:
branches: main
branches:
- main
- br-release-*
paths: br/**

permissions:
Expand Down Expand Up @@ -64,7 +68,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
tikv_version: [nightly]
tikv_version: [v6.5.3]
api_version: [1, 2]
steps:
- uses: actions/checkout@v2
Expand All @@ -77,11 +81,14 @@ jobs:
run: |
# start tikv
echo -e "\napi-version = ${{ matrix.api_version }}\n" >> /home/runner/work/migration/migration/.github/config/br_rawkv.toml
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/migration/migration/.github/config/br_rawkv.toml --pd.config /home/runner/work/migration/migration/.github/config/br_pd.toml &> raw.out 2>&1 &
/home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/migration/migration/.github/config/br_rawkv.toml --pd.port 2379 --pd.config /home/runner/work/migration/migration/.github/config/br_pd.toml &> raw.out 2>&1 &
# The first run of `tiup` has to download all components so it'll take longer.
sleep 1m 30s
# Parse PD address from `tiup` output
echo "PD_ADDR=$(cat raw.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV
timeout 180 tail -f raw.out | grep -q 'PD Endpoints'
if [ $? -ne 0 ]; then
echo "Failed to start TiKV cluster"
exit 1
fi
echo "PD_ADDR=127.0.0.1:2379" >> $GITHUB_ENV
# Output the api version
echo "API_VERSION=${{ matrix.api_version }}" >> $GITHUB_ENV
# Log the output
Expand Down
16 changes: 9 additions & 7 deletions br/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ PACKAGES := go list ./...
DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||'

# build & test
BR_BIN_PATH ?= bin/tikv-br
TEST_BIN_PATH ?= bin/tikv-br.test
COVERAGE_DIR ?= build
TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
BR_LOCAL_STORE ?= /tmp/backup_restore_test
API_VERSION ?= 1
BR_BIN_PATH ?= bin/tikv-br
TEST_BIN_PATH ?= bin/tikv-br.test
COVERAGE_DIR ?= build
TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
SPLIT_REGION_MAX_KEYS ?= 4
BR_LOCAL_STORE ?= /tmp/backup_restore_test
API_VERSION ?= 1

LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.ReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
Expand Down Expand Up @@ -61,6 +62,7 @@ test: tools/bin/gocov tools/bin/gocov-xml
test/integration: build/br-test build/rawkv-integration-test
./bin/rawkv_test --pd=${PD_ADDR} \
--br='${TEST_BIN_PATH}' \
--split-region-max-keys=${SPLIT_REGION_MAX_KEYS} \
--br-storage=${BR_LOCAL_STORE} \
--api-version=${API_VERSION}

Expand Down
7 changes: 5 additions & 2 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Client struct {
workerPool *utils.WorkerPool
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters
spliterConf SpliterConfig

backupMeta *backuppb.BackupMeta
dstAPIVersion kvrpcpb.APIVersion
Expand All @@ -64,6 +65,7 @@ func NewRestoreClient(
pdClient pd.Client,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
spliterConf SpliterConfig,
isRawKv bool,
) (*Client, error) {
apiVerion, err := conn.GetTiKVApiVersion(context.Background(), pdClient, tlsConf)
Expand All @@ -72,9 +74,10 @@ func NewRestoreClient(
}
return &Client{
pdClient: pdClient,
toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
toolClient: NewSplitClient(pdClient, tlsConf, spliterConf, isRawKv),
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
spliterConf: spliterConf,
switchCh: make(chan struct{}),
dstAPIVersion: apiVerion,
}, nil
Expand Down Expand Up @@ -132,7 +135,7 @@ func (rc *Client) InitBackupMeta(
}
rc.backupMeta = backupMeta

metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.spliterConf, rc.backupMeta.IsRawKv)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv,
rc.backupMeta.ApiVersion, rc.rateLimit)
Expand Down
104 changes: 67 additions & 37 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,23 @@ var (
ScanRegionAttemptTimes = 30
)

// SpliterConfig is the config for region split.
//
// It is carried by both the split client (for its gRPC connections to
// stores) and the RegionSplitter (for batching split keys).
type SpliterConfig struct {
// GRPCMaxRecvMsgSize is passed to grpc.MaxCallRecvMsgSize as a default
// call option when the split client dials a store, i.e. it caps the size
// of a single response message the client will accept on that connection.
GRPCMaxRecvMsgSize int
// SplitRegionMaxKeys is the batch size used by RegionSplitter.Split: the
// split keys of one region are processed in consecutive chunks of at most
// this many keys, each chunk being sent as one split-and-scatter request.
// NOTE(review): the batching loop advances its index by this value, so it
// looks like it must be > 0 to make progress — confirm callers validate it.
SplitRegionMaxKeys int
}

// RegionSplitter is an executor of region split by rules.
type RegionSplitter struct {
// client is the SplitClient used to look up regions and to issue the
// split/scatter requests.
client SplitClient
// conf holds the split settings; Split reads conf.SplitRegionMaxKeys to
// decide how many split keys go into a single request.
conf SpliterConfig
}

// NewRegionSplitter returns a new RegionSplitter.
func NewRegionSplitter(client SplitClient) *RegionSplitter {
func NewRegionSplitter(client SplitClient, conf SpliterConfig) *RegionSplitter {
return &RegionSplitter{
client: client,
conf: conf,
}
}

Expand Down Expand Up @@ -122,46 +130,68 @@ SplitRegions:
for _, region := range regions {
regionMap[region.Region.GetId()] = region
}
for regionID, keys := range splitKeyMap {
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
var newRegions []*RegionInfo
region := regionMap[regionID]
log.Info("split regions",
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
if errSplit != nil {
if strings.Contains(errSplit.Error(), "no valid key") {
for _, key := range keys {
// Region start/end keys are encoded. split_region RPC
// requires raw keys (without encoding).
log.Error("split regions no valid key",
logutil.Key("startKey", region.Region.StartKey),
logutil.Key("endKey", region.Region.EndKey),
logutil.Key("key", codec.EncodeBytes(nil, key)),
rtree.ZapRanges(ranges))
for regionID, regionKeys := range splitKeyMap {
log.Info("get split keys for region", zap.Int("len", len(regionKeys)), zap.Uint64("region", regionID))

for i := 0; i < len(regionKeys); i += rs.conf.SplitRegionMaxKeys {
end := i + rs.conf.SplitRegionMaxKeys
if end > len(regionKeys) {
end = len(regionKeys)
}
keys := regionKeys[i:end]

var region *RegionInfo
if i == 0 {
region = regionMap[regionID]
} else {
encodedKey := codec.EncodeBytes(nil, keys[0])
var errGetRegion error
region, errGetRegion = rs.client.GetRegion(ctx, encodedKey)
if errGetRegion != nil {
time.Sleep(interval)
log.Warn("get region failed, retry", logutil.Key("encodedKey", encodedKey), zap.Error(errGetRegion))
continue SplitRegions
}
return errors.Trace(errSplit)
}
interval = 2 * interval
if interval > SplitMaxRetryInterval {
interval = SplitMaxRetryInterval

log.Info("split regions",
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
var newRegions []*RegionInfo
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
if errSplit != nil {
if strings.Contains(errSplit.Error(), "no valid key") {
for _, key := range keys {
// Region start/end keys are encoded. split_region RPC
// requires raw keys (without encoding).
log.Error("split regions no valid key",
logutil.Key("startKey", region.Region.StartKey),
logutil.Key("endKey", region.Region.EndKey),
logutil.Key("key", codec.EncodeBytes(nil, key)),
rtree.ZapRanges(ranges))
}
return errors.Trace(errSplit)
}
interval = 2 * interval
if interval > SplitMaxRetryInterval {
interval = SplitMaxRetryInterval
}
time.Sleep(interval)
log.Warn("split regions failed, retry",
zap.Error(errSplit),
logutil.Region(region.Region),
logutil.Leader(region.Leader),
logutil.Keys(keys), rtree.ZapRanges(ranges))
continue SplitRegions
}
time.Sleep(interval)
log.Warn("split regions failed, retry",
zap.Error(errSplit),
logutil.Region(region.Region),
logutil.Leader(region.Leader),
logutil.Keys(keys), rtree.ZapRanges(ranges))
continue SplitRegions
}
log.Info("scattered regions", zap.Int("count", len(newRegions)))
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
zap.Int("split key count", len(keys)))
log.Info("scattered regions", zap.Int("count", len(newRegions)))
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
zap.Int("split key count", len(keys)))
}
scatterRegions = append(scatterRegions, newRegions...)
onSplit(keys)
}
scatterRegions = append(scatterRegions, newRegions...)
onSplit(keys)
}
break
}
Expand Down
30 changes: 19 additions & 11 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ type SplitClient interface {

// pdClient is a wrapper of pd client, can be used by RegionSplitter.
type pdClient struct {
mu sync.Mutex
client pd.Client
tlsConf *tls.Config
storeCache map[uint64]*metapb.Store
mu sync.Mutex
client pd.Client
tlsConf *tls.Config
spliterConf SpliterConfig
storeCache map[uint64]*metapb.Store

// FIXME when config changed during the lifetime of pdClient,
// this may mislead the scatter.
Expand All @@ -93,12 +94,13 @@ type pdClient struct {
}

// NewSplitClient returns a client used by RegionSplitter.
func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
func NewSplitClient(client pd.Client, tlsConf *tls.Config, spliterConf SpliterConfig, isRawKv bool) SplitClient {
cli := &pdClient{
client: client,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
client: client,
tlsConf: tlsConf,
spliterConf: spliterConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
}
return cli
}
Expand Down Expand Up @@ -194,7 +196,12 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -318,7 +325,8 @@ func (c *pdClient) sendSplitRegionRequest(
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
conn, err := grpc.Dial(store.GetAddress(), opt,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
Expand Down
9 changes: 7 additions & 2 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"google.golang.org/grpc/status"
)

// defaultSpliterCfg is the SpliterConfig handed to restore.NewRegionSplitter
// by the tests in this file: a 1 MiB (1024 * 1024) gRPC receive limit and at
// most 16 split keys per request.
var defaultSpliterCfg = restore.SpliterConfig{
GRPCMaxRecvMsgSize: 1024 * 1024,
SplitRegionMaxKeys: 16,
}

type TestClient struct {
mu sync.RWMutex
stores map[uint64]*metapb.Store
Expand Down Expand Up @@ -276,7 +281,7 @@ func ScatterFinishInTimeImpl(t *testing.T, needEncodeKey bool) {
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)
regionSplitter := restore.NewRegionSplitter(client, defaultSpliterCfg)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
Expand Down Expand Up @@ -335,7 +340,7 @@ func TestSplitAndScatter(t *testing.T) {
func runTestSplitAndScatterWith(t *testing.T, client *TestClient, needEncodeKey bool) {
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)
regionSplitter := restore.NewRegionSplitter(client, defaultSpliterCfg)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
func SplitRanges(
ctx context.Context,
client *Client,
spliterConf SpliterConfig,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool,
needEncodeKey bool,
) error {
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv))
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), spliterConf, isRawKv), spliterConf)

return splitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(keys [][]byte) {
updateCh.Inc()
Expand Down
Loading