Skip to content

Commit 883fd63

Browse files
Merge pull request #931 from percona/release-2.4.1
Release 2.4.1
2 parents 0b6fbbb + 67a182e commit 883fd63

File tree

19 files changed

+400
-48
lines changed

19 files changed

+400
-48
lines changed

cmd/pbm-agent/agent.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Agent struct {
3434

3535
brief topo.NodeBrief
3636

37+
mongoVersion version.MongoVersion
38+
3739
dumpConns int
3840

3941
closeCMD chan struct{}
@@ -51,7 +53,12 @@ func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConn
5153

5254
info, err := topo.GetNodeInfo(ctx, m)
5355
if err != nil {
54-
return nil, err
56+
return nil, errors.Wrap(err, "get node info")
57+
}
58+
59+
mongoVersion, err := version.GetMongoVersion(ctx, m)
60+
if err != nil {
61+
return nil, errors.Wrap(err, "get mongo version")
5562
}
5663

5764
a := &Agent{
@@ -64,6 +71,7 @@ func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConn
6471
SetName: info.SetName,
6572
Me: info.Me,
6673
}
74+
a.mongoVersion = mongoVersion
6775
a.dumpConns = dumpConns
6876
return a, nil
6977
}

cmd/pbm-agent/backup.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
112112
return
113113
}
114114

115+
bcp.SetMongoVersion(a.mongoVersion.VersionString)
115116
bcp.SetSlicerInterval(cfg.BackupSlicerInterval())
116117
bcp.SetTimeouts(cfg.Backup.Timeouts)
117118

