Skip to content

Commit 0190573

Browse files
authored
[PBM-1234, PBM-1236] Unable to delete incremental backups (#905)
1 parent bab7e4d commit 0190573

File tree

14 files changed

+216
-109
lines changed

14 files changed

+216
-109
lines changed

cmd/pbm-agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func (a *Agent) Resync(ctx context.Context, opid ctrl.OPID, ep config.Epoch) {
206206
}
207207
l.Info("succeed")
208208

209-
epch, err := config.ResetEpoch(a.leadConn)
209+
epch, err := config.ResetEpoch(ctx, a.leadConn)
210210
if err != nil {
211211
l.Error("reset epoch: %v", err)
212212
return

cmd/pbm-agent/backup.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
8383
}
8484
}
8585

86-
// wakeup the slicer to not wait for the tick
87-
go a.sliceNow(opid)
86+
if cmd.Type == defs.LogicalBackup {
87+
// wakeup the slicer to not wait for the tick
88+
go a.sliceNow(opid)
89+
}
8890

8991
var bcp *backup.Backup
9092
switch cmd.Type {

cmd/pbm-agent/delete.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"runtime"
66
"time"
77

8+
"go.mongodb.org/mongo-driver/bson/primitive"
89
"golang.org/x/sync/errgroup"
910

1011
"github.com/percona/percona-backup-mongodb/pbm/backup"
@@ -74,8 +75,20 @@ func (a *Agent) Delete(ctx context.Context, d *ctrl.DeleteBackupCmd, opid ctrl.O
7475
l = logger.NewEvent(string(ctrl.CmdDeleteBackup), obj, opid.String(), ep.TS())
7576
ctx := log.SetLogEventToContext(ctx, l)
7677

78+
ct, err := topo.GetClusterTime(ctx, a.leadConn)
79+
if err != nil {
80+
l.Error("get cluster time: %v", err)
81+
return
82+
}
83+
if d.OlderThan > int64(ct.T) {
84+
providedTime := t.Format(time.RFC3339)
85+
realTime := time.Unix(int64(ct.T), 0).UTC().Format(time.RFC3339)
86+
l.Error("provided time %q is after now %q", providedTime, realTime)
87+
return
88+
}
89+
7790
l.Info("deleting backups older than %v", t)
78-
err := backup.DeleteBackupBefore(ctx, a.leadConn, t, "")
91+
err = backup.DeleteBackupBefore(ctx, a.leadConn, t, "")
7992
if err != nil {
8093
l.Error("deleting: %v", err)
8194
return
@@ -148,11 +161,28 @@ func (a *Agent) DeletePITR(ctx context.Context, d *ctrl.DeletePITRCmd, opid ctrl
148161
if d.OlderThan > 0 {
149162
t := time.Unix(d.OlderThan, 0).UTC()
150163
obj := t.Format("2006-01-02T15:04:05Z")
164+
151165
l = logger.NewEvent(string(ctrl.CmdDeletePITR), obj, opid.String(), ep.TS())
166+
ctx := log.SetLogEventToContext(ctx, l)
167+
168+
var ct primitive.Timestamp
169+
ct, err = topo.GetClusterTime(ctx, a.leadConn)
170+
if err != nil {
171+
l.Error("get cluster time: %v", err)
172+
return
173+
}
174+
if d.OlderThan > int64(ct.T) {
175+
providedTime := t.Format(time.RFC3339)
176+
realTime := time.Unix(int64(ct.T), 0).UTC().Format(time.RFC3339)
177+
l.Error("provided time %q is after now %q", providedTime, realTime)
178+
return
179+
}
180+
152181
l.Info("deleting pitr chunks older than %v", t)
153182
err = oplogtmp.DeletePITR(ctx, a.leadConn, &t, l)
154183
} else {
155184
l = logger.NewEvent(string(ctrl.CmdDeletePITR), "_all_", opid.String(), ep.TS())
185+
ctx := log.SetLogEventToContext(ctx, l)
156186
l.Info("deleting all pitr chunks")
157187
err = oplogtmp.DeletePITR(ctx, a.leadConn, nil, l)
158188
}
@@ -210,6 +240,18 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
210240
}
211241
}()
212242

243+
ct, err := topo.GetClusterTime(ctx, a.leadConn)
244+
if err != nil {
245+
l.Error("get cluster time: %v", err)
246+
return
247+
}
248+
if d.OlderThan.T > ct.T {
249+
providedTime := time.Unix(int64(d.OlderThan.T), 0).UTC().Format(time.RFC3339)
250+
realTime := time.Unix(int64(ct.T), 0).UTC().Format(time.RFC3339)
251+
l.Error("provided time %q is after now %q", providedTime, realTime)
252+
return
253+
}
254+
213255
stg, err := util.GetStorage(ctx, a.leadConn, l)
214256
if err != nil {
215257
l.Error("get storage: " + err.Error())

cmd/pbm-agent/oplog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
7777
}
7878
l.Info("oplog replay successfully finished")
7979

80-
resetEpoch, err := config.ResetEpoch(a.leadConn)
80+
resetEpoch, err := config.ResetEpoch(ctx, a.leadConn)
8181
if err != nil {
8282
l.Error("reset epoch: %s", err.Error())
8383
return

cmd/pbm-agent/restore.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func canSlicingNow(ctx context.Context, conn connect.Client) error {
118118
return errors.Wrap(err, "get backup metadata")
119119
}
120120

121-
if bcp.Type != defs.PhysicalBackup && bcp.Type != defs.ExternalBackup {
121+
if bcp.Type == defs.LogicalBackup {
122122
return lock.ConcurrentOpError{l.LockHeader}
123123
}
124124
}
@@ -442,7 +442,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
442442
}
443443

444444
if bcpType == defs.LogicalBackup && nodeInfo.IsLeader() {
445-
epch, err := config.ResetEpoch(a.leadConn)
445+
epch, err := config.ResetEpoch(ctx, a.leadConn)
446446
if err != nil {
447447
l.Error("reset epoch: %v", err)
448448
}

cmd/pbm/delete.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ func deleteManyBackup(ctx context.Context, pbm sdk.Client, d *deleteBcpOpts) (sd
111111
if err != nil {
112112
return sdk.NoOpID, errors.Wrap(err, "parse --older-than")
113113
}
114+
if n := time.Now().UTC(); ts.T > uint32(n.Unix()) {
115+
providedTime := time.Unix(int64(ts.T), 0).UTC().Format(time.RFC3339)
116+
realTime := n.Format(time.RFC3339)
117+
return sdk.NoOpID, errors.Errorf("--older-than %q is after now %q", providedTime, realTime)
118+
}
114119

115120
bcpType := sdk.ParseBackupType(d.bcpType)
116121
backups, err := sdk.ListDeleteBackupBefore(ctx, pbm, ts, bcpType)
@@ -189,6 +194,11 @@ func deletePITR(
189194
if err != nil {
190195
return nil, errors.Wrap(err, "parse --older-than")
191196
}
197+
if n := time.Now().UTC(); ts.T > uint32(n.Unix()) {
198+
providedTime := time.Unix(int64(ts.T), 0).UTC().Format(time.RFC3339)
199+
realTime := n.Format(time.RFC3339)
200+
return nil, errors.Errorf("--older-than %q is after now %q", providedTime, realTime)
201+
}
192202
}
193203
cid, err := pbm.DeleteOplogRange(ctx, ts)
194204
if err != nil {
@@ -214,6 +224,12 @@ func doCleanup(ctx context.Context, conn connect.Client, pbm sdk.Client, d *clea
214224
if err != nil {
215225
return nil, errors.Wrap(err, "parse --older-than")
216226
}
227+
if n := time.Now().UTC(); ts.T > uint32(n.Unix()) {
228+
providedTime := time.Unix(int64(ts.T), 0).UTC().Format(time.RFC3339)
229+
realTime := n.Format(time.RFC3339)
230+
return nil, errors.Errorf("--older-than %q is after now %q", providedTime, realTime)
231+
}
232+
217233
info, err := pbm.CleanupReport(ctx, ts)
218234
if err != nil {
219235
return nil, errors.Wrap(err, "make cleanup report")

pbm/backup/backup.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -314,14 +314,6 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
314314
}
315315

316316
if inf.IsLeader() {
317-
var epch config.Epoch
318-
epch, err = config.ResetEpochWithContext(ctx, b.leadConn)
319-
if err != nil {
320-
l.Error("reset epoch")
321-
} else {
322-
l.Debug("epoch set to %v", epch)
323-
}
324-
325317
err = b.reconcileStatus(ctx, bcp.Name, opid.String(), defs.StatusDone, nil)
326318
if err != nil {
327319
return errors.Wrap(err, "check cluster for backup done")

pbm/backup/delete.go

Lines changed: 73 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func CanDeleteIncrementalChain(
142142
if base.Status.IsRunning() {
143143
return ErrBackupInProgress
144144
}
145-
if base.Type == defs.IncrementalBackup {
145+
if base.Type != defs.IncrementalBackup {
146146
return ErrNonIncrementalBackup
147147
}
148148
if base.SrcBackup != "" {
@@ -269,7 +269,8 @@ func isRequiredForOplogSlicing(ctx context.Context, cc connect.Client, lw primit
269269
if err != nil {
270270
return false, errors.Wrap(err, "get oplog range from previous backup")
271271
}
272-
if len(timelines) == 1 && timelines[0].Start <= prevRestoreTime.T {
272+
// check if there is a gap (missed ops) in oplog range between previous and following backup restore_to time
273+
if len(timelines) != 1 || prevRestoreTime.T >= timelines[0].Start {
273274
return false, nil
274275
}
275276

@@ -326,7 +327,11 @@ func ListDeleteBackupBefore(
326327
}
327328

328329
pred := func(m *BackupMeta) bool { return m.Type == bcpType }
329-
if bcpType == SelectiveBackup {
330+
if bcpType == defs.LogicalBackup {
331+
pred = func(m *BackupMeta) bool {
332+
return m.Type == defs.LogicalBackup && !util.IsSelective(m.Namespaces)
333+
}
334+
} else if bcpType == SelectiveBackup {
330335
pred = func(m *BackupMeta) bool { return util.IsSelective(m.Namespaces) }
331336
}
332337

@@ -345,71 +350,85 @@ func MakeCleanupInfo(ctx context.Context, conn connect.Client, ts primitive.Time
345350
if err != nil {
346351
return CleanupInfo{}, errors.Wrap(err, "list backups before")
347352
}
348-
349-
exclude := true
350-
if l := len(backups) - 1; l != -1 && backups[l].LastWriteTS.T == ts.T {
351-
// there is a backup at the `ts`
352-
if isValidBaseSnapshot(&backups[l]) {
353-
// it can be used to fully restore data to the `ts` state.
354-
// no need to exclude any base snapshot and chunks before the `ts`
355-
exclude = false
356-
}
357-
// the backup is not considered to be deleted.
358-
// used only for `exclude` value
359-
backups = backups[:l]
360-
}
361-
362-
// exclude the last incremental backups if it is required for following (after the `ts`)
363-
backups, err = extractLastIncrementalChain(ctx, conn, backups)
364-
if err != nil {
365-
return CleanupInfo{}, errors.Wrap(err, "extract last incremental chain")
366-
}
367-
368353
chunks, err := listChunksBefore(ctx, conn, ts)
369354
if err != nil {
370355
return CleanupInfo{}, errors.Wrap(err, "list chunks before")
371356
}
372-
if !exclude {
373-
// all chunks can be deleted. there is a backup to fully restore data
357+
if len(backups) == 0 {
374358
return CleanupInfo{Backups: backups, Chunks: chunks}, nil
375359
}
376360

377-
// the following check is needed for "delete all" special case.
378-
// if there is no base snapshot after `ts` and PITR is running,
379-
// the last base snapshot before `ts` should be excluded.
380-
// otherwise, it is allowed to delete everything before `ts`
381-
required, err := isRequiredForOplogSlicing(ctx, conn, ts)
382-
if err != nil {
383-
return CleanupInfo{}, err
384-
}
385-
if !required {
386-
return CleanupInfo{Backups: backups, Chunks: chunks}, nil
361+
if r := &backups[len(backups)-1]; r.LastWriteTS.T == ts.T {
362+
// there is a backup at the `ts`
363+
backups = backups[:len(backups)-1]
364+
365+
if isValidBaseSnapshot(r) {
366+
// it can be used to fully restore data to the `ts` state.
367+
// no need to exclude any backups and chunks before the `ts`.
368+
// however, increments that are the base for a following increment (after `ts`) must still be excluded
369+
backups, err = extractLastIncrementalChain(ctx, conn, backups)
370+
if err != nil {
371+
return CleanupInfo{}, errors.Wrap(err, "extract last incremental chain")
372+
}
373+
374+
return CleanupInfo{Backups: backups, Chunks: chunks}, nil
375+
}
387376
}
388377

389-
// the `baseIndex` could be the base snapshot index for PITR to the `ts`
390-
// or for currently running PITR
391-
baseIndex := findLastBaseSnapshotIndex(backups)
378+
baseIndex := len(backups) - 1
379+
for ; baseIndex != -1; baseIndex-- {
380+
if isValidBaseSnapshot(&backups[baseIndex]) {
381+
break
382+
}
383+
}
392384
if baseIndex == -1 {
393-
// nothing to keep
385+
// no valid base snapshot to exclude
394386
return CleanupInfo{Backups: backups, Chunks: chunks}, nil
395387
}
396388

397-
excluded := false
398-
origin := chunks
399-
chunks = []oplog.OplogChunk{}
400-
for i := range origin {
401-
if backups[baseIndex].LastWriteTS.Compare(origin[i].EndTS) != -1 {
402-
chunks = append(chunks, origin[i])
389+
beforeChunks := []oplog.OplogChunk{}
390+
afterChunks := []oplog.OplogChunk{}
391+
for _, chunk := range chunks {
392+
if chunk.EndTS.Before(backups[baseIndex].LastWriteTS) {
393+
beforeChunks = append(beforeChunks, chunk)
403394
} else {
404-
excluded = true
395+
// keep chunks after the last base snapshot restore time
396+
// the backup may be excluded
397+
afterChunks = append(afterChunks, chunk)
405398
}
406399
}
407400

408-
// if excluded is false, the last found base snapshot is not used for PITR
409-
// no need to keep it. otherwise, should be excluded
410-
if excluded {
401+
if oplog.HasSingleTimelineToCover(afterChunks, backups[baseIndex].LastWriteTS.T, ts.T) {
402+
// there is single continuous oplog range between snapshot and ts.
403+
// keep the backup and chunks to be able to restore to the ts
411404
copy(backups[baseIndex:], backups[baseIndex+1:])
412405
backups = backups[:len(backups)-1]
406+
chunks = beforeChunks
407+
} else {
408+
// no chunks yet but if PITR is ON, the backup can be base snapshot for it
409+
enabled, oplogOnly, err := config.IsPITREnabled(ctx, conn)
410+
if err != nil {
411+
return CleanupInfo{}, errors.Wrap(err, "get PITR status")
412+
}
413+
if enabled && !oplogOnly {
414+
nextRestoreTime, err := FindBaseSnapshotLWAfter(ctx, conn, ts)
415+
if err != nil {
416+
return CleanupInfo{}, errors.Wrap(err, "find next snapshot")
417+
}
418+
419+
if nextRestoreTime.IsZero() {
420+
// it is not the last base snapshot for PITR
421+
copy(backups[baseIndex:], backups[baseIndex+1:])
422+
backups = backups[:len(backups)-1]
423+
chunks = beforeChunks
424+
}
425+
}
426+
}
427+
428+
// exclude increments that are the base for a following increment (after `ts`)
429+
backups, err = extractLastIncrementalChain(ctx, conn, backups)
430+
if err != nil {
431+
return CleanupInfo{}, errors.Wrap(err, "extract last incremental chain")
413432
}
414433

415434
return CleanupInfo{Backups: backups, Chunks: chunks}, nil
@@ -471,6 +490,10 @@ func extractLastIncrementalChain(
471490
return bcps, nil
472491
}
473492

493+
return extractIncrementalChain(bcps, i), nil
494+
}
495+
496+
func extractIncrementalChain(bcps []BackupMeta, i int) []BackupMeta {
474497
for base := bcps[i].Name; i != -1; i-- {
475498
if bcps[i].Status != defs.StatusDone {
476499
continue
@@ -490,15 +513,5 @@ func extractLastIncrementalChain(
490513
}
491514
}
492515

493-
return bcps, nil
494-
}
495-
496-
func findLastBaseSnapshotIndex(bcps []BackupMeta) int {
497-
for i := len(bcps) - 1; i != -1; i-- {
498-
if isValidBaseSnapshot(&bcps[i]) {
499-
return i
500-
}
501-
}
502-
503-
return -1
516+
return bcps
504517
}

0 commit comments

Comments
 (0)