Skip to content

Commit f7f370c

Browse files
authored
PBM-1422: Adjust Oplog and PITR to support cloned collection (#1046)
* Remove guard for cloning ns with PITR * Expand oplog options with cloning namespace * Add logic for getting UUID from namespace * Add oplog entries cloning functionality * Add guard for backup without source cloning collections * Fix transaction handling for cloning namespace
1 parent 79478b1 commit f7f370c

File tree

5 files changed

+167
-33
lines changed

5 files changed

+167
-33
lines changed

cmd/pbm/restore.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -798,10 +798,6 @@ func validateNSFromNSTo(o *restoreOpts) error {
798798
if o.nsFrom != "" && o.nsTo != "" && o.usersAndRoles {
799799
return ErrCloningWithUAndR
800800
}
801-
if o.nsFrom != "" && o.nsTo != "" && o.pitr != "" {
802-
// this check will be removed with: PBM-1422
803-
return ErrCloningWithPITR
804-
}
805801
if strings.Contains(o.nsTo, "*") || strings.Contains(o.nsFrom, "*") {
806802
return ErrCloningWithWildCards
807803
}

cmd/pbm/restore_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,6 @@ func TestCloningValidation(t *testing.T) {
4343
},
4444
wantErr: ErrCloningWithUAndR,
4545
},
46-
{
47-
desc: "cloning with PITR is not allowed",
48-
opts: restoreOpts{
49-
nsFrom: "d.c1",
50-
nsTo: "d.c2",
51-
pitr: "2024-10-27T11:23:30",
52-
},
53-
wantErr: ErrCloningWithPITR,
54-
},
5546
{
5647
desc: "cloning with wild cards within nsFrom",
5748
opts: restoreOpts{

pbm/oplog/restore.go

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ var selectedNSSupportedCommands = map[string]struct{}{
8181
"commitIndexBuild": {},
8282
}
8383

84+
// cloningNSSupportedCommands is the set of oplog command names that are
// let through for the collection being cloned. All of them are index
// commands.
var cloningNSSupportedCommands = map[string]struct{}{
	"commitIndexBuild": {},
	"createIndexes":    {},
	"deleteIndex":      {},
	"deleteIndexes":    {},
	"dropIndex":        {},
	"dropIndexes":      {},
}
92+
8493
var dontPreserveUUID = []string{
8594
"admin.system.users",
8695
"admin.system.roles",
@@ -89,6 +98,18 @@ var dontPreserveUUID = []string{
8998
"*.system.views", // timeseries
9099
}
91100

101+
// ErrNoCloningNamespace is returned by SetCloneNS when no UUID can be
// resolved for the clone-to collection, i.e. the collection doesn't exist.
var ErrNoCloningNamespace = errors.New("cloning namespace doesn't exist")
102+
103+
// cloneNS has all data related to cloning namespace within oplog
104+
type cloneNS struct {
105+
snapshot.CloneNS
106+
toUUID primitive.Binary
107+
}
108+
109+
func (c *cloneNS) SetNSPair(nsPair snapshot.CloneNS) {
110+
c.CloneNS = nsPair
111+
}
112+
92113
// OplogRestore is the oplog applyer
93114
type OplogRestore struct {
94115
dst *mongo.Client
@@ -120,7 +141,8 @@ type OplogRestore struct {
120141

121142
unsafe bool
122143

123-
filter OpFilter
144+
filter OpFilter
145+
cloneNS cloneNS
124146
}
125147

126148
const saveLastDistTxns = 100
@@ -196,6 +218,7 @@ func (o *OplogRestore) Apply(src io.ReadCloser) (primitive.Timestamp, error) {
196218
defer bsonSource.Close()
197219

198220
var lts primitive.Timestamp
221+
199222
for {
200223
rawOplogEntry := bsonSource.LoadNext()
201224
if rawOplogEntry == nil {
@@ -257,6 +280,27 @@ func (o *OplogRestore) SetIncludeNS(nss []string) {
257280
o.includeNS = dbs
258281
}
259282

283+
// SetCloneNS sets all needed data for cloning namespace:
284+
// collection names and target namespace UUID
285+
func (o *OplogRestore) SetCloneNS(ctx context.Context, ns snapshot.CloneNS) error {
286+
if !ns.IsSpecified() {
287+
return nil
288+
}
289+
290+
o.cloneNS.SetNSPair(ns)
291+
292+
var err error
293+
o.cloneNS.toUUID, err = getUUIDForNS(ctx, o.dst, o.cloneNS.ToNS)
294+
if err != nil {
295+
return errors.Wrap(err, "get to ns uuid")
296+
}
297+
if o.cloneNS.toUUID.IsZero() {
298+
return ErrNoCloningNamespace
299+
}
300+
301+
return nil
302+
}
303+
260304
func isOpAllowed(oe *Record) bool {
261305
coll, ok := strings.CutPrefix(oe.Namespace, "config.")
262306
if !ok {
@@ -310,6 +354,50 @@ func (o *OplogRestore) isOpSelected(oe *Record) bool {
310354
return false
311355
}
312356

357+
// isOpForCloning returns whether op needs to be processed or not in case of cloning NS.
358+
// In case of non cloning use case, it's always true.
359+
func (o *OplogRestore) isOpForCloning(oe *db.Oplog) bool {
360+
if !o.cloneNS.IsSpecified() {
361+
return true
362+
}
363+
364+
// i, u, d ops for cloning ns
365+
if oe.Namespace == o.cloneNS.FromNS {
366+
return true
367+
}
368+
369+
// filter out all other i, u, d ops
370+
if oe.Operation != "c" {
371+
return false
372+
}
373+
374+
db, coll, _ := strings.Cut(oe.Namespace, ".")
375+
if coll != "$cmd" {
376+
return false
377+
}
378+
379+
cmd := oe.Object[0].Key
380+
if cmd == "applyOps" {
381+
return true // internal ops of applyOps are checked one by one later
382+
}
383+
384+
cloneFromDB, cloneFromColl, _ := strings.Cut(o.cloneNS.FromNS, ".")
385+
if db != cloneFromDB {
386+
// it's command not relevant for db to clone from
387+
return false
388+
}
389+
390+
if _, ok := cloningNSSupportedCommands[cmd]; ok {
391+
// check if command targets collection
392+
collForCmd, _ := oe.Object[0].Value.(string)
393+
if collForCmd == cloneFromColl {
394+
return true
395+
}
396+
}
397+
398+
return false
399+
}
400+
313401
func (o *OplogRestore) isOpExcluded(oe *Record) bool {
314402
if o.excludeNS == nil {
315403
return false
@@ -360,7 +448,8 @@ func (o *OplogRestore) handleOp(oe db.Oplog) error {
360448
return nil
361449
}
362450

363-
if o.isOpExcluded(&oe) || !isOpAllowed(&oe) || !o.isOpSelected(&oe) {
451+
if o.isOpExcluded(&oe) || !isOpAllowed(&oe) ||
452+
!o.isOpSelected(&oe) || !o.isOpForCloning(&oe) {
364453
return nil
365454
}
366455

@@ -676,10 +765,21 @@ func (o *OplogRestore) HandleUncommittedTxn(
676765
return partial, uncommitted, nil
677766
}
678767

768+
func (o *OplogRestore) cloneEntry(op *db.Oplog) {
769+
if !o.cloneNS.IsSpecified() {
770+
return
771+
}
772+
if op.Namespace == o.cloneNS.FromNS {
773+
*op.UI = o.cloneNS.toUUID
774+
op.Namespace = o.cloneNS.ToNS
775+
}
776+
}
777+
679778
func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error {
680779
// have to handle it here one more time because before the op gets thru
681780
// txnBuffer its namespace is `collection.$cmd` instead of the real one
682-
if o.isOpExcluded(&op) || !isOpAllowed(&op) || !o.isOpSelected(&op) {
781+
if o.isOpExcluded(&op) || !isOpAllowed(&op) ||
782+
!o.isOpSelected(&op) || !o.isOpForCloning(&op) {
683783
return nil
684784
}
685785

@@ -688,6 +788,8 @@ func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error {
688788
return errors.Wrap(err, "filtering UUIDs from oplog")
689789
}
690790

791+
o.cloneEntry(&op)
792+
691793
dbName, collName, _ := strings.Cut(op.Namespace, ".")
692794
if op.Operation == "c" {
693795
if len(op.Object) == 0 {
@@ -1125,3 +1227,29 @@ func isTruthy(val interface{}) bool {
11251227
func isFalsy(val interface{}) bool {
11261228
return !isTruthy(val)
11271229
}
1230+
1231+
// getUUIDForNS ruturns UUID of existing collection.
1232+
// When ns doesn't exist, it returns zero value without an error.
1233+
// In case of error, it returns zero value for UUID in addition to error.
1234+
func getUUIDForNS(ctx context.Context, m *mongo.Client, ns string) (primitive.Binary, error) {
1235+
var uuid primitive.Binary
1236+
1237+
d, c, _ := strings.Cut(ns, ".")
1238+
cur, err := m.Database(d).ListCollections(ctx, bson.D{{"name", c}})
1239+
if err != nil {
1240+
return uuid, errors.Wrap(err, "list collections")
1241+
}
1242+
defer cur.Close(ctx)
1243+
1244+
for cur.Next(ctx) {
1245+
if subtype, data, ok := cur.Current.Lookup("info", "uuid").BinaryOK(); ok {
1246+
uuid = primitive.Binary{
1247+
Subtype: subtype,
1248+
Data: data,
1249+
}
1250+
break
1251+
}
1252+
}
1253+
1254+
return uuid, errors.Wrap(cur.Err(), "list collections cursor")
1255+
}

pbm/restore/logical.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -256,20 +256,19 @@ func (r *Restore) Snapshot(
256256
oplogRanges := []oplogRange{
257257
{chunks: chunks, storage: r.bcpStg},
258258
}
259-
oplogOption := &applyOplogOption{end: &bcp.LastWriteTS, nss: nss}
259+
oplogOption := &applyOplogOption{
260+
end: &bcp.LastWriteTS,
261+
nss: nss,
262+
cloudNS: cloneNS,
263+
}
260264
if r.nodeInfo.IsConfigSrv() && util.IsSelective(nss) {
261265
oplogOption.nss = []string{"config.databases"}
262266
oplogOption.filter = newConfigsvrOpFilter(nss)
263267
}
264-
if cloneNS.IsSpecified() {
265-
// oplog doesn't need to be applied when cloning ns
266-
// this restriction will be removed during PBM-1422
267-
l.Debug("applying oplog is skipped when cloning collection")
268-
} else {
269-
err = r.applyOplog(ctx, oplogRanges, oplogOption)
270-
if err != nil {
271-
return err
272-
}
268+
269+
err = r.applyOplog(ctx, oplogRanges, oplogOption)
270+
if err != nil {
271+
return err
273272
}
274273

275274
err = r.restoreIndexes(ctx, oplogOption.nss, cloneNS)
@@ -424,7 +423,11 @@ func (r *Restore) PITR(
424423
{chunks: bcpChunks, storage: r.bcpStg},
425424
{chunks: chunks, storage: r.oplogStg},
426425
}
427-
oplogOption := applyOplogOption{end: &cmd.OplogTS, nss: nss}
426+
oplogOption := applyOplogOption{
427+
end: &cmd.OplogTS,
428+
nss: nss,
429+
cloudNS: cloneNS,
430+
}
428431
if r.nodeInfo.IsConfigSrv() && util.IsSelective(nss) {
429432
oplogOption.nss = []string{"config.databases"}
430433
oplogOption.filter = newConfigsvrOpFilter(nss)

pbm/restore/restore.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/percona/percona-backup-mongodb/pbm/log"
2121
"github.com/percona/percona-backup-mongodb/pbm/oplog"
2222
"github.com/percona/percona-backup-mongodb/pbm/restore/phys"
23+
"github.com/percona/percona-backup-mongodb/pbm/snapshot"
2324
"github.com/percona/percona-backup-mongodb/pbm/storage"
2425
"github.com/percona/percona-backup-mongodb/pbm/topo"
2526
"github.com/percona/percona-backup-mongodb/pbm/util"
@@ -286,11 +287,12 @@ func chunks(
286287
}
287288

288289
type applyOplogOption struct {
289-
start *primitive.Timestamp
290-
end *primitive.Timestamp
291-
nss []string
292-
unsafe bool
293-
filter oplog.OpFilter
290+
start *primitive.Timestamp
291+
end *primitive.Timestamp
292+
nss []string
293+
cloudNS snapshot.CloneNS
294+
unsafe bool
295+
filter oplog.OpFilter
294296
}
295297

296298
type (
@@ -341,7 +343,14 @@ func applyOplog(
341343
txnSyncErr chan error
342344
)
343345

344-
oplogRestore, err := oplog.NewOplogRestore(node, ic, mgoV, options.unsafe, true, ctxn, txnSyncErr)
346+
oplogRestore, err := oplog.NewOplogRestore(
347+
node,
348+
ic,
349+
mgoV,
350+
options.unsafe,
351+
true,
352+
ctxn,
353+
txnSyncErr)
345354
if err != nil {
346355
return nil, errors.Wrap(err, "create oplog")
347356
}
@@ -357,6 +366,13 @@ func applyOplog(
357366
}
358367
oplogRestore.SetTimeframe(startTS, endTS)
359368
oplogRestore.SetIncludeNS(options.nss)
369+
err = oplogRestore.SetCloneNS(ctx, options.cloudNS)
370+
if errors.Is(err, oplog.ErrNoCloningNamespace) {
371+
log.Info("cloning namespace doesn't exist so oplog will not be applied")
372+
return partial, nil
373+
} else if err != nil {
374+
return nil, errors.Wrap(err, "set cloning ns")
375+
}
360376

361377
var lts primitive.Timestamp
362378
for _, oplogRange := range ranges {

0 commit comments

Comments
 (0)