Skip to content

Commit a89b64b

Browse files
PBM-1414 fix reading the NumParallelCollections configuration
1 parent 44b7141 commit a89b64b

File tree

3 files changed

+174
-14
lines changed

3 files changed

+174
-14
lines changed

cmd/pbm-agent/restore.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -135,19 +135,8 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
135135
return
136136
}
137137

138-
numParallelColls := runtime.NumCPU() / 2
139-
if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
140-
numParallelColls = int(*r.NumParallelColls)
141-
} else if cfg.Restore != nil && cfg.Restore.NumParallelCollections > 0 {
142-
numParallelColls = cfg.Restore.NumParallelCollections
143-
}
144-
145-
numInsertionWorkersPerCol := numInsertionWorkersDefault
146-
if r.NumInsertionWorkers != nil && *r.NumInsertionWorkers > 0 {
147-
numInsertionWorkersPerCol = int(*r.NumInsertionWorkers)
148-
} else if cfg.Restore != nil && cfg.Restore.NumInsertionWorkers > 0 {
149-
numInsertionWorkersPerCol = cfg.Restore.NumInsertionWorkers
150-
}
138+
numParallelColls := getNumParallelCollsConfig(r.NumParallelColls, cfg.Restore)
139+
numInsertionWorkersPerCol := getNumInsertionWorkersConfig(r.NumInsertionWorkers, cfg.Restore)
151140

152141
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
153142
if r.OplogTS.IsZero() {
@@ -194,6 +183,26 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
194183
l.Info("recovery successfully finished")
195184
}
196185

