Skip to content

Commit 665e9bf

Browse files
committed
Merge remote-tracking branch 'origin/dev'
2 parents 8683365 + 9d29213 commit 665e9bf

File tree

14 files changed

+340
-66
lines changed

14 files changed

+340
-66
lines changed

cmd/pbm-agent/main.go

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,20 @@ func main() {
4949
versionFormat = versionCmd.Flag("format", "Output format <json or \"\">").
5050
Default("").
5151
String()
52+
53+
logPath = pbmCmd.Flag("log-path", "Path to file").
54+
Envar("LOG_PATH").
55+
Default("/dev/stderr").
56+
String()
57+
logJSON = pbmCmd.Flag("log-json", "Enable JSON output").
58+
Envar("LOG_JSON").
59+
Bool()
60+
logLevel = pbmCmd.Flag(
61+
"log-level",
62+
"Minimal log level based on severity level: D, I, W, E or F, low to high. Choosing one includes higher levels too.").
63+
Envar("LOG_LEVEL").
64+
Default(log.D).
65+
Enum(log.D, log.I, log.W, log.E, log.F)
5266
)
5367

5468
cmd, err := pbmCmd.DefaultEnvars().Parse(os.Args[1:])
@@ -74,19 +88,24 @@ func main() {
7488

7589
hidecreds()
7690

77-
fmt.Print(perconaSquadNotice)
91+
logOpts := &log.Opts{
92+
LogPath: *logPath,
93+
LogLevel: *logLevel,
94+
LogJSON: *logJSON,
95+
}
7896

79-
err = runAgent(url, *dumpConns)
97+
err = runAgent(url, *dumpConns, logOpts)
8098
stdlog.Println("Exit:", err)
8199
if err != nil {
82100
os.Exit(1)
83101
}
84102
}
85103

