Skip to content

Commit f7c8ecf

Browse files
Add an option to pass numInsertionWorkersPerCol from command line and from config file
1 parent 8d47d0b commit f7c8ecf

File tree

7 files changed

+57
-41
lines changed

7 files changed

+57
-41
lines changed

cmd/pbm-agent/oplog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
7777
}
7878

7979
l.Info("oplog replay started")
80-
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0)
80+
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 0)
8181
err = rr.ReplayOplog(ctx, r, opID, l)
8282
if err != nil {
8383
if errors.Is(err, restore.ErrNoDataForShard) {

cmd/pbm-agent/restore.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ import (
1616
"github.com/percona/percona-backup-mongodb/pbm/topo"
1717
)
1818

19+
const (
20+
numInsertionWorkersDefault = 1
21+
)
22+
1923
func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID, ep config.Epoch) {
2024
logger := log.FromContext(ctx)
2125
if r == nil {
@@ -128,7 +132,14 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
128132
numParallelColls = cfg.Restore.NumParallelCollections
129133
}
130134

131-
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls)
135+
numInsertionWorkersPerCol := numInsertionWorkersDefault
136+
if r.NumInsertionWorkers != nil && *r.NumInsertionWorkers > 0 {
137+
numInsertionWorkersPerCol = int(*r.NumInsertionWorkers)
138+
} else if cfg.Restore != nil && cfg.Restore.NumInsertionWorkers > 0 {
139+
numInsertionWorkersPerCol = cfg.Restore.NumInsertionWorkers
140+
}
141+
142+
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
132143
if r.OplogTS.IsZero() {
133144
err = rr.Snapshot(ctx, r, opid, bcp)
134145
} else {

cmd/pbm/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ func main() {
244244
StringVar(&restore.pitrBase)
245245
restoreCmd.Flag("num-parallel-collections", "Number of parallel collections").
246246
Int32Var(&restore.numParallelColls)
247+
restoreCmd.Flag("num-insertion-workers-per-collection", "Specifies the number of insertion workers to run concurrently per collection. For large imports, increasing the number of insertion workers may increase the speed of the import.").
248+
Int32Var(&restore.numInsertionWorkers)
247249
restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
248250
StringVar(&restore.ns)
249251
restoreCmd.Flag("ns-from", "Allows collection cloning (creating from the backup with different name) "+

cmd/pbm/restore.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ type restoreOpts struct {
5353
conf string
5454
ts string
5555

56-
numParallelColls int32
56+
numParallelColls int32
57+
numInsertionWorkers int32
5758
}
5859

5960
type restoreRet struct {
@@ -124,6 +125,10 @@ func runRestore(
124125
if err != nil {
125126
return nil, errors.Wrap(err, "parse --num-parallel-collections option")
126127
}
128+
numInsertionWorkers, err := parseCLINumInsertionWorkersOption(o.numInsertionWorkers)
129+
if err != nil {
130+
return nil, errors.Wrap(err, "parse --num-insertion-workers option")
131+
}
127132
nss, err := parseCLINSOption(o.ns)
128133
if err != nil {
129134
return nil, errors.Wrap(err, "parse --ns option")
@@ -154,7 +159,7 @@ func runRestore(
154159
}
155160
tdiff := time.Now().Unix() - int64(clusterTime.T)
156161

157-
m, err := doRestore(ctx, conn, o, numParallelColls, nss, o.nsFrom, o.nsTo, rsMap, node, outf)
162+
m, err := doRestore(ctx, conn, o, numParallelColls, numInsertionWorkers, nss, o.nsFrom, o.nsTo, rsMap, node, outf)
158163
if err != nil {
159164
return nil, err
160165
}
@@ -375,6 +380,7 @@ func doRestore(
375380
conn connect.Client,
376381
o *restoreOpts,
377382
numParallelColls *int32,
383+
numInsertionWorkers *int32,
378384
nss []string,
379385
nsFrom string,
380386
nsTo string,
@@ -399,15 +405,16 @@ func doRestore(
399405
cmd := ctrl.Cmd{
400406
Cmd: ctrl.CmdRestore,
401407
Restore: &ctrl.RestoreCmd{
402-
Name: name,
403-
BackupName: bcp,
404-
NumParallelColls: numParallelColls,
405-
Namespaces: nss,
406-
NamespaceFrom: nsFrom,
407-
NamespaceTo: nsTo,
408-
UsersAndRoles: o.usersAndRoles,
409-
RSMap: rsMapping,
410-
External: o.extern,
408+
Name: name,
409+
BackupName: bcp,
410+
NumParallelColls: numParallelColls,
411+
NumInsertionWorkers: numInsertionWorkers,
412+
Namespaces: nss,
413+
NamespaceFrom: nsFrom,
414+
NamespaceTo: nsTo,
415+
UsersAndRoles: o.usersAndRoles,
416+
RSMap: rsMapping,
417+
External: o.extern,
411418
},
412419
}
413420
if o.pitr != "" {
@@ -804,3 +811,14 @@ func validateNSFromNSTo(o *restoreOpts) error {
804811

805812
return nil
806813
}
814+
815+
func parseCLINumInsertionWorkersOption(value int32) (*int32, error) {
816+
if value < 0 {
817+
return nil, errors.New("Number of insertion workers has to be greater than zero.")
818+
}
819+
if value == 0 {
820+
return nil, nil //nolint:nilnil
821+
}
822+
823+
return &value, nil
824+
}

pbm/ctrl/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ type RestoreCmd struct {
157157
UsersAndRoles bool `bson:"usersAndRoles,omitempty"`
158158
RSMap map[string]string `bson:"rsMap,omitempty"`
159159

160-
NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
160+
NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
161+
NumInsertionWorkers *int32 `bson:"numInsertionWorkers,omitempty"`
161162

162163
OplogTS primitive.Timestamp `bson:"oplogTS,omitempty"`
163164

pbm/restore/logical.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ type Restore struct {
4848
bcpStg storage.Storage
4949
oplogStg storage.Storage
5050

51-
numParallelColls int
51+
numParallelColls int
52+
numInsertionWorkersPerCol int
5253
// Shards to participate in restore. Num of shards in bcp could
5354
// be less than in the cluster and this is ok. Only these shards
5455
// would be expected to run restore (distributed transactions sync,
@@ -79,14 +80,7 @@ type oplogRange struct {
7980
type restoreUsersAndRolesOption bool
8081

8182
// New creates a new restore object
82-
func New(
83-
leadConn connect.Client,
84-
nodeConn *mongo.Client,
85-
brief topo.NodeBrief,
86-
cfg *config.Config,
87-
rsMap map[string]string,
88-
numParallelColls int,
89-
) *Restore {
83+
func New(leadConn connect.Client, nodeConn *mongo.Client, brief topo.NodeBrief, cfg *config.Config, rsMap map[string]string, numParallelColls, numInsertionWorkersPerCol int) *Restore {
9084
if rsMap == nil {
9185
rsMap = make(map[string]string)
9286
}
@@ -99,9 +93,9 @@ func New(
9993

10094
cfg: cfg,
10195

102-
numParallelColls: numParallelColls,
103-
104-
indexCatalog: idx.NewIndexCatalog(),
96+
numParallelColls: numParallelColls,
97+
numInsertionWorkersPerCol: numInsertionWorkersPerCol,
98+
indexCatalog: idx.NewIndexCatalog(),
10599
}
106100
}
107101

@@ -1330,7 +1324,7 @@ func (r *Restore) applyOplog(ctx context.Context, ranges []oplogRange, options *
13301324
}
13311325

13321326
func (r *Restore) snapshot(input io.Reader, cloneNS snapshot.CloneNS, excludeRouterCollections bool) error {
1333-
rf, err := snapshot.NewRestore(r.brief.URI, r.cfg, cloneNS, r.numParallelColls, excludeRouterCollections)
1327+
rf, err := snapshot.NewRestore(r.brief.URI, r.cfg, cloneNS, r.numParallelColls, r.numInsertionWorkersPerCol, excludeRouterCollections)
13341328
if err != nil {
13351329
return err
13361330
}

pbm/snapshot/restore.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ import (
1616
const (
1717
preserveUUID = true
1818

19-
batchSizeDefault = 500
20-
numInsertionWorkersDefault = 1
19+
batchSizeDefault = 500
2120
)
2221

2322
var ExcludeFromRestore = []string{
@@ -54,13 +53,7 @@ func (c *CloneNS) IsSpecified() bool {
5453
return c.FromNS != "" && c.ToNS != ""
5554
}
5655

57-
func NewRestore(
58-
uri string,
59-
cfg *config.Config,
60-
cloneNS CloneNS,
61-
numParallelColls int,
62-
excludeRouterCollections bool,
63-
) (io.ReaderFrom, error) {
56+
func NewRestore(uri string, cfg *config.Config, cloneNS CloneNS, numParallelColls int, numInsertionWorkersPerCol int, excludeRouterCollections bool) (io.ReaderFrom, error) {
6457
topts := options.New("mongorestore",
6558
"0.0.1",
6659
"none",
@@ -90,10 +83,7 @@ func NewRestore(
9083
if cfg.Restore.BatchSize > 0 {
9184
batchSize = cfg.Restore.BatchSize
9285
}
93-
numInsertionWorkers := numInsertionWorkersDefault
94-
if cfg.Restore.NumInsertionWorkers > 0 {
95-
numInsertionWorkers = cfg.Restore.NumInsertionWorkers
96-
}
86+
9787
if numParallelColls < 1 {
9888
numParallelColls = 1
9989
}
@@ -119,7 +109,7 @@ func NewRestore(
119109
BulkBufferSize: batchSize,
120110
BypassDocumentValidation: true,
121111
Drop: true,
122-
NumInsertionWorkers: numInsertionWorkers,
112+
NumInsertionWorkers: numInsertionWorkersPerCol,
123113
NumParallelCollections: numParallelColls,
124114
PreserveUUID: preserveUUID,
125115
StopOnError: true,

0 commit comments

Comments
 (0)