Commit 5e015e5

Merge branch 'dev' into PBM-1514-Improve-resync-performance
2 parents: a0861a8 + 9408277

File tree: 6 files changed (+281, -46 lines)


e2e-tests/cmd/pbm-test/run.go

Lines changed: 3 additions & 0 deletions

@@ -109,6 +109,9 @@ func run(t *sharded.Cluster, typ testTyp) {
 	runTest("Distributed Transactions PITR",
 		t.DistributedTrxPITR)

+	runTest("Cleaning up sharded database for full restore",
+		t.CleanupFullRestore)
+
 	// disttxnconf := "/etc/pbm/fs-disttxn-4x.yaml"
 	// tsTo := primitive.Timestamp{1644410656, 8}

e2e-tests/pkg/tests/sharded/cmd.go

Lines changed: 34 additions & 0 deletions (new file)

package sharded

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

func (c *Cluster) stopBalancer(ctx context.Context, conn *mongo.Client) {
	err := conn.Database("admin").RunCommand(
		ctx,
		bson.D{{"balancerStop", 1}},
	).Err()
	if err != nil {
		log.Fatalf("ERROR: stopping balancer: %v", err)
	}
}

func (c *Cluster) moveChunk(ctx context.Context, db, col string, idx int, to string) {
	log.Println("move chunk", idx, "to", to)
	err := c.mongos.Conn().Database("admin").RunCommand(
		ctx,
		bson.D{
			{"moveChunk", db + "." + col},
			{"find", bson.M{"idx": idx}},
			{"to", to},
		},
	).Err()
	if err != nil {
		log.Printf("ERROR: moveChunk %s.%s/idx:2000: %v", db, col, err)
	}
}
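
For reference, both helpers boil down to the standard balancerStop and moveChunk admin commands issued against mongos. Below is a minimal standalone sketch of the same calls with the official Go driver; it is not part of this commit, and the connection URI, namespace, idx value, and target shard name are assumptions for illustration only.

package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx := context.Background()

	// Assumed mongos address; adjust for your cluster.
	cli, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer cli.Disconnect(ctx)

	admin := cli.Database("admin")

	// Stop the balancer so chunk placement stays fixed while the test runs.
	if err := admin.RunCommand(ctx, bson.D{{"balancerStop", 1}}).Err(); err != nil {
		log.Fatalf("balancerStop: %v", err)
	}

	// Move the chunk containing {idx: 500} to shard "rsx"
	// (namespace and shard name are illustrative).
	err = admin.RunCommand(ctx, bson.D{
		{"moveChunk", "cleanupdb.cleanupcol"},
		{"find", bson.M{"idx": 500}},
		{"to", "rsx"},
	}).Err()
	if err != nil {
		log.Fatalf("moveChunk: %v", err)
	}
}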

Lines changed: 106 additions & 0 deletions (new file)

package sharded

import (
	"context"
	"fmt"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

func (c *Cluster) CleanupFullRestore() {
	ctx := context.Background()
	db, col := "cleanupdb", "cleanupcol"
	var numOfDocs int64 = 2000

	conn := c.mongos.Conn()
	c.stopBalancer(ctx, conn)
	c.setupShardedCollection(ctx, conn, db, col)

	err := c.mongos.GenData(db, col, 0, int64(numOfDocs))
	if err != nil {
		log.Fatalf("error: generate data: %v", err)
	}

	bcpName := c.LogicalBackup()
	c.BackupWaitDone(context.Background(), bcpName)

	c.splitChunkAt(ctx, conn, db, col, 1000)
	c.moveChunk(ctx, db, col, 500, "rsx")

	c.LogicalRestore(context.Background(), bcpName)
	c.flushRouterConfig(ctx)

	cnt, err := conn.Database(db).
		Collection(col).
		CountDocuments(ctx, bson.D{})
	if err != nil {
		log.Fatalf("ERROR: querying count: %v", err)
	} else if cnt != numOfDocs {
		log.Fatalf("ERROR: wrong number of docs: want=%d, got=%d", numOfDocs, cnt)
	}
}

func (c *Cluster) setupShardedCollection(
	ctx context.Context,
	conn *mongo.Client,
	db, col string,
) {
	err := conn.Database(db).
		Collection(col).
		Drop(ctx)
	if err != nil {
		log.Fatalf("ERROR: droping %s.%s collection: %v", db, col, err)
	}

	err = conn.Database("admin").RunCommand(
		ctx,
		bson.D{
			{"enableSharding", db},
			{"primaryShard", "rs1"},
		},
	).Err()
	if err != nil {
		log.Fatalf("ERROR: enableSharding on for %s primary shard rs: %v", db, err)
	}

	ns := fmt.Sprintf("%s.%s", db, col)
	err = conn.Database(db).RunCommand(
		ctx,
		bson.D{{"create", col}},
	).Err()
	if err != nil {
		log.Fatalf("ERROR: create %s collection: %v", ns, err)
	}

	err = conn.Database("admin").RunCommand(
		ctx,
		bson.D{
			{"shardCollection", ns},
			{"key", bson.M{"idx": 1}},
		},
	).Err()
	if err != nil {
		log.Fatalf("ERROR: shard %s collection: %v", ns, err)
	}
}

func (c *Cluster) splitChunkAt(
	ctx context.Context,
	conn *mongo.Client,
	db, col string,
	id int,
) {
	ns := fmt.Sprintf("%s.%s", db, col)
	err := conn.Database("admin").RunCommand(
		ctx,
		bson.D{
			{"split", ns},
			{"find", bson.M{"idx": id}},
		},
	).Err()
	if err != nil {
		log.Fatalf("ERROR: split %s collection: %v", ns, err)
	}
}
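
The last two steps of the test carry the actual check: flushRouterConfig makes mongos reload sharding metadata from the config servers (the chunk layout was deliberately changed between backup and restore), and CountDocuments confirms the full restore brought back all 2000 documents. A hedged sketch of those two calls as a standalone helper is shown below; verifyFullRestore is hypothetical and not part of this commit, while flushRouterConfig is the standard mongos admin command.

package sharded // hypothetical helper, shown only for illustration

import (
	"context"
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// verifyFullRestore refreshes the mongos routing table and checks that the
// restored namespace holds the expected number of documents.
func verifyFullRestore(ctx context.Context, conn *mongo.Client, db, col string, want int64) error {
	// flushRouterConfig forces mongos to reload sharding metadata, so the
	// count below is routed against the restored chunk layout.
	if err := conn.Database("admin").RunCommand(ctx, bson.D{{"flushRouterConfig", 1}}).Err(); err != nil {
		return fmt.Errorf("flushRouterConfig: %w", err)
	}

	got, err := conn.Database(db).Collection(col).CountDocuments(ctx, bson.D{})
	if err != nil {
		return fmt.Errorf("count documents: %w", err)
	}
	if got != want {
		return fmt.Errorf("wrong number of docs: want=%d, got=%d", want, got)
	}
	return nil
}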

e2e-tests/pkg/tests/sharded/trx.go

Lines changed: 16 additions & 31 deletions

@@ -47,22 +47,22 @@ func (c *Cluster) DistributedTransactions(bcp Backuper, col string) {
 	c.setupTrxCollection(ctx, col)

-	c.moveChunk(ctx, col, 0, "rs1")
-	c.moveChunk(ctx, col, 30, "rs1")
-	c.moveChunk(ctx, col, 89, "rs1")
-	c.moveChunk(ctx, col, 99, "rs1")
-	c.moveChunk(ctx, col, 110, "rs1")
-	c.moveChunk(ctx, col, 130, "rs1")
-	c.moveChunk(ctx, col, 131, "rs1")
-	c.moveChunk(ctx, col, 630, "rsx")
-	c.moveChunk(ctx, col, 530, "rsx")
-	c.moveChunk(ctx, col, 631, "rsx")
-	c.moveChunk(ctx, col, 730, "rsx")
-	c.moveChunk(ctx, col, 3000, "rsx")
-	c.moveChunk(ctx, col, 3001, "rsx")
-	c.moveChunk(ctx, col, 180, "rsx")
-	c.moveChunk(ctx, col, 199, "rsx")
-	c.moveChunk(ctx, col, 2001, "rsx")
+	c.moveChunk(ctx, trxdb, col, 0, "rs1")
+	c.moveChunk(ctx, trxdb, col, 30, "rs1")
+	c.moveChunk(ctx, trxdb, col, 89, "rs1")
+	c.moveChunk(ctx, trxdb, col, 99, "rs1")
+	c.moveChunk(ctx, trxdb, col, 110, "rs1")
+	c.moveChunk(ctx, trxdb, col, 130, "rs1")
+	c.moveChunk(ctx, trxdb, col, 131, "rs1")
+	c.moveChunk(ctx, trxdb, col, 630, "rsx")
+	c.moveChunk(ctx, trxdb, col, 530, "rsx")
+	c.moveChunk(ctx, trxdb, col, 631, "rsx")
+	c.moveChunk(ctx, trxdb, col, 730, "rsx")
+	c.moveChunk(ctx, trxdb, col, 3000, "rsx")
+	c.moveChunk(ctx, trxdb, col, 3001, "rsx")
+	c.moveChunk(ctx, trxdb, col, 180, "rsx")
+	c.moveChunk(ctx, trxdb, col, 199, "rsx")
+	c.moveChunk(ctx, trxdb, col, 2001, "rsx")

 	_, err = conn.Database(trxdb).Collection(col).DeleteMany(ctx, bson.M{})
 	if err != nil {

@@ -322,21 +322,6 @@ func (c *Cluster) setupTrxCollection(ctx context.Context, col string) {
 	}
 }

-func (c *Cluster) moveChunk(ctx context.Context, col string, idx int, to string) {
-	log.Println("move chunk", idx, "to", to)
-	err := c.mongos.Conn().Database("admin").RunCommand(
-		ctx,
-		bson.D{
-			{"moveChunk", trxdb + "." + col},
-			{"find", bson.M{"idx": idx}},
-			{"to", to},
-		},
-	).Err()
-	if err != nil {
-		log.Printf("ERROR: moveChunk %s.%s/idx:2000: %v", trxdb, col, err)
-	}
-}
-
 func (c *Cluster) checkTrxCollection(ctx context.Context, col string, bcp Backuper) {
 	log.Println("Checking restored data")

pbm/defs/defs.go

Lines changed: 10 additions & 9 deletions

@@ -97,15 +97,16 @@ const (
 	// for phys restore, to indicate shards have been stopped
 	StatusDown Status = "down"

-	StatusStarting   Status = "starting"
-	StatusRunning    Status = "running"
-	StatusDumpDone   Status = "dumpDone"
-	StatusCopyReady  Status = "copyReady"
-	StatusCopyDone   Status = "copyDone"
-	StatusPartlyDone Status = "partlyDone"
-	StatusDone       Status = "done"
-	StatusCancelled  Status = "canceled"
-	StatusError      Status = "error"
+	StatusStarting       Status = "starting"
+	StatusCleanupCluster Status = "cleanupCluster"
+	StatusRunning        Status = "running"
+	StatusDumpDone       Status = "dumpDone"
+	StatusCopyReady      Status = "copyReady"
+	StatusCopyDone       Status = "copyDone"
+	StatusPartlyDone     Status = "partlyDone"
+	StatusDone           Status = "done"
+	StatusCancelled      Status = "canceled"
+	StatusError          Status = "error"

 	// status to communicate last op timestamp if it's not set
 	// during external restore
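
The only functional addition in this file is the StatusCleanupCluster constant; the remaining lines are realignment from gofmt. The snippet below is an illustrative, simplified view of where the new phase sits in a full logical restore, as an assumption drawn from the Snapshot/PITR hunks in pbm/restore/logical.go further down; it is not an exhaustive state list.

package main

import "fmt"

// Status mirrors defs.Status for illustration only.
type Status string

// Assumed, simplified phase order for a full logical restore after this change.
var phases = []Status{
	"starting",
	"cleanupCluster", // new: each shard drops the sharded databases found in the backup
	"running",
	"dumpDone",
	"done",
}

func main() {
	for i, s := range phases {
		fmt.Printf("%d. %s\n", i+1, s)
	}
}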

pbm/restore/logical.go

Lines changed: 112 additions & 6 deletions

@@ -17,6 +17,7 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/writeconcern"

 	"github.com/percona/percona-backup-mongodb/pbm/archive"
 	"github.com/percona/percona-backup-mongodb/pbm/backup"

@@ -57,11 +58,11 @@ type Restore struct {
 	//
 	// Only the restore leader would have this info.
 	shards []topo.Shard
-	// rsMap is mapping between old and new replset names. used for data restore.
-	// empty if all replset names are the same
+	// rsMap is mapping between old and new replset names, used for data restore.
+	// It's empty if all replset names are the same.
 	rsMap map[string]string
-	// sMap is mapping between old and new shard names. used for router config update.
-	// empty if all shard names are the same
+	// sMap is mapping between old and new shard names, used for router config update.
+	// It's empty if all shard names are the same.
 	sMap map[string]string

 	log log.LogEvent

@@ -76,6 +77,13 @@ type oplogRange struct {
 	storage storage.Storage
 }

+// configDatabasesDoc represents document in config.databases collection
+type configDatabasesDoc struct {
+	ID      string `bson:"_id"`
+	Primary string `bson:"primary"`
+	Version bson.D `bson:"version"`
+}
+
 // PBM restore from temp collections (pbmRUsers/pbmRRoles)should be used
 type restoreUsersAndRolesOption bool

@@ -240,7 +248,20 @@ func (r *Restore) Snapshot(
 		return err
 	}

-	err = r.toState(ctx, defs.StatusRunning, &defs.WaitActionStart)
+	err = r.toState(ctx, defs.StatusCleanupCluster, &defs.WaitActionStart)
+	if err != nil {
+		return err
+	}
+
+	// drop sharded dbs on sharded cluster, on each shard (not CSRS), only for full restore
+	if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() && !util.IsSelective(nss) {
+		err = r.dropShardedDBs(ctx, bcp)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = r.toState(ctx, defs.StatusRunning, nil)
 	if err != nil {
 		return err
 	}

@@ -413,7 +434,20 @@ func (r *Restore) PITR(
 		return err
 	}

-	err = r.toState(ctx, defs.StatusRunning, &defs.WaitActionStart)
+	err = r.toState(ctx, defs.StatusCleanupCluster, &defs.WaitActionStart)
+	if err != nil {
+		return err
+	}
+
+	// drop sharded dbs on sharded cluster, on each shard (not CSRS), only for full restore
+	if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() && !util.IsSelective(nss) {
+		err = r.dropShardedDBs(ctx, bcp)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = r.toState(ctx, defs.StatusRunning, nil)
 	if err != nil {
 		return err
 	}

@@ -828,6 +862,78 @@ func (r *Restore) toState(ctx context.Context, status defs.Status, wait *time.Du
 	return toState(ctx, r.leadConn, status, r.name, r.nodeInfo, r.reconcileStatus, wait)
 }

+// dropShardedDBs drop all sharded databases present in the backup.
+// Backup is specified with bcp parameter.
+// For each sharded database present in the backup _shardsvrDropDatabase command
+// is used to drop the database from the config srv and all shards.
+func (r *Restore) dropShardedDBs(ctx context.Context, bcp *backup.BackupMeta) error {
+	dbsInBcp, err := r.getDBsFromBackup(bcp)
+	if err != nil {
+		return errors.Wrap(err, "get dbs from backup")
+	}
+
+	// make cluster-wide drop for each db from the backup
+	for _, db := range dbsInBcp {
+		var configDBDoc configDatabasesDoc
+		err := r.leadConn.ConfigDatabase().
+			Collection("databases").
+			FindOne(ctx, bson.D{{"_id", db}}).
+			Decode(&configDBDoc)
+		if err != nil {
+			if errors.Is(err, mongo.ErrNoDocuments) {
+				continue
+			}
+			return errors.Wrapf(err, "get config.databases for %q", db)
+		}
+
+		if configDBDoc.Primary != r.nodeInfo.SetName {
+			// this shard is not primary shard for this db, so ignore it
+			continue
+		}
+
+		cmd := bson.D{
+			{"_shardsvrDropDatabase", 1},
+			{"databaseVersion", configDBDoc.Version},
+			{"writeConcern", writeconcern.Majority()},
+		}
+		res := r.nodeConn.Database(db).RunCommand(ctx, cmd)
+		if err := res.Err(); err != nil {
+			return errors.Wrapf(err, "_shardsvrDropDatabase for %q", db)
+		}
+		r.log.Debug("drop %q", db)
+	}
+
+	return nil
+}
+
+// getDBsFromBackup returns all databases present in backup metadata file
+// for each replicaset.
+func (r *Restore) getDBsFromBackup(bcp *backup.BackupMeta) ([]string, error) {
+	rsName := util.MakeReverseRSMapFunc(r.rsMap)(r.brief.SetName)
+	filepath := path.Join(bcp.Name, rsName, archive.MetaFile)
+	rdr, err := r.bcpStg.SourceReader(filepath)
+	if err != nil {
+		return nil, errors.Wrap(err, "read metadata file")
+	}
+	defer rdr.Close()
+
+	meta, err := archive.ReadMetadata(rdr)
+	if err != nil {
+		return nil, errors.Wrap(err, "get metadata")
+	}
+
+	uniqueDbs := map[string]bool{}
+	for _, ns := range meta.Namespaces {
+		uniqueDbs[ns.Database] = true
+	}
+	dbs := []string{}
+	for db := range uniqueDbs {
+		dbs = append(dbs, db)
+	}
+
+	return dbs, nil
+}
+
 func (r *Restore) RunSnapshot(
 	ctx context.Context,
 	dump string,
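
dropShardedDBs relies on config.databases to decide which node issues the drop: only the shard recorded as the database's primary runs _shardsvrDropDatabase, passing along the stored databaseVersion. The standalone sketch below shows that lookup through the Go driver; it is illustrative only, the connection URI and the "cleanupdb" name are assumptions, and databasesDoc simply mirrors the fields configDatabasesDoc maps in the diff above.

package main

import (
	"context"
	"fmt"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// databasesDoc mirrors the fields read from config.databases in the diff above.
type databasesDoc struct {
	ID      string `bson:"_id"`
	Primary string `bson:"primary"`
	Version bson.D `bson:"version"`
}

func main() {
	ctx := context.Background()

	// Assumed mongos address; adjust for your cluster.
	cli, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer cli.Disconnect(ctx)

	// Look up which shard owns the database and its current databaseVersion.
	var doc databasesDoc
	err = cli.Database("config").Collection("databases").
		FindOne(ctx, bson.D{{"_id", "cleanupdb"}}). // db name is illustrative
		Decode(&doc)
	if err != nil {
		log.Fatalf("config.databases lookup: %v", err)
	}

	fmt.Printf("db=%s primary shard=%s version=%v\n", doc.ID, doc.Primary, doc.Version)
}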