86-
func runAgent(mongoURI string, dumpConns int) error {
87-
mtLog.SetDateFormat(log.LogTimeFormat)
88-
mtLog.SetVerbosity(&options.Verbosity{VLevel: mtLog.DebugLow})
89-
104+
func runAgent(
105+
mongoURI string,
106+
dumpConns int,
107+
logOpts *log.Opts,
108+
) error {
90109
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
91110
defer cancel()
92111

@@ -95,16 +114,34 @@ func runAgent(mongoURI string, dumpConns int) error {
95114
return errors.Wrap(err, "connect to PBM")
96115
}
97116

117+
err = setupNewDB(ctx, leadConn)
118+
if err != nil {
119+
return errors.Wrap(err, "setup pbm collections")
120+
}
121+
98122
agent, err := newAgent(ctx, leadConn, mongoURI, dumpConns)
99123
if err != nil {
100124
return errors.Wrap(err, "connect to the node")
101125
}
102126

103-
logger := log.New(agent.leadConn.LogCollection(), agent.brief.SetName, agent.brief.Me)
127+
logger := log.NewWithOpts(
128+
ctx,
129+
agent.leadConn,
130+
agent.brief.SetName,
131+
agent.brief.Me,
132+
logOpts)
104133
defer logger.Close()
105134

106135
ctx = log.SetLoggerToContext(ctx, logger)
107136

137+
mtLog.SetDateFormat(log.LogTimeFormat)
138+
mtLog.SetVerbosity(&options.Verbosity{VLevel: mtLog.DebugLow})
139+
mtLog.SetWriter(logger)
140+
141+
logger.Printf(perconaSquadNotice)
142+
logger.Printf("log options: log-path=%s, log-level:%s, log-json:%t",
143+
logOpts.LogPath, logOpts.LogLevel, logOpts.LogJSON)
144+
108145
canRunSlicer := true
109146
if err := agent.CanStart(ctx); err != nil {
110147
if errors.Is(err, ErrArbiterNode) || errors.Is(err, ErrDelayedNode) {
@@ -114,11 +151,6 @@ func runAgent(mongoURI string, dumpConns int) error {
114151
}
115152
}
116153

117-
err = setupNewDB(ctx, agent.leadConn)
118-
if err != nil {
119-
return errors.Wrap(err, "setup pbm collections")
120-
}
121-
122154
agent.showIncompatibilityWarning(ctx)
123155

124156
if canRunSlicer {

cmd/pbm-agent/oplog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
7777
}
7878

7979
l.Info("oplog replay started")
80-
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0)
80+
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
8181
err = rr.ReplayOplog(ctx, r, opID, l)
8282
if err != nil {
8383
if errors.Is(err, restore.ErrNoDataForShard) {

cmd/pbm-agent/restore.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ import (
1717
"github.com/percona/percona-backup-mongodb/pbm/topo"
1818
)
1919

20+
const (
21+
numInsertionWorkersDefault = 1
22+
)
23+
2024
func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID, ep config.Epoch) {
2125
logger := log.FromContext(ctx)
2226
if r == nil {
@@ -138,7 +142,14 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
138142
numParallelColls = cfg.Restore.NumParallelCollections
139143
}
140144

141-
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls)
145+
numInsertionWorkersPerCol := numInsertionWorkersDefault
146+
if r.NumInsertionWorkers != nil && *r.NumInsertionWorkers > 0 {
147+
numInsertionWorkersPerCol = int(*r.NumInsertionWorkers)
148+
} else if cfg.Restore != nil && cfg.Restore.NumInsertionWorkers > 0 {
149+
numInsertionWorkersPerCol = cfg.Restore.NumInsertionWorkers
150+
}
151+
152+
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
142153
if r.OplogTS.IsZero() {
143154
err = rr.Snapshot(ctx, r, opid, bcp)
144155
} else {

cmd/pbm/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,11 @@ func main() {
244244
StringVar(&restore.pitrBase)
245245
restoreCmd.Flag("num-parallel-collections", "Number of parallel collections").
246246
Int32Var(&restore.numParallelColls)
247+
restoreCmd.Flag("num-insertion-workers-per-collection",
248+
"Specifies the number of insertion workers to run concurrently per collection. For large imports, "+
249+
"increasing the number of insertion workers may increase the speed of the import.").
250+
Default("1").
251+
Int32Var(&restore.numInsertionWorkers)
247252
restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
248253
StringVar(&restore.ns)
249254
restoreCmd.Flag("ns-from", "Allows collection cloning (creating from the backup with different name) "+
@@ -495,7 +500,7 @@ func main() {
495500
if err != nil {
496501
exitErr(errors.Wrap(err, "connect to mongodb"), pbmOutF)
497502
}
498-
ctx = log.SetLoggerToContext(ctx, log.New(conn.LogCollection(), "", ""))
503+
ctx = log.SetLoggerToContext(ctx, log.New(conn, "", ""))
499504

500505
ver, err := version.GetMongoVersion(ctx, conn.MongoClient())
501506
if err != nil {

cmd/pbm/restore.go

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ type restoreOpts struct {
5353
conf string
5454
ts string
5555

56-
numParallelColls int32
56+
numParallelColls int32
57+
numInsertionWorkers int32
5758
}
5859

5960
type restoreRet struct {
@@ -124,6 +125,10 @@ func runRestore(
124125
if err != nil {
125126
return nil, errors.Wrap(err, "parse --num-parallel-collections option")
126127
}
128+
numInsertionWorkers, err := parseCLINumInsertionWorkersOption(o.numInsertionWorkers)
129+
if err != nil {
130+
return nil, errors.Wrap(err, "parse --num-insertion-workers option")
131+
}
127132
nss, err := parseCLINSOption(o.ns)
128133
if err != nil {
129134
return nil, errors.Wrap(err, "parse --ns option")
@@ -154,7 +159,7 @@ func runRestore(
154159
}
155160
tdiff := time.Now().Unix() - int64(clusterTime.T)
156161

157-
m, err := doRestore(ctx, conn, o, numParallelColls, nss, o.nsFrom, o.nsTo, rsMap, node, outf)
162+
m, err := doRestore(ctx, conn, o, numParallelColls, numInsertionWorkers, nss, o.nsFrom, o.nsTo, rsMap, node, outf)
158163
if err != nil {
159164
return nil, err
160165
}
@@ -353,12 +358,12 @@ func nsIsTaken(
353358
ns string,
354359
) error {
355360
ns = strings.TrimSpace(ns)
356-
db, coll, ok := strings.Cut(ns, ".")
361+
dbName, coll, ok := strings.Cut(ns, ".")
357362
if !ok {
358363
return errors.Wrap(ErrInvalidNamespace, ns)
359364
}
360365

361-
collNames, err := conn.MongoClient().Database(db).ListCollectionNames(ctx, bson.D{{"name", coll}})
366+
collNames, err := conn.MongoClient().Database(dbName).ListCollectionNames(ctx, bson.D{{"name", coll}})
362367
if err != nil {
363368
return errors.Wrap(err, "list collection names for cloning target validation")
364369
}
@@ -375,6 +380,7 @@ func doRestore(
375380
conn connect.Client,
376381
o *restoreOpts,
377382
numParallelColls *int32,
383+
numInsertionWorkers *int32,
378384
nss []string,
379385
nsFrom string,
380386
nsTo string,
@@ -399,15 +405,16 @@ func doRestore(
399405
cmd := ctrl.Cmd{
400406
Cmd: ctrl.CmdRestore,
401407
Restore: &ctrl.RestoreCmd{
402-
Name: name,
403-
BackupName: bcp,
404-
NumParallelColls: numParallelColls,
405-
Namespaces: nss,
406-
NamespaceFrom: nsFrom,
407-
NamespaceTo: nsTo,
408-
UsersAndRoles: o.usersAndRoles,
409-
RSMap: rsMapping,
410-
External: o.extern,
408+
Name: name,
409+
BackupName: bcp,
410+
NumParallelColls: numParallelColls,
411+
NumInsertionWorkers: numInsertionWorkers,
412+
Namespaces: nss,
413+
NamespaceFrom: nsFrom,
414+
NamespaceTo: nsTo,
415+
UsersAndRoles: o.usersAndRoles,
416+
RSMap: rsMapping,
417+
External: o.extern,
411418
},
412419
}
413420
if o.pitr != "" {
@@ -804,3 +811,14 @@ func validateNSFromNSTo(o *restoreOpts) error {
804811

805812
return nil
806813
}
814+
815+
func parseCLINumInsertionWorkersOption(value int32) (*int32, error) {
816+
if value < 0 {
817+
return nil, errors.New("Number of insertion workers has to be greater than zero.")
818+
}
819+
if value == 0 {
820+
return nil, nil //nolint:nilnil
821+
}
822+
823+
return &value, nil
824+
}

cmd/pbm/restore_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"errors"
5+
"reflect"
56
"testing"
67
)
78

@@ -101,3 +102,45 @@ func TestCloningValidation(t *testing.T) {
101102
})
102103
}
103104
}
105+
106+
func TestParseCLINumInsertionWorkersOption(t *testing.T) {
107+
var num int32 = 1
108+
109+
tests := []struct {
110+
name string
111+
value int32
112+
want *int32
113+
wantErr bool
114+
}{
115+
{
116+
name: "valid number - no error",
117+
value: 1,
118+
want: &num,
119+
wantErr: false,
120+
},
121+
{
122+
name: "zero - no error, but return nil",
123+
value: 0,
124+
want: nil,
125+
wantErr: false,
126+
},
127+
{
128+
name: "negative value - error",
129+
value: -1,
130+
want: nil,
131+
wantErr: true,
132+
},
133+
}
134+
for _, tt := range tests {
135+
t.Run(tt.name, func(t *testing.T) {
136+
got, err := parseCLINumInsertionWorkersOption(tt.value)
137+
if (err != nil) != tt.wantErr {
138+
t.Errorf("parseCLINumInsertionWorkersOption() error = %v, wantErr %v", err, tt.wantErr)
139+
return
140+
}
141+
if !reflect.DeepEqual(got, tt.want) {
142+
t.Errorf("parseCLINumInsertionWorkersOption() got = %v, want %v", got, tt.want)
143+
}
144+
})
145+
}
146+
}

pbm/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func confSetPITR(ctx context.Context, m connect.Client, value bool) error {
554554
// GetConfigVar returns value of given config vaiable
555555
func GetConfigVar(ctx context.Context, m connect.Client, key string) (interface{}, error) {
556556
if !validateConfigKey(key) {
557-
return nil, errors.New("invalid config key")
557+
return nil, errors.Errorf("invalid config key: %s", key)
558558
}
559559

560560
bts, err := m.ConfigCollection().

pbm/ctrl/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ type RestoreCmd struct {
157157
UsersAndRoles bool `bson:"usersAndRoles,omitempty"`
158158
RSMap map[string]string `bson:"rsMap,omitempty"`
159159

160-
NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
160+
NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
161+
NumInsertionWorkers *int32 `bson:"numInsertionWorkers,omitempty"`
161162

162163
OplogTS primitive.Timestamp `bson:"oplogTS,omitempty"`
163164

pbm/log/discard.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ func (discardLoggerImpl) PauseMgo() {
2828
func (discardLoggerImpl) ResumeMgo() {
2929
}
3030

31+
func (discardLoggerImpl) Write([]byte) (int, error) {
32+
return 0, nil
33+
}
34+
3135
func (discardLoggerImpl) Printf(msg string, args ...any) {
3236
}
3337

pbm/log/log.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ type Logger interface {
2626
PauseMgo()
2727
ResumeMgo()
2828

29+
Write(p []byte) (n int, err error)
30+
2931
Printf(msg string, args ...any)
3032
Debug(event, obj, opid string, epoch primitive.Timestamp, msg string, args ...any)
3133
Info(event, obj, opid string, epoch primitive.Timestamp, msg string, args ...any)
@@ -60,19 +62,44 @@ const (
6062
Debug
6163
)
6264

65+
const (
66+
F = "F"
67+
E = "E"
68+
W = "W"
69+
I = "I"
70+
D = "D"
71+
)
72+
6373
func (s Severity) String() string {
6474
switch s {
6575
case Fatal:
66-
return "F"
76+
return F
6777
case Error:
68-
return "E"
78+
return E
6979
case Warning:
70-
return "W"
80+
return W
7181
case Info:
72-
return "I"
82+
return I
7383
case Debug:
74-
return "D"
84+
return D
7585
default:
7686
return ""
7787
}
7888
}
89+
90+
func strToSeverity(s string) Severity {
91+
switch s {
92+
case F:
93+
return Fatal
94+
case E:
95+
return Error
96+
case W:
97+
return Warning
98+
case I:
99+
return Info
100+
case D:
101+
return Debug
102+
default:
103+
return Debug
104+
}
105+
}

0 commit comments

Comments
 (0)