Skip to content

Commit b99501f

Browse files
authored
[PBM-1246] improve delete-pitr command (#910)
1 parent 444c6d7 commit b99501f

File tree

7 files changed

+186
-147
lines changed

7 files changed

+186
-147
lines changed

cmd/pbm-agent/delete.go

Lines changed: 92 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@ import (
55
"runtime"
66
"time"
77

8+
"go.mongodb.org/mongo-driver/bson"
89
"go.mongodb.org/mongo-driver/bson/primitive"
910
"golang.org/x/sync/errgroup"
1011

1112
"github.com/percona/percona-backup-mongodb/pbm/backup"
1213
"github.com/percona/percona-backup-mongodb/pbm/config"
14+
"github.com/percona/percona-backup-mongodb/pbm/connect"
1315
"github.com/percona/percona-backup-mongodb/pbm/ctrl"
1416
"github.com/percona/percona-backup-mongodb/pbm/errors"
1517
"github.com/percona/percona-backup-mongodb/pbm/lock"
1618
"github.com/percona/percona-backup-mongodb/pbm/log"
17-
"github.com/percona/percona-backup-mongodb/pbm/oplog/oplogtmp"
19+
"github.com/percona/percona-backup-mongodb/pbm/oplog"
1820
"github.com/percona/percona-backup-mongodb/pbm/resync"
21+
"github.com/percona/percona-backup-mongodb/pbm/storage"
1922
"github.com/percona/percona-backup-mongodb/pbm/topo"
2023
"github.com/percona/percona-backup-mongodb/pbm/util"
2124
)
@@ -164,34 +167,28 @@ func (a *Agent) DeletePITR(ctx context.Context, d *ctrl.DeletePITRCmd, opid ctrl
164167
}
165168
}()
166169

