diff --git a/.github/config/br_pd.toml b/.github/config/br_pd.toml index 526fa3fa..0e21317d 100644 --- a/.github/config/br_pd.toml +++ b/.github/config/br_pd.toml @@ -2,3 +2,8 @@ [replication] enable-placement-rules = true max-replicas = 1 + +[schedule] +max-merge-region-size = 2 +max-merge-region-keys = 40000 +split-merge-interval = "10s" diff --git a/.github/config/br_rawkv.toml b/.github/config/br_rawkv.toml index 2c9874fd..5372ec76 100644 --- a/.github/config/br_rawkv.toml +++ b/.github/config/br_rawkv.toml @@ -7,6 +7,12 @@ pd-heartbeat-tick-interval = "2s" pd-store-heartbeat-tick-interval = "5s" split-region-check-tick-interval = "1s" +# 10000000 keys & ~480MB data size in integration tests +[coprocessor] +region-split-size = "10MiB" +region-split-keys = 200000 +batch-split-limit = 100 + [rocksdb] max-open-files = 10000 @@ -14,4 +20,5 @@ max-open-files = 10000 max-open-files = 10000 [storage] +reserve-space = "0MiB" enable-ttl = true \ No newline at end of file diff --git a/br/Makefile b/br/Makefile index 94a12e1f..f0afc538 100644 --- a/br/Makefile +++ b/br/Makefile @@ -24,17 +24,18 @@ PACKAGES := go list ./... DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||' # build & test -BR_BIN_PATH ?= bin/tikv-br -TEST_BIN_PATH ?= bin/tikv-br.test -COVERAGE_DIR ?= build -TEST_PARALLEL ?= 8 -PD_ADDR ?= 127.0.0.1:2379 -BR_LOCAL_STORE ?= /tmp/backup_restore_test -API_VERSION ?= 1 -CLUSTER_VERSION ?= nightly -TLS_CA ?= -TLS_CERT ?= -TLS_KEY ?= +BR_BIN_PATH ?= bin/tikv-br +TEST_BIN_PATH ?= bin/tikv-br.test +COVERAGE_DIR ?= build +TEST_PARALLEL ?= 8 +PD_ADDR ?= 127.0.0.1:2379 +SPLIT_REGION_MAX_KEYS ?= 4 +BR_LOCAL_STORE ?= /tmp/backup_restore_test +API_VERSION ?= 1 +CLUSTER_VERSION ?= nightly +TLS_CA ?= +TLS_CERT ?= +TLS_KEY ?= LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.ReleaseVersion=$(shell git describe --tags --dirty --always)" LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')" @@ -71,6 +72,7 @@ test/integration: build/br-test build/rawkv-integration-test mkdir -p $(COVERAGE_DIR) ./bin/rawkv_test --pd=${PD_ADDR} \ --br='${TEST_BIN_PATH}' \ + --split-region-max-keys=${SPLIT_REGION_MAX_KEYS} \ --br-storage=${BR_LOCAL_STORE} \ --api-version=${API_VERSION} \ --cluster-version=${CLUSTER_VERSION} \ diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 0ee5b42c..7332ab95 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -44,6 +44,7 @@ type Client struct { workerPool *utils.WorkerPool tlsConf *tls.Config keepaliveConf keepalive.ClientParameters + spliterConf SpliterConfig backupMeta *backuppb.BackupMeta dstAPIVersion kvrpcpb.APIVersion @@ -64,6 +65,7 @@ func NewRestoreClient( pdClient pd.Client, tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters, + spliterConf SpliterConfig, isRawKv bool, ) (*Client, error) { apiVerion, err := conn.GetTiKVApiVersion(context.Background(), pdClient, tlsConf) @@ -72,9 +74,10 @@ func NewRestoreClient( } return &Client{ pdClient: pdClient, - toolClient: NewSplitClient(pdClient, tlsConf, isRawKv), + toolClient: NewSplitClient(pdClient, tlsConf, spliterConf, isRawKv), tlsConf: tlsConf, keepaliveConf: keepaliveConf, + spliterConf: spliterConf, switchCh: make(chan struct{}), dstAPIVersion: apiVerion, }, nil @@ -132,7 +135,7 @@ func (rc *Client) InitBackupMeta( } rc.backupMeta = backupMeta - metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv) + metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.spliterConf, rc.backupMeta.IsRawKv) importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf) rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.backupMeta.ApiVersion) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 352cb9ed..5dd0faf4 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -23,6 +23,11 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{ Timeout: 10 * time.Second, } +var defaultSpliterCfg = SpliterConfig{ + GRPCMaxRecvMsgSize: 1024 * 1024, + SplitRegionMaxKeys: 16, +} + type fakePDClient struct { pd.Client stores []*metapb.Store @@ -106,7 +111,7 @@ func TestSetSpeedLimit(t *testing.T) { // 1. The cost of concurrent communication is expected to be less than the cost of serial communication. client, err := NewRestoreClient(fakePDClient{ stores: mockStores, - }, nil, defaultKeepaliveCfg, true) + }, nil, defaultKeepaliveCfg, defaultSpliterCfg, true) require.NoError(t, err) client.fileImporter = NewFileImporter(nil, FakeImporterClient{}, nil, true, kvrpcpb.APIVersion_V2) ctx := context.Background() diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index f1cd6066..219e0779 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -53,15 +53,23 @@ var ( ScanRegionAttemptTimes = 30 ) +// SpliterConfig is the config for region split. +type SpliterConfig struct { + GRPCMaxRecvMsgSize int + SplitRegionMaxKeys int +} + // RegionSplitter is a executor of region split by rules. type RegionSplitter struct { client SplitClient + conf SpliterConfig } // NewRegionSplitter returns a new RegionSplitter. -func NewRegionSplitter(client SplitClient) *RegionSplitter { +func NewRegionSplitter(client SplitClient, conf SpliterConfig) *RegionSplitter { return &RegionSplitter{ client: client, + conf: conf, } } @@ -122,46 +130,68 @@ SplitRegions: for _, region := range regions { regionMap[region.Region.GetId()] = region } - for regionID, keys := range splitKeyMap { - log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID)) - var newRegions []*RegionInfo - region := regionMap[regionID] - log.Info("split regions", - logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges)) - newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys) - if errSplit != nil { - if strings.Contains(errSplit.Error(), "no valid key") { - for _, key := range keys { - // Region start/end keys are encoded. split_region RPC - // requires raw keys (without encoding). - log.Error("split regions no valid key", - logutil.Key("startKey", region.Region.StartKey), - logutil.Key("endKey", region.Region.EndKey), - logutil.Key("key", codec.EncodeBytes(nil, key)), - rtree.ZapRanges(ranges)) + for regionID, regionKeys := range splitKeyMap { + log.Info("get split keys for region", zap.Int("len", len(regionKeys)), zap.Uint64("region", regionID)) + + for i := 0; i < len(regionKeys); i += rs.conf.SplitRegionMaxKeys { + end := i + rs.conf.SplitRegionMaxKeys + if end > len(regionKeys) { + end = len(regionKeys) + } + keys := regionKeys[i:end] + + var region *RegionInfo + if i == 0 { + region = regionMap[regionID] + } else { + encodedKey := codec.EncodeBytes(nil, keys[0]) + var errGetRegion error + region, errGetRegion = rs.client.GetRegion(ctx, encodedKey) + if errGetRegion != nil { + time.Sleep(interval) + log.Warn("get region failed, retry", logutil.Key("encodedKey", encodedKey), zap.Error(errGetRegion)) + continue SplitRegions } - return errors.Trace(errSplit) } - interval = 2 * interval - if interval > SplitMaxRetryInterval { - interval = SplitMaxRetryInterval + + log.Info("split regions", + logutil.Region(region.Region), logutil.Keys(keys), rtree.ZapRanges(ranges)) + var newRegions []*RegionInfo + newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys) + if errSplit != nil { + if strings.Contains(errSplit.Error(), "no valid key") { + for _, key := range keys { + // Region start/end keys are encoded. split_region RPC + // requires raw keys (without encoding). + log.Error("split regions no valid key", + logutil.Key("startKey", region.Region.StartKey), + logutil.Key("endKey", region.Region.EndKey), + logutil.Key("key", codec.EncodeBytes(nil, key)), + rtree.ZapRanges(ranges)) + } + return errors.Trace(errSplit) + } + interval = 2 * interval + if interval > SplitMaxRetryInterval { + interval = SplitMaxRetryInterval + } + time.Sleep(interval) + log.Warn("split regions failed, retry", + zap.Error(errSplit), + logutil.Region(region.Region), + logutil.Leader(region.Leader), + logutil.Keys(keys), rtree.ZapRanges(ranges)) + continue SplitRegions } - time.Sleep(interval) - log.Warn("split regions failed, retry", - zap.Error(errSplit), - logutil.Region(region.Region), - logutil.Leader(region.Leader), - logutil.Keys(keys), rtree.ZapRanges(ranges)) - continue SplitRegions - } - log.Info("scattered regions", zap.Int("count", len(newRegions))) - if len(newRegions) != len(keys) { - log.Warn("split key count and new region count mismatch", - zap.Int("new region count", len(newRegions)), - zap.Int("split key count", len(keys))) + log.Info("scattered regions", zap.Int("count", len(newRegions))) + if len(newRegions) != len(keys) { + log.Warn("split key count and new region count mismatch", + zap.Int("new region count", len(newRegions)), + zap.Int("split key count", len(keys))) + } + scatterRegions = append(scatterRegions, newRegions...) + onSplit(keys) } - scatterRegions = append(scatterRegions, newRegions...) - onSplit(keys) } break } diff --git a/br/pkg/restore/split_client.go b/br/pkg/restore/split_client.go index 79646d71..ae741e1a 100755 --- a/br/pkg/restore/split_client.go +++ b/br/pkg/restore/split_client.go @@ -66,10 +66,11 @@ type SplitClient interface { // pdClient is a wrapper of pd client, can be used by RegionSplitter. type pdClient struct { - mu sync.Mutex - client pd.Client - tlsConf *tls.Config - storeCache map[uint64]*metapb.Store + mu sync.Mutex + client pd.Client + tlsConf *tls.Config + spliterConf SpliterConfig + storeCache map[uint64]*metapb.Store // FIXME when config changed during the lifetime of pdClient, // this may mislead the scatter. @@ -80,12 +81,13 @@ type pdClient struct { } // NewSplitClient returns a client used by RegionSplitter. -func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient { +func NewSplitClient(client pd.Client, tlsConf *tls.Config, spliterConf SpliterConfig, isRawKv bool) SplitClient { cli := &pdClient{ - client: client, - tlsConf: tlsConf, - storeCache: make(map[uint64]*metapb.Store), - isRawKv: isRawKv, + client: client, + tlsConf: tlsConf, + spliterConf: spliterConf, + storeCache: make(map[uint64]*metapb.Store), + isRawKv: isRawKv, } return cli } @@ -181,7 +183,12 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key if err != nil { return nil, errors.Trace(err) } - conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure()) + opt := grpc.WithInsecure() + if c.tlsConf != nil { + opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) + } + conn, err := grpc.Dial(store.GetAddress(), opt, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize))) if err != nil { return nil, errors.Trace(err) } @@ -305,7 +312,8 @@ func (c *pdClient) sendSplitRegionRequest( if c.tlsConf != nil { opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) } - conn, err := grpc.Dial(store.GetAddress(), opt) + conn, err := grpc.Dial(store.GetAddress(), opt, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.spliterConf.GRPCMaxRecvMsgSize))) if err != nil { return nil, multierr.Append(splitErrors, err) } diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index fa907597..6da670c0 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -275,7 +275,7 @@ func ScatterFinishInTimeImpl(t *testing.T, needEncodeKey bool) { client := initTestClient() ranges := initRanges() rewriteRules := initRewriteRules() - regionSplitter := NewRegionSplitter(client) + regionSplitter := NewRegionSplitter(client, defaultSpliterCfg) ctx := context.Background() err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true" @@ -335,7 +335,7 @@ func TestSplitAndScatter(t *testing.T) { func runTestSplitAndScatterWith(t *testing.T, client *TestClient, needEncodeKey bool) { ranges := initRanges() rewriteRules := initRewriteRules() - regionSplitter := NewRegionSplitter(client) + regionSplitter := NewRegionSplitter(client, defaultSpliterCfg) ctx := context.Background() err := regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true" diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 734123a2..1306244d 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -109,13 +109,14 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit func SplitRanges( ctx context.Context, client *Client, + spliterConf SpliterConfig, ranges []rtree.Range, rewriteRules *RewriteRules, updateCh glue.Progress, isRawKv bool, needEncodeKey bool, ) error { - splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv)) + splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), spliterConf, isRawKv), spliterConf) return splitter.Split(ctx, ranges, rewriteRules, needEncodeKey, func(keys [][]byte) { updateCh.Inc() diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 20dff1ac..9cfb6392 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/migration/br/pkg/feature" "github.com/tikv/migration/br/pkg/glue" "github.com/tikv/migration/br/pkg/metautil" + "github.com/tikv/migration/br/pkg/restore" "github.com/tikv/migration/br/pkg/storage" "github.com/tikv/migration/br/pkg/utils" pd "github.com/tikv/pd/client" @@ -56,14 +57,19 @@ const ( flagGrpcKeepaliveTime = "grpc-keepalive-time" // flagGrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed. flagGrpcKeepaliveTimeout = "grpc-keepalive-timeout" + // flagGrpcMaxRecvMsgSize is the max allowed size of a message received from tikv. + flagGrpcMaxRecvMsgSize = "grpc-max-recv-msg-size" // flagEnableOpenTracing is whether to enable opentracing - flagEnableOpenTracing = "enable-opentracing" - flagSkipCheckPath = "skip-check-path" + flagEnableOpenTracing = "enable-opentracing" + flagSkipCheckPath = "skip-check-path" + flagSplitRegionMaxKeys = "split-region-max-keys" defaultSwitchInterval = 5 * time.Minute defaultGRPCKeepaliveTime = 10 * time.Second defaultGRPCKeepaliveTimeout = 3 * time.Second + defaultGRPCMaxRecvMsgSize = 32 * 1024 * 1024 // 32MB defaultChecksumConcurrency = 512 + defaultSplitRegionMaxKeys = 1024 flagCipherType = "crypter.method" flagCipherKey = "crypter.key" @@ -105,8 +111,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) { "the interval of pinging gRPC peer, must keep the same value with TiKV and PD") flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout, "the max time a gRPC connection can keep idle before killed, must keep the same value with TiKV and PD") + flags.Uint(flagGrpcMaxRecvMsgSize, defaultGRPCMaxRecvMsgSize, + "the max allowed size of a message received from TiKV") _ = flags.MarkHidden(flagGrpcKeepaliveTime) _ = flags.MarkHidden(flagGrpcKeepaliveTimeout) + _ = flags.MarkHidden(flagGrpcMaxRecvMsgSize) flags.Bool(flagEnableOpenTracing, false, "Set whether to enable opentracing during the backup/restore process") @@ -115,6 +124,9 @@ func DefineCommonFlags(flags *pflag.FlagSet) { _ = flags.MarkHidden(flagNoCreds) flags.BoolP(flagSkipCheckPath, "", false, "Skip path verification") _ = flags.MarkHidden(flagSkipCheckPath) + flags.Uint(flagSplitRegionMaxKeys, defaultSplitRegionMaxKeys, + "The max number of keys in a single split region request") + _ = flags.MarkHidden(flagSplitRegionMaxKeys) flags.String(flagCipherType, "plaintext", "Encrypt/decrypt method, "+ "be one of plaintext|aes128-ctr|aes192-ctr|aes256-ctr case-insensitively, "+ @@ -333,6 +345,14 @@ func GetKeepalive(cfg *Config) keepalive.ClientParameters { } } +// GetSpliterConfig get the spliter config. +func GetSpliterConfig(cfg *Config) restore.SpliterConfig { + return restore.SpliterConfig{ + GRPCMaxRecvMsgSize: int(cfg.GRPCMaxRecvMsgSize), + SplitRegionMaxKeys: int(cfg.SplitRegionMaxKeys), + } +} + func normalizePDURL(pd string, useTLS bool) (string, error) { if strings.HasPrefix(pd, "http://") { if useTLS { diff --git a/br/pkg/task/config.go b/br/pkg/task/config.go index 3dc63c19..297a338c 100644 --- a/br/pkg/task/config.go +++ b/br/pkg/task/config.go @@ -64,6 +64,11 @@ type Config struct { GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"` // GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed. GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"` + // GRPCMaxRecvMsgSize is the max allowed size of a message received from tikv. + GRPCMaxRecvMsgSize uint `json:"grpc-max-recv-msg-size" toml:"grpc-max-recv-msg-size"` + + // SplitRegionMaxKeys is the max number of keys in a single split region request. + SplitRegionMaxKeys uint `json:"split-region-max-keys" toml:"split-region-max-keys"` CipherInfo backuppb.CipherInfo `json:"-" toml:"-"` } @@ -165,10 +170,18 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + cfg.GRPCMaxRecvMsgSize, err = flags.GetUint(flagGrpcMaxRecvMsgSize) + if err != nil { + return errors.Trace(err) + } cfg.EnableOpenTracing, err = flags.GetBool(flagEnableOpenTracing) if err != nil { return errors.Trace(err) } + cfg.SplitRegionMaxKeys, err = flags.GetUint(flagSplitRegionMaxKeys) + if err != nil { + return errors.Trace(err) + } if cfg.SwitchModeInterval <= 0 { return errors.Annotatef(berrors.ErrInvalidArgument, "--switch-mode-interval must be positive, %s is not allowed", cfg.SwitchModeInterval) @@ -213,4 +226,11 @@ func (cfg *Config) adjust() { if cfg.ChecksumConcurrency == 0 { cfg.ChecksumConcurrency = defaultChecksumConcurrency } + if cfg.GRPCMaxRecvMsgSize == 0 { + cfg.GRPCMaxRecvMsgSize = defaultGRPCMaxRecvMsgSize + } + + if cfg.SplitRegionMaxKeys == 0 { + cfg.SplitRegionMaxKeys = defaultSplitRegionMaxKeys + } } diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 76b2b40d..d310b9e3 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -54,11 +54,12 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR return errors.Errorf("Current tikv cluster version %s does not support checksum, please disable checksum", clusterVersion) } + spliterCfg := GetSpliterConfig(&cfg.Config) keepaliveCfg := GetKeepalive(&cfg.Config) // sometimes we have pooled the connections. // sending heartbeats in idle times is useful. keepaliveCfg.PermitWithoutStream = true - client, err := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, true) + client, err := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, spliterCfg, true) if err != nil { return errors.Trace(err) } @@ -122,7 +123,8 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR // RawKV restore does not need to rewrite keys. if featureGate.IsEnabled(feature.SplitRegion) { needEncodeKey := (cfg.DstAPIVersion == kvrpcpb.APIVersion_V2.String()) - err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, true, needEncodeKey) + spliterConf := GetSpliterConfig(&cfg.Config) + err = restore.SplitRanges(ctx, client, spliterConf, ranges, nil, updateCh, true, needEncodeKey) if err != nil { return errors.Trace(err) } diff --git a/br/tests/rawkv/main.go b/br/tests/rawkv/main.go index f047bcbc..b2287a02 100644 --- a/br/tests/rawkv/main.go +++ b/br/tests/rawkv/main.go @@ -33,17 +33,18 @@ var ( maxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response maxBatchSize = int(1024) // max batch size with BatchPut - keyCnt = flag.Uint("keycnt", 10000000, "KeyCnt of testing") - thread = flag.Uint("thread", 500, "Thread of preloading data") - pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD") - apiVersionInt = flag.Uint("api-version", 1, "Api version of tikv-server") - clusterVersion = flag.String("cluster-version", "v6.1.0", "Version of tikv cluster") - br = flag.String("br", "br", "The br binary to be tested.") - brStorage = flag.String("br-storage", "local:///tmp/backup_restore_test", "The url to store SST files of backup/resotre.") - coverageDir = flag.String("coverage", "", "The coverage profile file dir of test.") - tlsCA = flag.String("ca", "", "TLS CA for tikv cluster") - tlsCert = flag.String("cert", "", "TLS CERT for tikv cluster") - tlsKey = flag.String("key", "", "TLS KEY for tikv cluster") + keyCnt = flag.Uint("keycnt", 10000000, "KeyCnt of testing") + thread = flag.Uint("thread", 500, "Thread of preloading data") + pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD") + splitRegionMaxKeys = flag.Uint("split-region-max-keys", 4, "Maximum number of keys in a split region") + apiVersionInt = flag.Uint("api-version", 1, "Api version of tikv-server") + clusterVersion = flag.String("cluster-version", "v6.1.0", "Version of tikv cluster") + br = flag.String("br", "br", "The br binary to be tested.") + brStorage = flag.String("br-storage", "local:///tmp/backup_restore_test", "The url to store SST files of backup/resotre.") + coverageDir = flag.String("coverage", "", "The coverage profile file dir of test.") + tlsCA = flag.String("ca", "", "TLS CA for tikv cluster") + tlsCert = flag.String("cert", "", "TLS CERT for tikv cluster") + tlsKey = flag.String("key", "", "TLS KEY for tikv cluster") ) type RawKVBRTester struct { @@ -217,6 +218,8 @@ func (t *RawKVBRTester) Backup(ctx context.Context, dstAPIVersion kvrpcpb.APIVer dstAPIVersionStr = dstAPIVersion.String() } brCmdStr := brCmd.Pd(t.pdAddr). + SplitRegionMaxKeys(*splitRegionMaxKeys). + GRPCMaxRecvMsgSize(uint(maxMsgSize)). Storage(t.brStorage, true). CheckReq(false). DstApiVersion(dstAPIVersionStr). @@ -235,6 +238,8 @@ func (t *RawKVBRTester) Backup(ctx context.Context, dstAPIVersion kvrpcpb.APIVer func (t *RawKVBRTester) Restore(ctx context.Context, startKey, endKey []byte) ([]byte, error) { brCmd := NewTiKVBrCmd("restore raw") brCmdStr := brCmd.Pd(t.pdAddr). + SplitRegionMaxKeys(*splitRegionMaxKeys). + GRPCMaxRecvMsgSize(uint(maxMsgSize)). Storage(t.brStorage, true). StartKey(startKey). EndKey(endKey). diff --git a/br/tests/rawkv/tikv_br.go b/br/tests/rawkv/tikv_br.go index 92c7622c..c427541c 100644 --- a/br/tests/rawkv/tikv_br.go +++ b/br/tests/rawkv/tikv_br.go @@ -27,6 +27,16 @@ func (b *TiKVBrRunCmd) Pd(pd string) *TiKVBrRunCmd { return b } +func (b *TiKVBrRunCmd) SplitRegionMaxKeys(maxKeys uint) *TiKVBrRunCmd { + b.options = append(b.options, fmt.Sprintf("--split-region-max-keys=%d", maxKeys)) + return b +} + +func (b *TiKVBrRunCmd) GRPCMaxRecvMsgSize(size uint) *TiKVBrRunCmd { + b.options = append(b.options, fmt.Sprintf("--grpc-max-recv-msg-size=%d", size)) + return b +} + func (b *TiKVBrRunCmd) Storage(storage string, isLocal bool) *TiKVBrRunCmd { b.options = append(b.options, fmt.Sprintf("--storage=%s", storage)) b.local = isLocal