Skip to content

Commit 81a4475

Browse files
Merge pull request #1058 from percona/PBM-1414-change-default-ins-workers-fix
PBM-1414 fix reading the NumParallelCollections configuration
2 parents d459d2c + d320b1b commit 81a4475

File tree

3 files changed

+175
-14
lines changed

3 files changed

+175
-14
lines changed

cmd/pbm-agent/restore.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -135,19 +135,8 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
135135
return
136136
}
137137

138-
numParallelColls := runtime.NumCPU() / 2
139-
if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
140-
numParallelColls = int(*r.NumParallelColls)
141-
} else if cfg.Restore != nil && cfg.Restore.NumParallelCollections > 0 {
142-
numParallelColls = cfg.Restore.NumParallelCollections
143-
}
144-
145-
numInsertionWorkersPerCol := numInsertionWorkersDefault
146-
if r.NumInsertionWorkers != nil && *r.NumInsertionWorkers > 0 {
147-
numInsertionWorkersPerCol = int(*r.NumInsertionWorkers)
148-
} else if cfg.Restore != nil && cfg.Restore.NumInsertionWorkers > 0 {
149-
numInsertionWorkersPerCol = cfg.Restore.NumInsertionWorkers
150-
}
138+
numParallelColls := getNumParallelCollsConfig(r.NumParallelColls, cfg.Restore)
139+
numInsertionWorkersPerCol := getNumInsertionWorkersConfig(r.NumInsertionWorkers, cfg.Restore)
151140

152141
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
153142
if r.OplogTS.IsZero() {
@@ -194,6 +183,26 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
194183
l.Info("recovery successfully finished")
195184
}
196185