167-
if d.OlderThan > 0 {
168-
t := time.Unix(d.OlderThan, 0).UTC()
169-
obj := t.Format("2006-01-02T15:04:05Z")
170+
ct, err := topo.ClusterTimeFromNodeInfo(nodeInfo)
171+
if err != nil {
172+
l.Error("get cluster time: %v", err)
173+
return
174+
}
170175

171-
l = logger.NewEvent(string(ctrl.CmdDeletePITR), obj, opid.String(), ep.TS())
172-
ctx := log.SetLogEventToContext(ctx, l)
176+
t := time.Unix(d.OlderThan, 0).UTC()
177+
obj := t.Format("2006-01-02T15:04:05Z")
173178

174-
var ct primitive.Timestamp
175-
ct, err = topo.GetClusterTime(ctx, a.leadConn)
176-
if err != nil {
177-
l.Error("get cluster time: %v", err)
178-
return
179-
}
180-
if d.OlderThan > int64(ct.T) {
181-
providedTime := t.Format(time.RFC3339)
182-
realTime := time.Unix(int64(ct.T), 0).UTC().Format(time.RFC3339)
183-
l.Error("provided time %q is after now %q", providedTime, realTime)
184-
return
185-
}
179+
l = logger.NewEvent(string(ctrl.CmdDeletePITR), obj, opid.String(), ep.TS())
180+
ctx = log.SetLogEventToContext(ctx, l)
186181

187-
l.Info("deleting pitr chunks older than %v", t)
188-
err = oplogtmp.DeletePITR(ctx, a.leadConn, &t, l)
189-
} else {
190-
l = logger.NewEvent(string(ctrl.CmdDeletePITR), "_all_", opid.String(), ep.TS())
191-
ctx := log.SetLogEventToContext(ctx, l)
192-
l.Info("deleting all pitr chunks")
193-
err = oplogtmp.DeletePITR(ctx, a.leadConn, nil, l)
182+
if d.OlderThan > int64(ct.T) {
183+
providedTime := t.Format(time.RFC3339)
184+
realTime := time.Unix(int64(ct.T), 0).UTC().Format(time.RFC3339)
185+
l.Error("provided time %q is after now %q", providedTime, realTime)
186+
return
194187
}
188+
189+
ts := primitive.Timestamp{T: uint32(t.Unix())}
190+
l.Info("deleting pitr chunks older than %v", t)
191+
err = deletePITRImpl(ctx, a.leadConn, ts)
195192
if err != nil {
196193
l.Error("deleting: %v", err)
197194
return
@@ -301,3 +298,73 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
301298
l.Error("storage resync: " + err.Error())
302299
}
303300
}
301+
302+
func deletePITRImpl(ctx context.Context, conn connect.Client, ts primitive.Timestamp) error {
303+
l := log.LogEventFromContext(ctx)
304+
305+
cfg, err := config.GetConfig(ctx, conn)
306+
if err != nil {
307+
return errors.Wrap(err, "get config")
308+
}
309+
310+
if cfg.PITR.Enabled && !cfg.PITR.OplogOnly {
311+
ct, err := topo.GetClusterTime(ctx, conn)
312+
if err != nil {
313+
return errors.Wrap(err, "get cluster time")
314+
}
315+
316+
lw, err := backup.FindBaseSnapshotLWBefore(ctx, conn, ct, primitive.Timestamp{})
317+
if err != nil {
318+
return errors.Wrap(err, "find previous snapshot")
319+
}
320+
if !lw.IsZero() {
321+
if lw.T < ts.T || (lw.T == ts.T && (ts.I == 0 || lw.I < ts.I)) {
322+
ts = lw
323+
}
324+
}
325+
}
326+
327+
chunks, err := oplog.ListDeleteChunksBefore(ctx, conn, ts)
328+
if err != nil {
329+
return errors.Wrap(err, "get pitr chunks")
330+
}
331+
if len(chunks) == 0 {
332+
l.Debug("nothing to delete")
333+
return nil
334+
}
335+
336+
stg, err := util.StorageFromConfig(cfg.Storage, l)
337+
if err != nil {
338+
return errors.Wrap(err, "get storage")
339+
}
340+
341+
return deleteChunks(ctx, conn, stg, chunks)
342+
}
343+
344+
func deleteChunks(ctx context.Context, m connect.Client, stg storage.Storage, chunks []oplog.OplogChunk) error {
345+
l := log.LogEventFromContext(ctx)
346+
347+
for _, chnk := range chunks {
348+
err := stg.Delete(chnk.FName)
349+
if err != nil && !errors.Is(err, storage.ErrNotExist) {
350+
return errors.Wrapf(err, "delete pitr chunk '%s' (%v) from storage", chnk.FName, chnk)
351+
}
352+
353+
_, err = m.PITRChunksCollection().DeleteOne(
354+
ctx,
355+
bson.D{
356+
{"rs", chnk.RS},
357+
{"start_ts", chnk.StartTS},
358+
{"end_ts", chnk.EndTS},
359+
},
360+
)
361+
362+
if err != nil {
363+
return errors.Wrap(err, "delete pitr chunk metadata")
364+
}
365+
366+
l.Debug("deleted %s", chnk.FName)
367+
}
368+
369+
return nil
370+
}

cmd/pbm/delete.go

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.mongodb.org/mongo-driver/bson/primitive"
1313