cmd/pbm/backup.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ func runBackup(
138138
Namespaces: nss,
139139
Compression: compression,
140140
CompressionLevel: level,
141+
Filelist: b.externList,
141142
},
142143
})
143144
if err != nil {
@@ -169,7 +170,7 @@ func runBackup(
169170
return nil, errors.Errorf("unexpected backup status %v", str)
170171
}
171172

172-
bcp, err := backup.NewDBManager(conn).GetBackupByName(ctx, b.name)
173+
bcp, err := pbm.GetBackupByName(ctx, b.name, sdk.GetBackupByNameOptions{FetchFilelist: b.externList})
173174
if err != nil {
174175
return nil, errors.Wrap(err, "get backup meta")
175176
}

cmd/pbm/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func main() {
132132
backupCmd.Flag("wait", "Wait for the backup to finish").
133133
Short('w').
134134
BoolVar(&backupOptions.wait)
135-
backupCmd.Flag("list-files", "Wait for the backup to finish").
135+
backupCmd.Flag("list-files", "Shows the list of files per node (external backup only)").
136136
Short('l').
137137
BoolVar(&backupOptions.externList)
138138

pbm/backup/backup.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Backup struct {
2727
leadConn connect.Client
2828
nodeConn *mongo.Client
2929
brief topo.NodeBrief
30+
mongoVersion string
3031
typ defs.BackupType
3132
incrBase bool
3233
timeouts *config.BackupTimeouts
@@ -72,6 +73,10 @@ func NewIncremental(leadConn connect.Client, conn *mongo.Client, brief topo.Node
7273
}
7374
}
7475

76+
func (b *Backup) SetMongoVersion(v string) {
77+
b.mongoVersion = v
78+
}
79+
7580
func (b *Backup) SetTimeouts(t *config.BackupTimeouts) {
7681
b.timeouts = t
7782
}
@@ -117,6 +122,7 @@ func (b *Backup) Init(
117122
// the driver (mongo?) sets TS to the current wall clock if TS was 0, so have to init with 1
118123
FirstWriteTS: primitive.Timestamp{T: 1, I: 1},
119124
PBMVersion: version.Current().Version,
125+
MongoVersion: b.mongoVersion,
120126
Nomination: []BackupRsNomination{},
121127
BalancerStatus: balancer,
122128
Hb: ts,
@@ -132,12 +138,6 @@ func (b *Backup) Init(
132138
}
133139
meta.Store = cfg.Storage
134140

135-
ver, err := version.GetMongoVersion(ctx, b.nodeConn)
136-
if err != nil {
137-
return errors.Wrap(err, "get mongo version")
138-
}
139-
meta.MongoVersion = ver.VersionString
140-
141141
fcv, err := version.GetFCV(ctx, b.nodeConn)
142142
if err != nil {
143143
return errors.Wrap(err, "get featureCompatibilityVersion")
@@ -182,6 +182,8 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
182182
rsMeta := BackupReplset{
183183
Name: inf.SetName,
184184
Node: inf.Me,
185+
PBMVersion: version.Current().Version,
186+
MongoVersion: b.mongoVersion,
185187
StartTS: time.Now().UTC().Unix(),
186188
Status: defs.StatusRunning,
187189
Conditions: []Condition{},

pbm/backup/physical.go

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ func (b *Backup) doPhysical(
354354
}
355355

356356
if b.typ == defs.ExternalBackup {
357-
return b.handleExternal(ctx, bcp, rsMeta, data, jrnls, bcur.Meta.DBpath, opid, inf, l)
357+
return b.handleExternal(ctx, bcp, rsMeta, data, jrnls, bcur.Meta.DBpath, opid, inf, stg, l)
358358
}
359359

360360
return b.uploadPhysical(ctx, bcp, rsMeta, data, jrnls, bcur.Meta.DBpath, stg, l)
@@ -364,16 +364,18 @@ func (b *Backup) handleExternal(
364364
ctx context.Context,
365365
bcp *ctrl.BackupCmd,
366366
rsMeta *BackupReplset,
367-
data,
367+
data []File,
368368
jrnls []File,
369369
dbpath string,
370370
opid ctrl.OPID,
371371
inf *topo.NodeInfo,
372+
stg storage.Storage,
372373
l log.LogEvent,
373374
) error {
375+
filelist := make(Filelist, 0, len(data)+len(jrnls)+2) // +2 for metadata and filelist
374376
for _, f := range append(data, jrnls...) {
375377
f.Name = path.Clean("./" + strings.TrimPrefix(f.Name, dbpath))
376-
rsMeta.Files = append(rsMeta.Files, f)
378+
filelist = append(filelist, f)
377379
}
378380

379381
// We'll rewrite rs' LastWriteTS with the cluster LastWriteTS for the meta
@@ -389,19 +391,33 @@ func (b *Backup) handleExternal(
389391
}
390392
// save rs meta along with the data files so it can be used during the restore
391393
metaf := fmt.Sprintf(defs.ExternalRsMetaFile, fsMeta.Name)
392-
fsMeta.Files = append(fsMeta.Files, File{
393-
Name: metaf,
394-
})
394+
filelist = append(filelist, File{Name: metaf}, File{Name: FilelistName})
395395
metadst := filepath.Join(dbpath, metaf)
396396
err = writeRSmetaToDisk(metadst, &fsMeta)
397397
if err != nil {
398398
// we can restore without it
399399
l.Warning("failed to save rs meta file <%s>: %v", metadst, err)
400400
}
401401

402-
err = RSSetPhyFiles(ctx, b.leadConn, bcp.Name, rsMeta.Name, rsMeta)
402+
filelistPath := filepath.Join(dbpath, FilelistName)
403+
err = saveFilelist(filelistPath, filelist)
403404
if err != nil {
404-
return errors.Wrap(err, "set shard's files list")
405+
return errors.Wrap(err, "save filelist to dbpath")
406+
}
407+
408+
if bcp.Filelist {
409+
// keep filelist on backup storage for listing files to copy
410+
bcpStoragePath := path.Join(bcp.Name, rsMeta.Name, FilelistName)
411+
_, err = storage.Upload(ctx, filelist, stg, compress.CompressionTypeNone, nil, bcpStoragePath, -1)
412+
if err != nil {
413+
return errors.Wrapf(err, "save filelist to storage: %q", bcpStoragePath)
414+
}
415+
416+
defer func() {
417+
if err := stg.Delete(bcp.Name); err != nil {
418+
l.Warning("remove backup folder <%s>: %v", bcp.Name, err)
419+
}
420+
}()
405421
}
406422

407423
err = b.toState(ctx, defs.StatusCopyReady, bcp.Name, opid.String(), inf, nil)
@@ -419,10 +435,28 @@ func (b *Backup) handleExternal(
419435
if err != nil {
420436
l.Warning("remove rs meta file <%s>: %v", metadst, err)
421437
}
438+
err = os.Remove(filelistPath)
439+
if err != nil {
440+
l.Warning("remove file <%s>: %v", filelistPath, err)
441+
}
422442

423443
return nil
424444
}
425445

446+
func saveFilelist(filepath string, fl Filelist) error {
447+
fw, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
448+
if err != nil {
449+
return errors.Wrapf(err, "create/open")
450+
}
451+
defer fw.Close()
452+
453+
if _, err = fl.WriteTo(fw); err != nil {
454+
return errors.Wrap(err, "write")
455+
}
456+
457+
return errors.Wrap(fw.Sync(), "fsync")
458+
}
459+
426460
func writeRSmetaToDisk(fname string, rsMeta *BackupReplset) error {
427461
fw, err := os.OpenFile(fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
428462
if err != nil {
@@ -454,35 +488,38 @@ func (b *Backup) uploadPhysical(
454488
stg storage.Storage,
455489
l log.LogEvent,
456490
) error {
457-
var err error
458491
l.Info("uploading data")
459-
rsMeta.Files, err = uploadFiles(ctx, data, bcp.Name+"/"+rsMeta.Name, dbpath,
492+
dataFiles, err := uploadFiles(ctx, data, bcp.Name+"/"+rsMeta.Name, dbpath,
460493
b.typ == defs.IncrementalBackup, stg, bcp.Compression, bcp.CompressionLevel, l)
461494
if err != nil {
462-
return err
495+
return errors.Wrap(err, "upload data files")
463496
}
464497
l.Info("uploading data done")
465498

466499
l.Info("uploading journals")
467500
ju, err := uploadFiles(ctx, jrnls, bcp.Name+"/"+rsMeta.Name, dbpath,
468501
false, stg, bcp.Compression, bcp.CompressionLevel, l)
469502
if err != nil {
470-
return err
503+
return errors.Wrap(err, "upload journal files")
471504
}
472505
l.Info("uploading journals done")
473-
rsMeta.Files = append(rsMeta.Files, ju...)
474506

475-
err = RSSetPhyFiles(ctx, b.leadConn, bcp.Name, rsMeta.Name, rsMeta)
476-
if err != nil {
477-
return errors.Wrap(err, "set shard's files list")
478-
}
507+
filelist := Filelist(dataFiles)
508+
filelist = append(filelist, ju...)
479509

480510
size := int64(0)
481-
for _, f := range rsMeta.Files {
511+
for _, f := range filelist {
482512
size += f.StgSize
483513
}
484514

485-
err = IncBackupSize(ctx, b.leadConn, bcp.Name, size)
515+
filelistPath := path.Join(bcp.Name, rsMeta.Name, FilelistName)
516+
flSize, err := storage.Upload(ctx, filelist, stg, compress.CompressionTypeNone, nil, filelistPath, -1)
517+
if err != nil {
518+
return errors.Wrapf(err, "upload filelist %q", filelistPath)
519+
}
520+
l.Info("uploaded: %q %s", filelistPath, fmtSize(flSize))
521+
522+
err = IncBackupSize(ctx, b.leadConn, bcp.Name, size+flSize)
486523
if err != nil {
487524
return errors.Wrap(err, "inc backup size")
488525
}

pbm/backup/query.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -208,19 +208,6 @@ func IncBackupSize(ctx context.Context, conn connect.Client, bcpName string, siz
208208
return err
209209
}
210210

211-
func RSSetPhyFiles(ctx context.Context, conn connect.Client, bcpName, rsName string, rs *BackupReplset) error {
212-
_, err := conn.BcpCollection().UpdateOne(
213-
ctx,
214-
bson.D{{"name", bcpName}, {"replsets.name", rsName}},
215-
bson.D{
216-
{"$set", bson.M{"replsets.$.files": rs.Files}},
217-
{"$set", bson.M{"replsets.$.journal": rs.Journal}},
218-
},
219-
)
220-
221-
return err
222-
}
223-
224211
func SetRSLastWrite(conn connect.Client, bcpName, rsName string, ts primitive.Timestamp) error {
225212
_, err := conn.BcpCollection().UpdateOne(
226213
context.Background(),

pbm/backup/storage.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,35 @@ func DeleteBackupFiles(meta *BackupMeta, stg storage.Storage) error {
209209

210210
// DeleteBackupFiles removes backup's artifacts from storage
211211
func deletePhysicalBackupFiles(meta *BackupMeta, stg storage.Storage) error {
212+
if version.HasFilelistFile(meta.PBMVersion) {
213+
for i := range meta.Replsets {
214+
rs := &meta.Replsets[i]
215+
if rs.Files != nil {
216+
// it is already fetched
217+
continue
218+
}
219+
220+
filelistPath := path.Join(meta.Name, rs.Name, FilelistName)
221+
rdr, err := stg.SourceReader(filelistPath)
222+
if err != nil {
223+
if errors.Is(err, storage.ErrNotExist) {
224+
continue
225+
}
226+
227+
return errors.Wrapf(err, "open %q", filelistPath)
228+
}
229+
defer rdr.Close()
230+
231+
filelist, err := ReadFilelist(rdr)
232+
rdr.Close()
233+
if err != nil {
234+
return errors.Wrapf(err, "parse filelist")
235+
}
236+
237+
rs.Files = filelist
238+
}
239+
}
240+
212241
for _, r := range meta.Replsets {
213242
for _, f := range r.Files {
214243
fname := meta.Name + "/" + r.Name + "/" + f.Name + meta.Compression.Suffix()

0 commit comments

Comments
 (0)