186+
func getNumParallelCollsConfig(rParallelColls *int32, restoreConf *config.RestoreConf) int {
187+
numParallelColls := runtime.NumCPU() / 2
188+
if rParallelColls != nil && *rParallelColls > 0 {
189+
numParallelColls = int(*rParallelColls)
190+
} else if restoreConf != nil && restoreConf.NumParallelCollections > 0 {
191+
numParallelColls = restoreConf.NumParallelCollections
192+
}
193+
return numParallelColls
194+
}
195+
196+
func getNumInsertionWorkersConfig(rInsWorkers *int32, restoreConf *config.RestoreConf) int {
197+
numInsertionWorkersPerCol := numInsertionWorkersDefault
198+
if rInsWorkers != nil && int(*rInsWorkers) > 0 {
199+
numInsertionWorkersPerCol = int(*rInsWorkers)
200+
} else if restoreConf != nil && restoreConf.NumInsertionWorkers > 0 {
201+
numInsertionWorkersPerCol = restoreConf.NumInsertionWorkers
202+
}
203+
return numInsertionWorkersPerCol
204+
}
205+
197206
func addRestoreMetaWithError(
198207
ctx context.Context,
199208
conn connect.Client,

cmd/pbm-agent/restore_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package main
2+
3+
import (
4+
"runtime"
5+
"testing"
6+
7+
"github.com/percona/percona-backup-mongodb/pbm/config"
8+
)
9+
10+
func TestGetNumInsertionWorkersConfig(t *testing.T) {
11+
type args struct {
12+
rInsWorkers *int32
13+
cfg *config.RestoreConf
14+
}
15+
16+
rZeroInsWorkers := int32(0)
17+
rValidInsWorkers := int32(99)
18+
19+
tests := []struct {
20+
name string
21+
args args
22+
want int
23+
}{
24+
{
25+
name: "When no command line param and no Restore config, return default value",
26+
args: args{
27+
rInsWorkers: nil,
28+
cfg: nil,
29+
},
30+
want: numInsertionWorkersDefault,
31+
},
32+
{
33+
name: "When no command line param and no Restore.NumInsertionWorkers config, return default value",
34+
args: args{
35+
rInsWorkers: nil,
36+
cfg: &config.RestoreConf{},
37+
},
38+
want: numInsertionWorkersDefault,
39+
},
40+
{
41+
name: "When zero command line param, return default value",
42+
args: args{
43+
rInsWorkers: &rZeroInsWorkers,
44+
cfg: &config.RestoreConf{},
45+
},
46+
want: numInsertionWorkersDefault,
47+
},
48+
{
49+
name: "NumInsertionWorkers passed from commandline",
50+
args: args{
51+
rInsWorkers: &rValidInsWorkers,
52+
cfg: nil,
53+
},
54+
want: 99,
55+
},
56+
{
57+
name: "NumInsertionWorkers passed from config",
58+
args: args{
59+
rInsWorkers: nil,
60+
cfg: &config.RestoreConf{NumInsertionWorkers: 42},
61+
},
62+
want: 42,
63+
},
64+
{
65+
name: "NumInsertionWorkers passed from command line and from config, return from command line value",
66+
args: args{
67+
rInsWorkers: &rValidInsWorkers,
68+
cfg: &config.RestoreConf{NumInsertionWorkers: 42},
69+
},
70+
want: 99,
71+
},
72+
}
73+
for _, tt := range tests {
74+
t.Run(tt.name, func(t *testing.T) {
75+
if got := getNumInsertionWorkersConfig(tt.args.rInsWorkers, tt.args.cfg); got != tt.want {
76+
t.Errorf("getNumInsertionWorkersConfig() = %v, want %v", got, tt.want)
77+
}
78+
})
79+
}
80+
}
81+
82+
func TestGetNumParallelCollsConfig(t *testing.T) {
83+
type args struct {
84+
rParallelColls *int32
85+
restoreConf *config.RestoreConf
86+
}
87+
88+
rZeroParallelColls := int32(0)
89+
rValidParallelColls := int32(99)
90+
defaultValue := runtime.NumCPU() / 2
91+
92+
tests := []struct {
93+
name string
94+
args args
95+
want int
96+
}{
97+
{
98+
name: "When no command line param and no Restore config, return default value",
99+
args: args{
100+
rParallelColls: nil,
101+
restoreConf: nil,
102+
},
103+
want: defaultValue,
104+
},
105+
{
106+
name: "When no command line param and no Restore.NumInsertionWorkers config, return default value",
107+
args: args{
108+
rParallelColls: nil,
109+
restoreConf: &config.RestoreConf{},
110+
},
111+
want: defaultValue,
112+
},
113+
{
114+
name: "When zero command line param, return default value",
115+
args: args{
116+
rParallelColls: &rZeroParallelColls,
117+
restoreConf: &config.RestoreConf{},
118+
},
119+
want: defaultValue,
120+
},
121+
{
122+
name: "NumInsertionWorkers passed from commandline",
123+
args: args{
124+
rParallelColls: &rValidParallelColls,
125+
restoreConf: nil,
126+
},
127+
want: 99,
128+
},
129+
{
130+
name: "NumInsertionWorkers passed from config",
131+
args: args{
132+
rParallelColls: nil,
133+
restoreConf: &config.RestoreConf{NumParallelCollections: 42},
134+
},
135+
want: 42,
136+
},
137+
{
138+
name: "NumInsertionWorkers passed from command line and from config, return from command line value",
139+
args: args{
140+
rParallelColls: &rValidParallelColls,
141+
restoreConf: &config.RestoreConf{NumParallelCollections: 42},
142+
},
143+
want: 99,
144+
},
145+
}
146+
for _, tt := range tests {
147+
t.Run(tt.name, func(t *testing.T) {
148+
if got := getNumParallelCollsConfig(tt.args.rParallelColls, tt.args.restoreConf); got != tt.want {
149+
t.Errorf("getNumParallelCollsConfig() = %v, want %v", got, tt.want)
150+
}
151+
})
152+
}
153+
}

cmd/pbm/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ func main() {
247247
restoreCmd.Flag("num-insertion-workers-per-collection",
248248
"Specifies the number of insertion workers to run concurrently per collection. For large imports, "+
249249
"increasing the number of insertion workers may increase the speed of the import.").
250-
Default("1").
251250
Int32Var(&restore.numInsertionWorkers)
252251
restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
253252
StringVar(&restore.ns)

0 commit comments

Comments
 (0)