Skip to content

Commit 1ab82ab

Browse files
committed
Add cleanup cluster phase during logical restore
1 parent fa14224 commit 1ab82ab

File tree

1 file changed

+92
-4
lines changed

1 file changed

+92
-4
lines changed

pbm/restore/logical.go

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"go.mongodb.org/mongo-driver/bson"
1818
"go.mongodb.org/mongo-driver/bson/primitive"
1919
"go.mongodb.org/mongo-driver/mongo"
20+
"go.mongodb.org/mongo-driver/mongo/writeconcern"
2021

2122
"github.com/percona/percona-backup-mongodb/pbm/archive"
2223
"github.com/percona/percona-backup-mongodb/pbm/backup"
@@ -57,11 +58,11 @@ type Restore struct {
5758
//
5859
// Only the restore leader would have this info.
5960
shards []topo.Shard
60-
// rsMap is mapping between old and new replset names. used for data restore.
61-
// empty if all replset names are the same
61+
// rsMap is mapping between old and new replset names, used for data restore.
62+
// It's empty if all replset names are the same.
6263
rsMap map[string]string
63-
// sMap is mapping between old and new shard names. used for router config update.
64-
// empty if all shard names are the same
64+
// sMap is mapping between old and new shard names, used for router config update.
65+
// It's empty if all shard names are the same.
6566
sMap map[string]string
6667

6768
log log.LogEvent
@@ -76,6 +77,13 @@ type oplogRange struct {
7677
storage storage.Storage
7778
}
7879

80+
// configDatabasesDoc represents document in config.databases collection
81+
type configDatabasesDoc struct {
82+
ID string `bson:"_id"`
83+
Primary string `bson:"primary"`
84+
Version bson.D `bson:"version"`
85+
}
86+
7987
// PBM restore from temp collections (pbmRUsers/pbmRRoles)should be used
8088
type restoreUsersAndRolesOption bool
8189

@@ -242,6 +250,14 @@ func (r *Restore) Snapshot(
242250

243251
err = r.toState(ctx, defs.StatusCleanupCluster, &defs.WaitActionStart)
244252

253+
if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() {
254+
err = r.dropShardedDBs(ctx, bcp)
255+
if err != nil {
256+
return err
257+
}
258+
259+
}
260+
245261
err = r.toState(ctx, defs.StatusRunning, nil)
246262
if err != nil {
247263
return err
@@ -830,6 +846,78 @@ func (r *Restore) toState(ctx context.Context, status defs.Status, wait *time.Du
830846
return toState(ctx, r.leadConn, status, r.name, r.nodeInfo, r.reconcileStatus, wait)
831847
}
832848

849+
// dropShardedDBs drop all sharded databases present in the backup.
850+
// Backup is specified with bcp parameter.
851+
// For each sharded database present in the backup _shardsvrDropDatabase command
852+
// is used to drop the database from the config srv and all shards.
853+
func (r *Restore) dropShardedDBs(ctx context.Context, bcp *backup.BackupMeta) error {
854+
dbsInBcp, err := r.getDBsFromBackup(bcp)
855+
if err != nil {
856+
return errors.Wrap(err, "get dbs from backup")
857+
}
858+
859+
// make cluster-wide drop for each db from the backup
860+
for _, db := range dbsInBcp {
861+
var configDBDoc configDatabasesDoc
862+
err := r.leadConn.ConfigDatabase().
863+
Collection("databases").
864+
FindOne(ctx, bson.D{{"_id", db}}).
865+
Decode(&configDBDoc)
866+
if err != nil {
867+
if errors.Is(err, mongo.ErrNoDocuments) {
868+
continue
869+
}
870+
return errors.Wrapf(err, "get config.databases for %q", db)
871+
}
872+
873+
if configDBDoc.Primary != r.nodeInfo.SetName {
874+
// this shard is not primary shard for this db, so ignore it
875+
continue
876+
}
877+
878+
cmd := bson.D{
879+
{"_shardsvrDropDatabase", 1},
880+
{"databaseVersion", configDBDoc.Version},
881+
{"writeConcern", writeconcern.Majority()},
882+
}
883+
res := r.nodeConn.Database(db).RunCommand(ctx, cmd)
884+
if err := res.Err(); err != nil {
885+
return errors.Wrapf(err, "_shardsvrDropDatabase for %q", db)
886+
}
887+
r.log.Debug("drop %q", db)
888+
}
889+
890+
return nil
891+
}
892+
893+
// getDBsFromBackup returns all databases present in backup metadata file
894+
// for each replicaset.
895+
func (r *Restore) getDBsFromBackup(bcp *backup.BackupMeta) ([]string, error) {
896+
rsName := util.MakeRSMapFunc(r.rsMap)(r.brief.SetName)
897+
filepath := path.Join(bcp.Name, rsName, archive.MetaFile)
898+
rdr, err := r.bcpStg.SourceReader(filepath)
899+
if err != nil {
900+
return nil, errors.Wrap(err, "read metadata file")
901+
}
902+
defer rdr.Close()
903+
904+
meta, err := archive.ReadMetadata(rdr)
905+
if err != nil {
906+
return nil, errors.Wrap(err, "get metadata")
907+
}
908+
909+
uniqueDbs := map[string]bool{}
910+
for _, ns := range meta.Namespaces {
911+
uniqueDbs[ns.Database] = true
912+
}
913+
dbs := []string{}
914+
for db := range uniqueDbs {
915+
dbs = append(dbs, db)
916+
}
917+
918+
return dbs, nil
919+
}
920+
833921
func (r *Restore) RunSnapshot(
834922
ctx context.Context,
835923
dump string,

0 commit comments

Comments
 (0)