1414
"github.com/percona/percona-backup-mongodb/pbm/backup"
15+
"github.com/percona/percona-backup-mongodb/pbm/config"
1516
"github.com/percona/percona-backup-mongodb/pbm/connect"
1617
"github.com/percona/percona-backup-mongodb/pbm/ctrl"
1718
"github.com/percona/percona-backup-mongodb/pbm/defs"
@@ -33,7 +34,6 @@ func deleteBackup(
3334
conn connect.Client,
3435
pbm sdk.Client,
3536
d *deleteBcpOpts,
36-
outf outFormat,
3737
) (fmt.Stringer, error) {
3838
if d.name == "" && d.olderThan == "" {
3939
return nil, errors.New("either --name or --older-than should be set")
@@ -59,7 +59,7 @@ func deleteBackup(
5959
return nil, err
6060
}
6161

62-
if outf != outText || d.dryRun {
62+
if d.dryRun {
6363
return nil, nil
6464
}
6565

@@ -153,22 +153,72 @@ type deletePitrOpts struct {
153153
olderThan string
154154
yes bool
155155
all bool
156+
wait bool
157+
dryRun bool
156158
}
157159

158160
func deletePITR(
159161
ctx context.Context,
160162
conn connect.Client,
161163
pbm sdk.Client,
162164
d *deletePitrOpts,
163-
outf outFormat,
164165
) (fmt.Stringer, error) {
166+
if d.olderThan == "" && !d.all {
167+
return nil, errors.New("either --older-than or --all should be set")
168+
}
165169
if d.olderThan != "" && d.all {
166170
return nil, errors.New("cannot use --older-then and --all at the same command")
167171
}
168-
if !d.all && d.olderThan == "" {
169-
return nil, errors.New("either --older-than or --all should be set")
172+
173+
var until primitive.Timestamp
174+
if d.all {
175+
until = primitive.Timestamp{T: uint32(time.Now().UTC().Unix())}
176+
} else {
177+
var err error
178+
until, err = parseOlderThan(d.olderThan)
179+
if err != nil {
180+
return nil, errors.Wrap(err, "parse --older-then")
181+
}
182+
183+
now := time.Now().UTC()
184+
if until.T > uint32(now.Unix()) {
185+
providedTime := time.Unix(int64(until.T), 0).UTC().Format(time.RFC3339)
186+
realTime := now.Format(time.RFC3339)
187+
return nil, errors.Errorf("--older-than %q is after now %q", providedTime, realTime)
188+
}
189+
}
190+
191+
enabled, oplogOnly, err := config.IsPITREnabled(ctx, conn)
192+
if err != nil {
193+
return nil, errors.Wrap(err, "check pitr status")
170194
}
171195

196+
if enabled && !oplogOnly {
197+
lw, err := backup.FindBaseSnapshotLWBefore(ctx,
198+
conn, primitive.Timestamp{T: uint32(time.Now().UTC().Unix())}, primitive.Timestamp{})
199+
if err != nil {
200+
return nil, errors.Wrap(err, "find previous snapshot")
201+
}
202+
if !lw.IsZero() {
203+
if lw.T < until.T || (lw.T == until.T && (until.I == 0 || lw.I < until.I)) {
204+
until = lw
205+
}
206+
}
207+
}
208+
209+
chunks, err := sdk.ListDeleteChunksBefore(ctx, pbm, until)
210+
if err != nil {
211+
return nil, errors.Wrap(err, "list chunks")
212+
}
213+
if len(chunks) == 0 {
214+
return outMsg{"nothing to delete"}, nil
215+
}
216+
217+
printDeleteInfoTo(os.Stdout, nil, chunks)
218+
219+
if d.dryRun {
220+
return nil, nil
221+
}
172222
if !d.yes {
173223
q := "Are you sure you want to delete chunks?"
174224
if d.all {
@@ -182,26 +232,13 @@ func deletePITR(
182232
}
183233
}
184234

185-
var ts primitive.Timestamp
186-
if d.olderThan != "" {
187-
var err error
188-
ts, err = parseOlderThan(d.olderThan)
189-
if err != nil {
190-
return nil, errors.Wrap(err, "parse --older-then")
191-
}
192-
if n := time.Now().UTC(); ts.T > uint32(n.Unix()) {
193-
providedTime := time.Unix(int64(ts.T), 0).UTC().Format(time.RFC3339)
194-
realTime := n.Format(time.RFC3339)
195-
return nil, errors.Errorf("--older-than %q is after now %q", providedTime, realTime)
196-
}
197-
}
198-
cid, err := pbm.DeleteOplogRange(ctx, ts)
235+
cid, err := pbm.DeleteOplogRange(ctx, until)
199236
if err != nil {
200237
return nil, errors.Wrap(err, "schedule pitr delete")
201238
}
202239

203-
if outf != outText {
204-
return nil, nil
240+
if !d.wait {
241+
return outMsg{"Processing by agents. Please check status later"}, nil
205242
}
206243

207244
return waitForDelete(ctx, conn, pbm, cid)
@@ -314,7 +351,7 @@ func printDeleteInfoTo(w io.Writer, backups []backup.BackupMeta, chunks []oplog.
314351
}
315352

316353
restoreTime := time.Unix(int64(bcp.LastWriteTS.T), 0).UTC().Format(time.RFC3339)
317-
fmt.Fprintf(w, " - %q [size: %s type: <%s>, restore time: %s]",
354+
fmt.Fprintf(w, " - %q [size: %s type: <%s>, restore time: %s]\n",
318355
bcp.Name, fmtSize(bcp.Size), t, restoreTime)
319356
}
320357
}

cmd/pbm/main.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"encoding/json"
77
"fmt"
8-
stdlog "log"
98
"os"
109
"strings"
1110
"time"
@@ -264,7 +263,7 @@ func main() {
264263
datetimeFormat,
265264
dateFormat)).
266265
StringVar(&deletePitr.olderThan)
267-
deletePitrCmd.Flag("all", "Delete all chunks").
266+
deletePitrCmd.Flag("all", `Delete all chunks (deprecated). Use --older-than="0d"`).
268267
Short('a').
269268
BoolVar(&deletePitr.all)
270269
deletePitrCmd.Flag("yes", "Don't ask for confirmation").
@@ -273,6 +272,11 @@ func main() {
273272
deletePitrCmd.Flag("force", "Don't ask for confirmation (deprecated)").
274273
Short('f').
275274
BoolVar(&deletePitr.yes)
275+
deletePitrCmd.Flag("wait", "Wait for deletion done").
276+
Short('w').
277+
BoolVar(&deletePitr.wait)
278+
deletePitrCmd.Flag("dry-run", "Report but do not delete").
279+
BoolVar(&deletePitr.dryRun)
276280

277281
cleanupCmd := pbmCmd.Command("cleanup", "Delete Backups and PITR chunks")
278282
cleanupOpts := cleanupOptions{}
@@ -324,7 +328,7 @@ func main() {
324328
BoolVar(&logs.extr)
325329

326330
statusOpts := statusOptions{}
327-
statusCmd := pbmCmd.Command("status", "Show PBM status")
331+
statusCmd := pbmCmd.Command("status", "Show PBM status").Alias("s")
328332
statusCmd.Flag(RSMappingFlag, RSMappingDoc).
329333
Envar(RSMappingEnvVar).
330334
StringVar(&statusOpts.rsMap)
@@ -386,7 +390,8 @@ func main() {
386390

387391
ver, err := version.GetMongoVersion(ctx, conn.MongoClient())
388392
if err != nil {
389-
stdlog.Fatalf("get mongo version: %v", err)
393+
fmt.Fprintf(os.Stderr, "get mongo version: %v", err)
394+
os.Exit(1)
390395
}
391396
if err := version.FeatureSupport(ver).PBMSupport(); err != nil {
392397
fmt.Fprintf(os.Stderr, "WARNING: %v\n", err)
@@ -420,9 +425,9 @@ func main() {
420425
case listCmd.FullCommand():
421426
out, err = runList(ctx, conn, pbm, &list)
422427
case deleteBcpCmd.FullCommand():
423-
out, err = deleteBackup(ctx, conn, pbm, &deleteBcp, pbmOutF)
428+
out, err = deleteBackup(ctx, conn, pbm, &deleteBcp)
424429
case deletePitrCmd.FullCommand():
425-
out, err = deletePITR(ctx, conn, pbm, &deletePitr, pbmOutF)
430+
out, err = deletePITR(ctx, conn, pbm, &deletePitr)
426431
case cleanupCmd.FullCommand():
427432
out, err = doCleanup(ctx, conn, pbm, &cleanupOpts)
428433
case logsCmd.FullCommand():

pbm/oplog/chunk.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,3 +491,16 @@ func HasSingleTimelineToCover(chunks []OplogChunk, from, till uint32) bool {
491491

492492
return false
493493
}
494+
495+
func ListDeleteChunksBefore(ctx context.Context, conn connect.Client, ts primitive.Timestamp) ([]OplogChunk, error) {
496+
f := bson.D{{"end_ts", bson.M{"$lt": ts}}}
497+
o := options.Find().SetSort(bson.D{{"start_ts", 1}})
498+
cur, err := conn.PITRChunksCollection().Find(ctx, f, o)
499+
if err != nil {
500+
return nil, errors.Wrap(err, "query")
501+
}
502+
503+
rv := []OplogChunk{}
504+
err = cur.All(ctx, &rv)
505+
return rv, errors.Wrap(err, "cursor: all")
506+
}

0 commit comments

Comments
 (0)