Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/config/br_pd.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
[replication]
enable-placement-rules = true
max-replicas = 1

[schedule]
max-merge-region-size = 2
max-merge-region-keys = 40000
split-merge-interval = "10s"
7 changes: 7 additions & 0 deletions .github/config/br_rawkv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"

# 10000000 keys & ~480MB data size in integration tests
[coprocessor]
region-split-size = "10MiB"
region-split-keys = 200000
batch-split-limit = 100

[rocksdb]
max-open-files = 10000

[raftdb]
max-open-files = 10000

[storage]
reserve-space = "0MiB"
enable-ttl = true
24 changes: 13 additions & 11 deletions br/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ PACKAGES := go list ./...
DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||'

# build & test
BR_BIN_PATH ?= bin/tikv-br
TEST_BIN_PATH ?= bin/tikv-br.test
COVERAGE_DIR ?= build
TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
BR_LOCAL_STORE ?= /tmp/backup_restore_test
API_VERSION ?= 1
CLUSTER_VERSION ?= nightly
TLS_CA ?=
TLS_CERT ?=
TLS_KEY ?=
BR_BIN_PATH ?= bin/tikv-br
TEST_BIN_PATH ?= bin/tikv-br.test
COVERAGE_DIR ?= build
TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
SPLIT_REGION_MAX_KEYS ?= 4
BR_LOCAL_STORE ?= /tmp/backup_restore_test
API_VERSION ?= 1
CLUSTER_VERSION ?= nightly
TLS_CA ?=
TLS_CERT ?=
TLS_KEY ?=

LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.ReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
Expand Down Expand Up @@ -71,6 +72,7 @@ test/integration: build/br-test build/rawkv-integration-test
mkdir -p $(COVERAGE_DIR)
./bin/rawkv_test --pd=${PD_ADDR} \
--br='${TEST_BIN_PATH}' \
--split-region-max-keys=${SPLIT_REGION_MAX_KEYS} \
--br-storage=${BR_LOCAL_STORE} \
--api-version=${API_VERSION} \
--cluster-version=${CLUSTER_VERSION} \
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Client struct {
workerPool *utils.WorkerPool
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters
spliterConf SpliterConfig

backupMeta *backuppb.BackupMeta
dstAPIVersion kvrpcpb.APIVersion
Expand All @@ -64,6 +65,7 @@ func NewRestoreClient(
pdClient pd.Client,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
spliterConf SpliterConfig,
isRawKv bool,
) (*Client, error) {
apiVerion, err := conn.GetTiKVApiVersion(context.Background(), pdClient, tlsConf)
Expand All @@ -72,9 +74,10 @@ func NewRestoreClient(
}
return &Client{
pdClient: pdClient,
toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
toolClient: NewSplitClient(pdClient, tlsConf, spliterConf, isRawKv),
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
spliterConf: spliterConf,
switchCh: make(chan struct{}),
dstAPIVersion: apiVerion,
}, nil
Expand Down Expand Up @@ -132,7 +135,7 @@ func (rc *Client) InitBackupMeta(
}
rc.backupMeta = backupMeta

metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.spliterConf, rc.backupMeta.IsRawKv)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv,
rc.backupMeta.ApiVersion)
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{
Timeout: 10 * time.Second,
}

var defaultSpliterCfg = SpliterConfig{
GRPCMaxRecvMsgSize: 1024 * 1024,
SplitRegionMaxKeys: 16,
}

type fakePDClient struct {
pd.Client
stores []*metapb.Store
Expand Down Expand Up @@ -106,7 +111,7 @@ func TestSetSpeedLimit(t *testing.T) {
// 1. The cost of concurrent communication is expected to be less than the cost of serial communication.
client, err := NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, defaultKeepaliveCfg, true)
}, nil, defaultKeepaliveCfg, defaultSpliterCfg, true)
require.NoError(t, err)
client.fileImporter = NewFileImporter(nil, FakeImporterClient{}, nil, true, kvrpcpb.APIVersion_V2)
ctx := context.Background()
Expand Down
104 changes: 67 additions & 37 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,23 @@ var (
ScanRegionAttemptTimes = 30
)

// SpliterConfig is the config for region split.
type SpliterConfig struct {
GRPCMaxRecvMsgSize int
SplitRegionMaxKeys int
}

// RegionSplitter is a executor of region split by rules.
type RegionSplitter struct {
client SplitClient
conf SpliterConfig
}

