Skip to content

Commit e2dfda2

Browse files
committed
Support cloning create/drop commands from oplog
1 parent e7ce3dd commit e2dfda2

File tree

1 file changed

+37
-2
lines changed

1 file changed

+37
-2
lines changed

pbm/oplog/restore.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ var selectedNSSupportedCommands = map[string]struct{}{
8282
}
8383

8484
var cloningNSSupportedCommands = map[string]struct{}{
85+
"create": {},
86+
"drop": {},
8587
"createIndexes": {},
8688
"deleteIndex": {},
8789
"deleteIndexes": {},
@@ -103,11 +105,17 @@ var ErrNoCloningNamespace = errors.New("cloning namespace desn't exist")
103105
// cloneNS has all data related to cloning namespace within oplog
104106
type cloneNS struct {
105107
snapshot.CloneNS
106-
toUUID primitive.Binary
108+
fromDB string
109+
fromColl string
110+
toDB string
111+
toColl string
112+
toUUID primitive.Binary
107113
}
108114

109115
func (c *cloneNS) SetNSPair(nsPair snapshot.CloneNS) {
110116
c.CloneNS = nsPair
117+
c.fromDB, c.fromColl, _ = strings.Cut(nsPair.FromNS, ".")
118+
c.toDB, c.toColl, _ = strings.Cut(nsPair.ToNS, ".")
111119
}
112120

113121
// OplogRestore is the oplog applyer
@@ -769,10 +777,37 @@ func (o *OplogRestore) cloneEntry(op *db.Oplog) {
769777
if !o.cloneNS.IsSpecified() {
770778
return
771779
}
780+
781+
// op: i, u, d
772782
if op.Namespace == o.cloneNS.FromNS {
773-
*op.UI = o.cloneNS.toUUID
783+
op.UI = nil
774784
op.Namespace = o.cloneNS.ToNS
785+
return
786+
}
787+
788+
if op.Operation != "c" || len(op.Object) == 0 {
789+
return
790+
}
791+
792+
dbName, _, _ := strings.Cut(op.Namespace, ".")
793+
if dbName != o.cloneNS.fromDB {
794+
return
795+
}
796+
797+
cmdName := op.Object[0].Key
798+
if cmdName != "create" && cmdName != "drop" {
799+
return
775800
}
801+
802+
collName, _ := op.Object[0].Value.(string)
803+
if collName != o.cloneNS.fromColl {
804+
return
805+
}
806+
807+
// op: create/drop
808+
op.Namespace = fmt.Sprintf("%s.$cmd", o.cloneNS.toDB)
809+
op.Object[0].Value = o.cloneNS.toColl
810+
op.UI = nil
776811
}
777812

778813
func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error {

0 commit comments

Comments
 (0)