Skip to content

Commit 9d29213

Browse files
Merge pull request #1053 from percona/PBM-1414-set-new-default-for-number-of-parallel-collections
PBM-1414: Set new default for number of parallel collections
2 parents 08cfa64 + 8eea8c8 commit 9d29213

File tree

10 files changed

+115
-34
lines changed

10 files changed

+115
-34
lines changed

cmd/pbm-agent/oplog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
7777
}
7878

7979
l.Info("oplog replay started")
80-
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0)
80+
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
8181
err = rr.ReplayOplog(ctx, r, opID, l)
8282
if err != nil {
8383
if errors.Is(err, restore.ErrNoDataForShard) {

cmd/pbm-agent/restore.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ import (
1717
"github.com/percona/percona-backup-mongodb/pbm/topo"
1818
)
1919

20+
const (
21+
numInsertionWorkersDefault = 1
22+
)
23+
2024
func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID, ep config.Epoch) {
2125
logger := log.FromContext(ctx)
2226
if r == nil {
@@ -138,7 +142,14 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
138142
numParallelColls = cfg.Restore.NumParallelCollections
139143
}
140144

141-
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls)
145+
numInsertionWorkersPerCol := numInsertionWorkersDefault
146+
if r.NumInsertionWorkers != nil && *r.NumInsertionWorkers > 0 {
147+
numInsertionWorkersPerCol = int(*r.NumInsertionWorkers)
148+
} else if cfg.Restore != nil && cfg.Restore.NumInsertionWorkers > 0 {
149+
numInsertionWorkersPerCol = cfg.Restore.NumInsertionWorkers
150+
}
151+
152+
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
142153
if r.OplogTS.IsZero() {
143154
err = rr.Snapshot(ctx, r, opid, bcp)
144155
} else {

cmd/pbm/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,11 @@ func main() {
244244
StringVar(&restore.pitrBase)
245245
restoreCmd.Flag("num-parallel-collections", "Number of parallel collections").
246246
Int32Var(&restore.numParallelColls)
247+
restoreCmd.Flag("num-insertion-workers-per-collection",
248+
"Specifies the number of insertion workers to run concurrently per collection. For large imports, "+
249+
"increasing the number of insertion workers may increase the speed of the import.").
250+
Default("1").
251+
Int32Var(&restore.numInsertionWorkers)
247252
restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
248253
StringVar(&restore.ns)
249254
restoreCmd.Flag("ns-from", "Allows collection cloning (creating from the backup with different name) "+

cmd/pbm/restore.go

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ type restoreOpts struct {
5353
conf string
5454
ts string
5555

56-
numParallelColls int32
56+
numParallelColls int32
57+
numInsertionWorkers int32
5758
}
5859

5960
type restoreRet struct {
@@ -124,6 +125,10 @@ func runRestore(
124125
if err != nil {
125126
return nil, errors.Wrap(err, "parse --num-parallel-collections option")
126127
}
128+
numInsertionWorkers, err := parseCLINumInsertionWorkersOption(o.numInsertionWorkers)
129+
if err != nil {
130+
return nil, errors.Wrap(err, "parse --num-insertion-workers-per-collection option")
131+
}
127132
nss, err := parseCLINSOption(o.ns)
128133
if err != nil {
129134
return nil, errors.Wrap(err, "parse --ns option")
@@ -154,7 +159,7 @@ func runRestore(
154159
}
155160
tdiff := time.Now().Unix() - int64(clusterTime.T)
156161

157-
m, err := doRestore(ctx, conn, o, numParallelColls, nss, o.nsFrom, o.nsTo, rsMap, node, outf)
162+
m, err := doRestore(ctx, conn, o, numParallelColls, numInsertionWorkers, nss, o.nsFrom, o.nsTo, rsMap, node, outf)
158163
if err != nil {
159164
return nil, err
160165
}
@@ -353,12 +358,12 @@ func nsIsTaken(
353358
ns string,
354359
) error {
355360
ns = strings.TrimSpace(ns)
356-
db, coll, ok := strings.Cut(ns, ".")
361+
dbName, coll, ok := strings.Cut(ns, ".")
357362
if !ok {
358363
return errors.Wrap(ErrInvalidNamespace, ns)
359364
}
360365

361-
collNames, err := conn.MongoClient().Database(db).ListCollectionNames(ctx, bson.D{{"name", coll}})
366+
collNames, err := conn.MongoClient().Database(dbName).ListCollectionNames(ctx, bson.D{{"name", coll}})
362367
if err != nil {
363368
return errors.Wrap(err, "list collection names for cloning target validation")
364369
}
@@ -375,6 +380,7 @@ func doRestore(
375380
conn connect.Client,
376381
o *restoreOpts,
377382
numParallelColls *int32,
383+
numInsertionWorkers *int32,
378384
nss []string,
379385
nsFrom string,
380386
nsTo string,
@@ -399,15 +405,16 @@ func doRestore(
399405
cmd := ctrl.Cmd{
400406
Cmd: ctrl.CmdRestore,
401407
Restore: &ctrl.RestoreCmd{
402-
Name: name,
403-
BackupName: bcp,
404-
NumParallelColls: numParallelColls,
405-
Namespaces: nss,
406-
NamespaceFrom: nsFrom,
407-
NamespaceTo: nsTo,
408-
UsersAndRoles: o.usersAndRoles,
409-
RSMap: rsMapping,
410-
External: o.extern,
408+
Name: name,
409+
BackupName: bcp,
410+
NumParallelColls: numParallelColls,
411+
NumInsertionWorkers: numInsertionWorkers,
412+
Namespaces: nss,
413+
NamespaceFrom: nsFrom,
414+
NamespaceTo: nsTo,
415+
UsersAndRoles: o.usersAndRoles,
416+
RSMap: rsMapping,
417+
External: o.extern,
411418
},
412419
}
413420
if o.pitr != "" {
@@ -804,3 +811,14 @@ func validateNSFromNSTo(o *restoreOpts) error {
804811

805812
return nil
806813
}
814+
815+
func parseCLINumInsertionWorkersOption(value int32) (*int32, error) {
816+
if value < 0 {
817+
return nil, errors.New("number of insertion workers must not be negative")
818+
}
819+
if value == 0 {
820+
return nil, nil //nolint:nilnil
821+
}
822+
823+
return &value, nil
824+
}

cmd/pbm/restore_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"errors"
5+
"reflect"
56
"testing"
67
)
78

@@ -101,3 +102,45 @@ func TestCloningValidation(t *testing.T) {
101102
})
102103
}
103104
}
105+
106+
func TestParseCLINumInsertionWorkersOption(t *testing.T) {
107+
var num int32 = 1
108+
109+
tests := []struct {
110+
name string
111+
value int32
112+
want *int32
113+
wantErr bool
114+
}{
115+
{
116+
name: "valid number - no error",
117+
value: 1,
118+
want: &num,
119+
wantErr: false,
120+
},
121+
{
122+
name: "zero - no error, but return nil",
123+
value: 0,
124+
want: nil,
125+
wantErr: false,
126+
},
127+
{
128+
name: "negative value - error",
129+
value: -1,
130+
want: nil,
131+
wantErr: true,
132+
},
133+
}
134+
for _, tt := range tests {
135+
t.Run(tt.name, func(t *testing.T) {
136+
got, err := parseCLINumInsertionWorkersOption(tt.value)
137+
if (err != nil) != tt.wantErr {
138+
t.Errorf("parseCLINumInsertionWorkersOption() error = %v, wantErr %v", err, tt.wantErr)
139+
return
140+
}
141+
if !reflect.DeepEqual(got, tt.want) {
142+
t.Errorf("parseCLINumInsertionWorkersOption() got = %v, want %v", got, tt.want)
143+
}
144+
})
145+
}
146+
}

pbm/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func confSetPITR(ctx context.Context, m connect.Client, value bool) error {
554554
// GetConfigVar returns the value of the given config variable
555555
func GetConfigVar(ctx context.Context, m connect.Client, key string) (interface{}, error) {
556556
if !validateConfigKey(key) {
557-
return nil, errors.New("invalid config key")
557+
return nil, errors.Errorf("invalid config key: %s", key)
558558
}
559559

560560
bts, err := m.ConfigCollection().

pbm/ctrl/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ type RestoreCmd struct {
157157
UsersAndRoles bool `bson:"usersAndRoles,omitempty"`
158158
RSMap map[string]string `bson:"rsMap,omitempty"`
159159

160-
NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
160+
NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
161+
NumInsertionWorkers *int32 `bson:"numInsertionWorkers,omitempty"`
161162

162163
OplogTS primitive.Timestamp `bson:"oplogTS,omitempty"`
163164

pbm/restore/logical.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ type Restore struct {
4848
bcpStg storage.Storage
4949
oplogStg storage.Storage
5050

51-
numParallelColls int
51+
numParallelColls int
52+
numInsertionWorkersPerCol int
5253
// Shards to participate in restore. Num of shards in bcp could
5354
// be less than in the cluster and this is ok. Only these shards
5455
// would be expected to run restore (distributed transactions sync,
@@ -85,7 +86,8 @@ func New(
8586
brief topo.NodeBrief,
8687
cfg *config.Config,
8788
rsMap map[string]string,
88-
numParallelColls int,
89+
numParallelColls,
90+
numInsertionWorkersPerCol int,
8991
) *Restore {
9092
if rsMap == nil {
9193
rsMap = make(map[string]string)
@@ -99,9 +101,9 @@ func New(
99101

100102
cfg: cfg,
101103

102-
numParallelColls: numParallelColls,
103-
104-
indexCatalog: idx.NewIndexCatalog(),
104+
numParallelColls: numParallelColls,
105+
numInsertionWorkersPerCol: numInsertionWorkersPerCol,
106+
indexCatalog: idx.NewIndexCatalog(),
105107
}
106108
}
107109

@@ -1333,7 +1335,12 @@ func (r *Restore) applyOplog(ctx context.Context, ranges []oplogRange, options *
13331335
}
13341336

13351337
func (r *Restore) snapshot(input io.Reader, cloneNS snapshot.CloneNS, excludeRouterCollections bool) error {
1336-
rf, err := snapshot.NewRestore(r.brief.URI, r.cfg, cloneNS, r.numParallelColls, excludeRouterCollections)
1338+
rf, err := snapshot.NewRestore(
1339+
r.brief.URI,
1340+
r.cfg, cloneNS,
1341+
r.numParallelColls,
1342+
r.numInsertionWorkersPerCol,
1343+
excludeRouterCollections)
13371344
if err != nil {
13381345
return err
13391346
}

pbm/snapshot/restore.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ import (
1616
const (
1717
preserveUUID = true
1818

19-
batchSizeDefault = 500
20-
numInsertionWorkersDefault = 10
19+
batchSizeDefault = 500
2120
)
2221

2322
var ExcludeFromRestore = []string{
@@ -54,11 +53,11 @@ func (c *CloneNS) IsSpecified() bool {
5453
return c.FromNS != "" && c.ToNS != ""
5554
}
5655

57-
func NewRestore(
58-
uri string,
56+
func NewRestore(uri string,
5957
cfg *config.Config,
6058
cloneNS CloneNS,
61-
numParallelColls int,
59+
numParallelColls,
60+
numInsertionWorkersPerCol int,
6261
excludeRouterCollections bool,
6362
) (io.ReaderFrom, error) {
6463
topts := options.New("mongorestore",
@@ -90,10 +89,7 @@ func NewRestore(
9089
if cfg.Restore.BatchSize > 0 {
9190
batchSize = cfg.Restore.BatchSize
9291
}
93-
numInsertionWorkers := numInsertionWorkersDefault
94-
if cfg.Restore.NumInsertionWorkers > 0 {
95-
numInsertionWorkers = cfg.Restore.NumInsertionWorkers
96-
}
92+
9793
if numParallelColls < 1 {
9894
numParallelColls = 1
9995
}
@@ -119,7 +115,7 @@ func NewRestore(
119115
BulkBufferSize: batchSize,
120116
BypassDocumentValidation: true,
121117
Drop: true,
122-
NumInsertionWorkers: numInsertionWorkers,
118+
NumInsertionWorkers: numInsertionWorkersPerCol,
123119
NumParallelCollections: numParallelColls,
124120
PreserveUUID: preserveUUID,
125121
StopOnError: true,

pbm/topo/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func GetClusterTime(ctx context.Context, m connect.Client) (primitive.Timestamp,
5858

5959
func ClusterTimeFromNodeInfo(info *NodeInfo) (primitive.Timestamp, error) {
6060
if info.ClusterTime == nil {
61-
return primitive.Timestamp{}, errors.New("no clusterTime in response")
61+
return primitive.Timestamp{}, errors.Errorf("no clusterTime in response, received: %+v", info)
6262
}
6363

6464
return info.ClusterTime.ClusterTime, nil

0 commit comments

Comments
 (0)