// NewRegionSplitter returns a new RegionSplitter.
func NewRegionSplitter(client SplitClient) *RegionSplitter {
func NewRegionSplitter(client SplitClient, conf SpliterConfig) *RegionSplitter {
return &RegionSplitter{
client: client,
conf: conf,
}
}

Expand Down Expand Up @@ -122,46 +130,68 @@ SplitRegions:
for _, region := range regions {
regionMap[region.Region.GetId()] = region
}
for regionID, keys := range splitKeyMap {
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
var newRegions []*RegionInfo
region := regionMap[regionID]
log.Info("split regions",
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
if errSplit != nil {
if strings.Contains(errSplit.Error(), "no valid key") {
for _, key := range keys {
// Region start/end keys are encoded. split_region RPC
// requires raw keys (without encoding).
log.Error("split regions no valid key",
logutil.Key("startKey", region.Region.StartKey),
logutil.Key("endKey", region.Region.EndKey),
logutil.Key("key", codec.EncodeBytes(nil, key)),
rtree.ZapRanges(ranges))
for regionID, regionKeys := range splitKeyMap {
log.Info("get split keys for region", zap.Int("len", len(regionKeys)), zap.Uint64("region", regionID))

for i := 0; i < len(regionKeys); i += rs.conf.SplitRegionMaxKeys {
end := i + rs.conf.SplitRegionMaxKeys
if end > len(regionKeys) {
end = len(regionKeys)
}
keys := regionKeys[i:end]

var region *RegionInfo
if i == 0 {
region = regionMap[regionID]
} else {
encodedKey := codec.EncodeBytes(nil, keys[0])
var errGetRegion error
region, errGetRegion = rs.client.GetRegion(ctx, encodedKey)
if errGetRegion != nil {
time.Sleep(interval)
log.Warn("get region failed, retry", logutil.Key("encodedKey", encodedKey), zap.Error(errGetRegion))
continue SplitRegions
}
return errors.Trace(errSplit)
}
interval = 2 * interval
if interval > SplitMaxRetryInterval {
interval = SplitMaxRetryInterval

log.Info("split regions",
logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges))
var newRegions []*RegionInfo
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
if errSplit != nil {
if strings.Contains(errSplit.Error(), "no valid key") {
for _, key := range keys {
// Region start/end keys are encoded. split_region RPC
// requires raw keys (without encoding).
log.Error("split regions no valid key",
logutil.Key("startKey", region.Region.StartKey),
logutil.Key("endKey", region.Region.EndKey),
logutil.Key("key", codec.EncodeBytes(nil, key)),
rtree.ZapRanges(ranges))
}
return errors.Trace(errSplit)
}
interval = 2 * interval
if interval > SplitMaxRetryInterval {
interval = SplitMaxRetryInterval
}
time.Sleep(interval)
log.Warn("split regions failed, retry",
zap.Error(errSplit),
logutil.Region(region.Region),
logutil.Leader(region.Leader),
logutil.Keys(keys), rtree.ZapRanges(ranges))
continue SplitRegions
}
time.Sleep(interval)
log.Warn("split regions failed, retry",
zap.Error(errSplit),
logutil.Region(region.Region),
logutil.Leader(region.Leader),
logutil.Keys(keys), rtree.ZapRanges(ranges))
continue SplitRegions
}
log.Info("scattered regions", zap.Int("count", len(newRegions)))
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
zap.Int("split key count", len(keys)))
log.Info("scattered regions", zap.Int("count", len(newRegions)))
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
zap.Int("split key count", len(keys)))
}
scatterRegions = append(scatterRegions, newRegions...)
onSplit(keys)
}
scatterRegions = append(scatterRegions, newRegions...)
onSplit(keys)
}
break
}
Expand Down
30 changes: 19 additions & 11 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ type SplitClient interface {

// pdClient is a wrapper of pd client, can be used by RegionSplitter.
type pdClient struct {
mu sync.Mutex
client pd.Client
tlsConf *tls.Config
storeCache map[uint64]*metapb.Store
mu sync.Mutex
client pd.Client
tlsConf *tls.Config
spliterConf SpliterConfig
storeCache map[uint64]*metapb.Store

// FIXME when config changed during the lifetime of pdClient,
// this may mislead the scatter.
Expand All @@ -80,12 +81,13 @@ type pdClient struct {
}

// NewSplitClient returns a client used by RegionSplitter.
func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
func NewSplitClient(client pd.Client, tlsConf *tls.Config, spliterConf SpliterConfig, isRawKv bool) SplitClient {
cli := &pdClient{
client: client,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
client: client,
tlsConf: tlsConf,
spliterConf: spliterConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
}
return cli
}
Expand Down Expand Up @@ -181,7 +183,12 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -305,7 +312,8 @@ func (c *pdClient) sendSplitRegionRequest(
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
conn, err := grpc.Dial(store.GetAddress(), opt,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize)))
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func ScatterFinishInTimeImpl(t *testing.T, needEncodeKey bool) {
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := NewRegionSplitter(client)
regionSplitter := NewRegionSplitter(client, defaultSpliterCfg)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestSplitAndScatter(t *testing.T) {
func runTestSplitAndScatterWith(t *testing.T, client *TestClient, needEncodeKey bool) {
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := NewRegionSplitter(client)
regionSplitter := NewRegionSplitter(client, defaultSpliterCfg)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
func SplitRanges(
ctx context.Context,
client *Client,
spliterConf SpliterConfig,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool,
needEncodeKey bool,
) error {
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv))
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), spliterConf, isRawKv), spliterConf)

return splitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(keys [][]byte) {
updateCh.Inc()
Expand Down
Loading
Loading