Skip to content

Commit 2a4dcd0

Browse files
committed
Extract mongo logic into mDB type for OplogRestore
Primary motivation is to be able to easely mock db access logic.
1 parent 15b51a8 commit 2a4dcd0

File tree

2 files changed

+101
-61
lines changed

2 files changed

+101
-61
lines changed

pbm/oplog/db.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package oplog
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
"go.mongodb.org/mongo-driver/bson"
8+
"go.mongodb.org/mongo-driver/bson/primitive"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
11+
"github.com/percona/percona-backup-mongodb/pbm/errors"
12+
)
13+
14+
// mDB represents MongoDB access functionality.
15+
type mDB struct {
16+
m *mongo.Client
17+
}
18+
19+
func newMDB(m *mongo.Client) *mDB {
20+
return &mDB{m: m}
21+
}
22+
23+
// getUUIDForNS ruturns UUID of existing collection.
24+
// When ns doesn't exist, it returns zero value without an error.
25+
// In case of error, it returns zero value for UUID in addition to error.
26+
func (d *mDB) getUUIDForNS(ctx context.Context, ns string) (primitive.Binary, error) {
27+
var uuid primitive.Binary
28+
29+
db, coll, _ := strings.Cut(ns, ".")
30+
cur, err := d.m.Database(db).ListCollections(ctx, bson.D{{"name", coll}})
31+
if err != nil {
32+
return uuid, errors.Wrap(err, "list collections")
33+
}
34+
defer cur.Close(ctx)
35+
36+
for cur.Next(ctx) {
37+
if subtype, data, ok := cur.Current.Lookup("info", "uuid").BinaryOK(); ok {
38+
uuid = primitive.Binary{
39+
Subtype: subtype,
40+
Data: data,
41+
}
42+
break
43+
}
44+
}
45+
46+
return uuid, errors.Wrap(cur.Err(), "list collections cursor")
47+
}
48+
49+
// ensureCollExists ensures that the collection exists before "creating" views or timeseries.
50+
// See PBM-921 for details.
51+
func (d *mDB) ensureCollExists(dbName string) error {
52+
err := d.m.Database(dbName).CreateCollection(context.TODO(), "system.views")
53+
if err != nil {
54+
// MongoDB 5.0 and 6.0 returns NamespaceExists error.
55+
// MongoDB 7.0 and 8.0 does not return error.
56+
// https://github.com/mongodb/mongo/blob/v6.0/src/mongo/base/error_codes.yml#L84
57+
const NamespaceExists = 48
58+
var cmdError mongo.CommandError
59+
if !errors.As(err, &cmdError) || cmdError.Code != NamespaceExists {
60+
return errors.Wrapf(err, "ensure %s.system.views collection", dbName)
61+
}
62+
}
63+
64+
return nil
65+
}
66+
67+
// applyOps is a wrapper for the applyOps database command, we pass in
68+
// a session to avoid opening a new connection for a few inserts at a time.
69+
func (d *mDB) applyOps(entries []interface{}) error {
70+
singleRes := d.m.Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}})
71+
if err := singleRes.Err(); err != nil {
72+
return errors.Wrap(err, "applyOps")
73+
}
74+
res := bson.M{}
75+
err := singleRes.Decode(&res)
76+
if err != nil {
77+
return errors.Wrap(err, "decode singleRes")
78+
}
79+
if isFalsy(res["ok"]) {
80+
return errors.Errorf("applyOps command: %v", res["errmsg"])
81+
}
82+
83+
return nil
84+
}

pbm/oplog/restore.go

Lines changed: 17 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,16 @@ func (c *cloneNS) SetNSPair(nsPair snapshot.CloneNS) {
118118
c.toDB, c.toColl = nsPair.SplitToNS()
119119
}
120120

