Skip to content

Commit 44b7141

Browse files
authored
Merge pull request #1054 from percona/PBM-1444-cloning-ddl-ops
PBM-1444: Cloning for create/drop ops
2 parents 33a7d7a + 665e9bf commit 44b7141

File tree

1 file changed

+39
-5
lines changed

1 file changed

+39
-5
lines changed

pbm/oplog/restore.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ var selectedNSSupportedCommands = map[string]struct{}{
8282
}
8383

8484
var cloningNSSupportedCommands = map[string]struct{}{
85+
"create": {},
86+
"drop": {},
8587
"createIndexes": {},
8688
"deleteIndex": {},
8789
"deleteIndexes": {},
@@ -103,11 +105,17 @@ var ErrNoCloningNamespace = errors.New("cloning namespace desn't exist")
103105
// cloneNS has all data related to cloning namespace within oplog
104106
type cloneNS struct {
105107
snapshot.CloneNS
106-
toUUID primitive.Binary
108+
fromDB string
109+
fromColl string
110+
toDB string
111+
toColl string
112+
toUUID primitive.Binary
107113
}
108114

109115
func (c *cloneNS) SetNSPair(nsPair snapshot.CloneNS) {
110116
c.CloneNS = nsPair
117+
c.fromDB, c.fromColl, _ = strings.Cut(nsPair.FromNS, ".")
118+
c.toDB, c.toColl, _ = strings.Cut(nsPair.ToNS, ".")
111119
}
112120

113121
// OplogRestore is the oplog applyer
@@ -381,16 +389,15 @@ func (o *OplogRestore) isOpForCloning(oe *db.Oplog) bool {
381389
return true // internal ops of applyOps are checked one by one later
382390
}
383391

384-
cloneFromDB, cloneFromColl, _ := strings.Cut(o.cloneNS.FromNS, ".")
385-
if db != cloneFromDB {
392+
if db != o.cloneNS.fromDB {
386393
// it's command not relevant for db to clone from
387394
return false
388395
}
389396

390397
if _, ok := cloningNSSupportedCommands[cmd]; ok {
391398
// check if command targets collection
392399
collForCmd, _ := oe.Object[0].Value.(string)
393-
if collForCmd == cloneFromColl {
400+
if collForCmd == o.cloneNS.fromColl {
394401
return true
395402
}
396403
}
@@ -769,10 +776,37 @@ func (o *OplogRestore) cloneEntry(op *db.Oplog) {
769776
if !o.cloneNS.IsSpecified() {
770777
return
771778
}
779+
780+
// op: i, u, d
772781
if op.Namespace == o.cloneNS.FromNS {
773-
*op.UI = o.cloneNS.toUUID
782+
op.UI = nil
774783
op.Namespace = o.cloneNS.ToNS
784+
return
785+
}
786+
787+
if op.Operation != "c" || len(op.Object) == 0 {
788+
return
789+
}
790+
791+
dbName, _, _ := strings.Cut(op.Namespace, ".")
792+
if dbName != o.cloneNS.fromDB {
793+
return
794+
}
795+
796+
cmdName := op.Object[0].Key
797+
if cmdName != "create" && cmdName != "drop" {
798+
return
775799
}
800+
801+
collName, _ := op.Object[0].Value.(string)
802+
if collName != o.cloneNS.fromColl {
803+
return
804+
}
805+
806+
// op: create/drop
807+
op.Namespace = fmt.Sprintf("%s.$cmd", o.cloneNS.toDB)
808+
op.Object[0].Value = o.cloneNS.toColl
809+
op.UI = nil
776810
}
777811

778812
func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error {

0 commit comments

Comments
 (0)