
Commit 0f11008

PBM-1400: add num-parallel-collections to pbm config

1 parent a69ed32 · commit 0f11008

File tree

5 files changed: +33 −13 lines changed

cmd/pbm-agent/backup.go

Lines changed: 5 additions & 1 deletion

@@ -114,7 +114,11 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
     case defs.LogicalBackup:
         fallthrough
     default:
-        bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.numParallelColls)
+        numParallelColls := a.numParallelColls
+        if cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
+            numParallelColls = cfg.Backup.NumParallelCollections
+        }
+        bcp = backup.New(a.leadConn, a.nodeConn, a.brief, numParallelColls)
     }

     bcp.SetConfig(cfg)
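The effect of this hunk: for logical backups, a positive backup.numParallelCollections in the stored PBM config now overrides the default the agent was started with. A minimal standalone sketch of that two-tier selection (helper name is illustrative, not part of the PBM API):

package main

import "fmt"

// backupNumParallelColls mirrors the override above: a positive config value
// wins, otherwise the agent's own default is kept.
func backupNumParallelColls(agentDefault, cfgVal int) int {
    if cfgVal > 0 {
        return cfgVal
    }
    return agentDefault
}

func main() {
    fmt.Println(backupNumParallelColls(2, 0)) // 2: config unset, agent default kept
    fmt.Println(backupNumParallelColls(2, 6)) // 6: config value overrides
}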

cmd/pbm-agent/oplog.go

Lines changed: 7 additions & 1 deletion

@@ -70,8 +70,14 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
         }
     }()

+    cfg, err := config.GetConfig(ctx, a.leadConn)
+    if err != nil {
+        l.Error("get PBM config: %v", err)
+        return
+    }
+
     l.Info("oplog replay started")
-    rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, 0)
+    rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0)
     err = rr.ReplayOplog(ctx, r, opID, l)
     if err != nil {
         if errors.Is(err, restore.ErrNoDataForShard) {

cmd/pbm-agent/restore.go

Lines changed: 9 additions & 1 deletion

@@ -106,6 +106,12 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         r.BackupName = bcp.Name
     }

+    cfg, err := config.GetConfig(ctx, a.leadConn)
+    if err != nil {
+        l.Error("get PBM configuration: %v", err)
+        return
+    }
+
     l.Info("recovery started")

     switch bcpType {
@@ -118,9 +124,11 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         numParallelColls := runtime.NumCPU() / 2
         if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
             numParallelColls = int(*r.NumParallelColls)
+        } else if cfg.Restore != nil && cfg.Restore.NumParallelCollections > 0 {
+            numParallelColls = cfg.Restore.NumParallelCollections
         }

-        rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, numParallelColls)
+        rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls)
         if r.OplogTS.IsZero() {
             err = rr.Snapshot(ctx, r, opid, bcp)
         } else {
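With this change a logical restore resolves its collection parallelism in three tiers: an explicit value on the restore command wins, then restore.numParallelCollections from the config, then the runtime.NumCPU()/2 default. A self-contained sketch of that order (helper name is illustrative, not PBM API):

package main

import (
    "fmt"
    "runtime"
)

// effectiveNumParallelColls reproduces the selection order above: command
// value first, then config value, then half the CPU count.
func effectiveNumParallelColls(cmdVal *int32, cfgVal int) int {
    if cmdVal != nil && *cmdVal > 0 {
        return int(*cmdVal)
    }
    if cfgVal > 0 {
        return cfgVal
    }
    return runtime.NumCPU() / 2
}

func main() {
    fromCmd := int32(8)
    fmt.Println(effectiveNumParallelColls(&fromCmd, 4)) // 8: command value wins
    fmt.Println(effectiveNumParallelColls(nil, 4))      // 4: config value
    fmt.Println(effectiveNumParallelColls(nil, 0))      // runtime.NumCPU()/2 default
}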

pbm/config/config.go

Lines changed: 5 additions & 2 deletions

@@ -321,8 +321,9 @@ type RestoreConf struct {
     // Logical restore
     //
     // num of documents to buffer
-    BatchSize           int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"`
-    NumInsertionWorkers int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"`
+    BatchSize              int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"`
+    NumInsertionWorkers    int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"`
+    NumParallelCollections int `bson:"numParallelCollections" json:"numParallelCollections,omitempty" yaml:"numParallelCollections,omitempty"`

     // NumDownloadWorkers sets the num of goroutine would be requesting chunks
     // during the download. By default, it's set to GOMAXPROCS.
@@ -361,6 +362,8 @@ type BackupConf struct {
     Timeouts         *BackupTimeouts          `bson:"timeouts,omitempty" json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
     Compression      compress.CompressionType `bson:"compression,omitempty" json:"compression,omitempty" yaml:"compression,omitempty"`
     CompressionLevel *int                     `bson:"compressionLevel,omitempty" json:"compressionLevel,omitempty" yaml:"compressionLevel,omitempty"`
+
+    NumParallelCollections int `bson:"numParallelCollections" json:"numParallelCollections,omitempty" yaml:"numParallelCollections,omitempty"`
 }

 func (cfg *BackupConf) Clone() *BackupConf {
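Per the struct tags above, the new option is addressed as numParallelCollections under both the backup and restore sections of the config document. A small self-contained sketch (using gopkg.in/yaml.v2 and trimmed stand-in structs, not the full PBM types) of how such a document maps onto these fields:

package main

import (
    "fmt"

    "gopkg.in/yaml.v2"
)

// Trimmed stand-ins for the pbm/config types, keeping only the new field.
type BackupConf struct {
    NumParallelCollections int `yaml:"numParallelCollections,omitempty"`
}

type RestoreConf struct {
    NumParallelCollections int `yaml:"numParallelCollections,omitempty"`
}

type Config struct {
    Backup  *BackupConf  `yaml:"backup,omitempty"`
    Restore *RestoreConf `yaml:"restore,omitempty"`
}

func main() {
    doc := []byte("backup:\n  numParallelCollections: 4\nrestore:\n  numParallelCollections: 2\n")

    var cfg Config
    if err := yaml.Unmarshal(doc, &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.Backup.NumParallelCollections, cfg.Restore.NumParallelCollections) // 4 2
}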

pbm/restore/logical.go

Lines changed: 7 additions & 8 deletions

@@ -44,6 +44,7 @@ type Restore struct {
     brief    topo.NodeBrief
     stopHB   chan struct{}
     nodeInfo *topo.NodeInfo
+    cfg      *config.Config
     bcpStg   storage.Storage
     oplogStg storage.Storage

@@ -82,6 +83,7 @@ func New(
     leadConn connect.Client,
     nodeConn *mongo.Client,
     brief topo.NodeBrief,
+    cfg *config.Config,
     rsMap map[string]string,
     numParallelColls int,
 ) *Restore {
@@ -95,6 +97,8 @@ func New(
         brief: brief,
         rsMap: rsMap,

+        cfg: cfg,
+
         numParallelColls: numParallelColls,

         indexCatalog: idx.NewIndexCatalog(),
@@ -835,7 +839,7 @@ func (r *Restore) RunSnapshot(
     defer rdr.Close()

     // Restore snapshot (mongorestore)
-    err = r.snapshot(ctx, rdr)
+    err = r.snapshot(rdr)
     if err != nil {
         return errors.Wrap(err, "mongorestore")
     }
@@ -1188,13 +1192,8 @@ func (r *Restore) applyOplog(ctx context.Context, ranges []oplogRange, options *
     return nil
 }

-func (r *Restore) snapshot(ctx context.Context, input io.Reader) error {
-    cfg, err := config.GetConfig(ctx, r.leadConn)
-    if err != nil {
-        return errors.Wrap(err, "unable to get PBM config settings")
-    }
-
-    rf, err := snapshot.NewRestore(r.brief.URI, cfg, r.numParallelColls)
+func (r *Restore) snapshot(input io.Reader) error {
+    rf, err := snapshot.NewRestore(r.brief.URI, r.cfg, r.numParallelColls)
     if err != nil {
         return err
     }
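The refactor here trades a per-call lookup for constructor injection: callers fetch the config once (as the agent now does in restore.go and oplog.go) and Restore carries it, so snapshot no longer needs a context or a round-trip to read settings. A minimal sketch of the pattern with simplified stand-in types:

package main

import "fmt"

// Simplified stand-ins; the real types live in pbm/config and pbm/restore.
type Config struct{ NumParallelCollections int }

type Restore struct {
    cfg              *Config
    numParallelColls int
}

// New receives the already-fetched config instead of loading it later.
func New(cfg *Config, numParallelColls int) *Restore {
    return &Restore{cfg: cfg, numParallelColls: numParallelColls}
}

// snapshot reads r.cfg directly; no context or storage lookup is needed.
func (r *Restore) snapshot() {
    fmt.Println("restoring with", r.numParallelColls, "parallel collections")
}

func main() {
    cfg := &Config{NumParallelCollections: 4}
    New(cfg, cfg.NumParallelCollections).snapshot()
}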