121+
// mDBCl represents client interface for MongoDB logic used by OplogRestore
122+
type mDBCl interface {
123+
getUUIDForNS(ctx context.Context, ns string) (primitive.Binary, error)
124+
ensureCollExists(dbName string) error
125+
applyOps(entries []interface{}) error
126+
}
127+
121128
// OplogRestore is the oplog applyer
122129
type OplogRestore struct {
123-
dst *mongo.Client
130+
mdb mDBCl
124131
ver *db.Version
125132
needIdxWorkaround bool
126133
preserveUUIDopt bool
@@ -157,15 +164,15 @@ const saveLastDistTxns = 100
157164

158165
// NewOplogRestore creates an object for an oplog applying
159166
func NewOplogRestore(
160-
dst *mongo.Client,
167+
m *mongo.Client,
161168
ic *idx.IndexCatalog,
162169
sv *version.MongoVersion,
163170
unsafe,
164171
preserveUUID bool,
165172
ctxn chan phys.RestoreTxn,
166173
txnErr chan error,
167174
) (*OplogRestore, error) {
168-
m, err := ns.NewMatcher(append(snapshot.ExcludeFromRestore, excludeFromOplog...))
175+
matcher, err := ns.NewMatcher(append(snapshot.ExcludeFromRestore, excludeFromOplog...))
169176
if err != nil {
170177
return nil, errors.Wrap(err, "create matcher for the collections exclude")
171178
}
@@ -185,13 +192,13 @@ func NewOplogRestore(
185192
}
186193
ver := &db.Version{v[0], v[1], v[2]}
187194
return &OplogRestore{
188-
dst: dst,
195+
mdb: newMDB(m),
189196
ver: ver,
190197
preserveUUIDopt: preserveUUID,
191198
preserveUUID: preserveUUID,
192199
needIdxWorkaround: needsCreateIndexWorkaround(ver),
193200
indexCatalog: ic,
194-
excludeNS: m,
201+
excludeNS: matcher,
195202
noUUIDns: noUUID,
196203
txn: ctxn,
197204
txnSyncErr: txnErr,
@@ -298,7 +305,7 @@ func (o *OplogRestore) SetCloneNS(ctx context.Context, ns snapshot.CloneNS) erro
298305
o.cloneNS.SetNSPair(ns)
299306

300307
var err error
301-
o.cloneNS.toUUID, err = getUUIDForNS(ctx, o.dst, o.cloneNS.ToNS)
308+
o.cloneNS.toUUID, err = o.mdb.getUUIDForNS(ctx, o.cloneNS.ToNS)
302309
if err != nil {
303310
return errors.Wrap(err, "get to ns uuid")
304311
}
@@ -956,20 +963,14 @@ func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error {
956963
}
957964
} else if op.Operation == "i" && collName == "system.views" {
958965
// PBM-921: ensure the collection exists before "creating" views or timeseries
959-
err := o.dst.Database(dbName).CreateCollection(context.TODO(), "system.views")
966+
err := o.mdb.ensureCollExists(dbName)
960967
if err != nil {
961-
// MongoDB 5.0 and 6.0 returns NamespaceExists error.
962-
// MongoDB 7.0 and 8.0 does not return error.
963-
// https://github.com/mongodb/mongo/blob/v6.0/src/mongo/base/error_codes.yml#L84
964-
const NamespaceExists = 48
965-
var cmdError mongo.CommandError
966-
if !errors.As(err, &cmdError) || cmdError.Code != NamespaceExists {
967-
return errors.Wrapf(err, "ensure %s.system.views collection", dbName)
968-
}
968+
return err
969969
}
970+
970971
}
971972

972-
err = o.applyOps([]interface{}{op})
973+
err = o.mdb.applyOps([]interface{}{op})
973974
if err != nil {
974975
// https://jira.percona.com/browse/PBM-818
975976
if o.unsafe && op.Namespace == "config.chunks" {
@@ -1071,25 +1072,6 @@ func extractIndexDocumentFromCommitIndexBuilds(op db.Oplog) (string, []*idx.Inde
10711072
return collectionName, nil
10721073
}
10731074

1074-
// applyOps is a wrapper for the applyOps database command, we pass in
1075-
// a session to avoid opening a new connection for a few inserts at a time.
1076-
func (o *OplogRestore) applyOps(entries []interface{}) error {
1077-
singleRes := o.dst.Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}})
1078-
if err := singleRes.Err(); err != nil {
1079-
return errors.Wrap(err, "applyOps")
1080-
}
1081-
res := bson.M{}
1082-
err := singleRes.Decode(&res)
1083-
if err != nil {
1084-
return errors.Wrap(err, "decode singleRes")
1085-
}
1086-
if isFalsy(res["ok"]) {
1087-
return errors.Errorf("applyOps command: %v", res["errmsg"])
1088-
}
1089-
1090-
return nil
1091-
}
1092-
10931075
// filterUUIDs removes 'ui' entries from ops, including nested applyOps ops.
10941076
// It also modifies ops that rely on 'ui'.
10951077
func (o *OplogRestore) filterUUIDs(op db.Oplog) (db.Oplog, error) {
@@ -1261,29 +1243,3 @@ func isTruthy(val interface{}) bool {
12611243
func isFalsy(val interface{}) bool {
12621244
return !isTruthy(val)
12631245
}
1264-
1265-
// getUUIDForNS ruturns UUID of existing collection.
1266-
// When ns doesn't exist, it returns zero value without an error.
1267-
// In case of error, it returns zero value for UUID in addition to error.
1268-
func getUUIDForNS(ctx context.Context, m *mongo.Client, ns string) (primitive.Binary, error) {
1269-
var uuid primitive.Binary
1270-
1271-
d, c, _ := strings.Cut(ns, ".")
1272-
cur, err := m.Database(d).ListCollections(ctx, bson.D{{"name", c}})
1273-
if err != nil {
1274-
return uuid, errors.Wrap(err, "list collections")
1275-
}
1276-
defer cur.Close(ctx)
1277-
1278-
for cur.Next(ctx) {
1279-
if subtype, data, ok := cur.Current.Lookup("info", "uuid").BinaryOK(); ok {
1280-
uuid = primitive.Binary{
1281-
Subtype: subtype,
1282-
Data: data,
1283-
}
1284-
break
1285-
}
1286-
}
1287-
1288-
return uuid, errors.Wrap(cur.Err(), "list collections cursor")
1289-
}

0 commit comments

Comments
 (0)