186+
func getNumParallelCollsConfig(rParallelColls *int32, restoreConf *config.RestoreConf) int {
187+
numParallelColls := runtime.NumCPU() / 2
188+
if rParallelColls != nil && *rParallelColls > 0 {
189+
numParallelColls = int(*rParallelColls)
190+
} else if restoreConf != nil && restoreConf.NumParallelCollections > 0 {
191+
numParallelColls = restoreConf.NumParallelCollections
192+
}
193+
return numParallelColls
194+
}
195+
196+
func getNumInsertionWorkersConfig(rInsWorkers *int32, restoreConf *config.RestoreConf) int {
197+
numInsertionWorkersPerCol := numInsertionWorkersDefault
198+
if rInsWorkers != nil && int(*rInsWorkers) > 0 {
199+
numInsertionWorkersPerCol = int(*rInsWorkers)
200+
} else if restoreConf != nil && restoreConf.NumInsertionWorkers > 0 {
201+
numInsertionWorkersPerCol = restoreConf.NumInsertionWorkers
202+
}
203+
return numInsertionWorkersPerCol
204+
}
205+
197206
func addRestoreMetaWithError(
198207
ctx context.Context,
199208
conn connect.Client,

cmd/pbm-agent/restore_test.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package main
2+
3+
import (
4+
"github.com/percona/percona-backup-mongodb/pbm/config"
5+
"runtime"
6+
"testing"
7+
)
8+
9+
func TestGetNumInsertionWorkersConfig(t *testing.T) {
10+
type args struct {
11+
rInsWorkers *int32
12+
cfg *config.RestoreConf
13+
}
14+
15+
rZeroInsWorkers := int32(0)
16+
rValidInsWorkers := int32(99)
17+
18+
tests := []struct {
19+
name string
20+
args args
21+
want int
22+
}{
23+
{
24+
name: "When no command line param and no Restore config, return default value",
25+
args: args{
26+
rInsWorkers: nil,
27+
cfg: nil,
28+
},
29+
want: numInsertionWorkersDefault,
30+
},
31+
{
32+
name: "When no command line param and no Restore.NumInsertionWorkers config, return default value",
33+
args: args{
34+
rInsWorkers: nil,
35+
cfg: &config.RestoreConf{},
36+
},
37+
want: numInsertionWorkersDefault,
38+
},
39+
{
40+
name: "When zero command line param, return default value",
41+
args: args{
42+
rInsWorkers: &rZeroInsWorkers,
43+
cfg: &config.RestoreConf{},
44+
},
45+
want: numInsertionWorkersDefault,
46+
},
47+
{
48+
name: "NumInsertionWorkers passed from commandline",
49+
args: args{
50+
rInsWorkers: &rValidInsWorkers,
51+
cfg: nil,
52+
},
53+
want: 99,
54+
},
55+
{
56+
name: "NumInsertionWorkers passed from config",
57+
args: args{
58+
rInsWorkers: nil,
59+
cfg: &config.RestoreConf{NumInsertionWorkers: 42},
60+
},
61+
want: 42,
62+
},
63+
{
64+
name: "NumInsertionWorkers passed from command line and from config, return from command line value",
65+
args: args{
66+
rInsWorkers: &rValidInsWorkers,
67+
cfg: &config.RestoreConf{NumInsertionWorkers: 42},
68+
},
69+
want: 99,
70+
},
71+
}
72+
for _, tt := range tests {
73+
t.Run(tt.name, func(t *testing.T) {
74+
if got := getNumInsertionWorkersConfig(tt.args.rInsWorkers, tt.args.cfg); got != tt.want {
75+
t.Errorf("getNumInsertionWorkersConfig() = %v, want %v", got, tt.want)
76+
}
77+
})
78+
}
79+
}
80+
81+
func TestGetNumParallelCollsConfig(t *testing.T) {
82+
type args struct {
83+
rParallelColls *int32
84+
restoreConf *config.RestoreConf
85+
}
86+
87+
rZeroParallelColls := int32(0)
88+
rValidParallelColls := int32(99)
89+
defaultValue := runtime.NumCPU() / 2
90+
91+
tests := []struct {
92+
name string
93+
args args
94+
want int
95+
}{
96+
{
97+
name: "When no command line param and no Restore config, return default value",
98+
args: args{
99+
rParallelColls: nil,
100+
restoreConf: nil,
101+
},
102+
want: defaultValue,
103+
},
104+
{
105+
name: "When no command line param and no Restore.NumInsertionWorkers config, return default value",
106+
args: args{
107+
rParallelColls: nil,
108+
restoreConf: &config.RestoreConf{},
109+
},
110+
want: defaultValue,
111+
},
112+
{
113+
name: "When zero command line param, return default value",
114+
args: args{
115+
rParallelColls: &rZeroParallelColls,
116+
restoreConf: &config.RestoreConf{},
117+
},
118+
want: defaultValue,
119+
},
120+
{
121+
name: "NumInsertionWorkers passed from commandline",
122+
args: args{
123+
rParallelColls: &rValidParallelColls,
124+
restoreConf: nil,
125+
},
126+
want: 99,
127+
},
128+
{
129+
name: "NumInsertionWorkers passed from config",
130+
args: args{
131+
rParallelColls: nil,
132+
restoreConf: &config.RestoreConf{NumParallelCollections: 42},
133+
},
134+
want: 42,
135+
},
136+
{
137+
name: "NumInsertionWorkers passed from command line and from config, return from command line value",
138+
args: args{
139+
rParallelColls: &rValidParallelColls,
140+
restoreConf: &config.RestoreConf{NumParallelCollections: 42},
141+
},
142+
want: 99,
143+
},
144+
}
145+
for _, tt := range tests {
146+
t.Run(tt.name, func(t *testing.T) {
147+
if got := getNumParallelCollsConfig(tt.args.rParallelColls, tt.args.restoreConf); got != tt.want {
148+
t.Errorf("getNumParallelCollsConfig() = %v, want %v", got, tt.want)
149+
}
150+
})
151+
}
152+
}

cmd/pbm/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ func main() {
247247
restoreCmd.Flag("num-insertion-workers-per-collection",
248248
"Specifies the number of insertion workers to run concurrently per collection. For large imports, "+
249249
"increasing the number of insertion workers may increase the speed of the import.").
250-
Default("1").
251250
Int32Var(&restore.numInsertionWorkers)
252251
restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
253252
StringVar(&restore.ns)

0 commit comments

Comments
 (0)