Skip to content

Commit a972e30

Browse files
authored
Merge pull request #1100 from percona/PBM-1335-fallback-db-path-for-restore
PBM-1335: Fallback dbpath for physical restore
2 parents b7a3ad4 + 623d2c4 commit a972e30

File tree

2 files changed

+358
-22
lines changed

2 files changed

+358
-22
lines changed

pbm/restore/physical.go

Lines changed: 217 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
"fmt"
88
"io"
99
slog "log"
10+
"maps"
1011
"math/rand"
1112
"net"
1213
"os"
1314
"os/exec"
1415
"path"
1516
"path/filepath"
17+
"slices"
1618
"strconv"
1719
"strings"
1820
"sync"
@@ -46,13 +48,16 @@ import (
4648
const (
4749
defaultRSdbpath = "/data/db"
4850
defaultCSRSdbpath = "/data/configdb"
51+
fallbackDir = ".fallbacksync"
4952

5053
mongofslock = "mongod.lock"
5154

5255
defaultPort = 27017
5356

5457
tryConnCount = 5
5558
tryConnTimeout = 5 * time.Minute
59+
60+
internalMongodLog = "pbm.restore.log"
5661
)
5762

5863
type files struct {
@@ -229,18 +234,80 @@ func (r *PhysRestore) close(noerr, cleanup bool) {
229234
if err != nil && !errors.Is(err, os.ErrNotExist) {
230235
r.log.Warning("remove file <%s>: %v", backup.FilelistName, err)
231236
}
232-
} else if cleanup { // clean-up dbpath on err if needed
237+
}
238+
239+
r.log.Debug("wait for cluster status")
240+
cStatus, err := r.waitClusterStatus()
241+
if err != nil {
242+
r.log.Warning("waiting for cluster status during cleanup: %v", err)
243+
}
244+
if cStatus == defs.StatusError {
245+
r.log.Warning("apply db data from %s", fallbackDir)
246+
err := r.migrateFromFallbackDirToDBDir()
247+
if err != nil {
248+
r.log.Error("migrate from fallback dir: %v", err)
249+
}
250+
} else if cleanup { // clean-up dbpath on err if needed (cluster is done or partlyDone)
233251
r.log.Debug("clean-up dbpath")
234-
err := removeAll(r.dbpath, r.log)
252+
err := removeAll(r.dbpath, nil, r.log)
235253
if err != nil {
236254
r.log.Error("flush dbpath %s: %v", r.dbpath, err)
237255
}
256+
} else { // free space by just deleting fallback dir in any other case
257+
r.log.Debug("remove fallback dir")
258+
err := r.removeFallback()
259+
if err != nil {
260+
r.log.Error("flush fallback: %v", err)
261+
}
238262
}
263+
239264
if r.stopHB != nil {
240265
close(r.stopHB)
241266
}
242267
}
243268

269+
// waitClusterStatus blocks until cluster status file is set on one of final statuses.
270+
// It also checks HB to see if cluster is stucked.
271+
func (r *PhysRestore) waitClusterStatus() (defs.Status, error) {
272+
errF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusError)
273+
doneF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusDone)
274+
partlyDoneF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusPartlyDone)
275+
hbF := fmt.Sprintf("%s.%s", r.syncPathCluster, syncHbSuffix)
276+
277+
tk := time.NewTicker(time.Second * 5)
278+
defer tk.Stop()
279+
280+
for range tk.C {
281+
_, err := r.stg.FileStat(errF)
282+
if err == nil {
283+
return defs.StatusError, nil
284+
} else if !errors.Is(err, storage.ErrNotExist) {
285+
r.log.Error("error while reading %s file", errF)
286+
}
287+
288+
_, err = r.stg.FileStat(doneF)
289+
if err == nil {
290+
return defs.StatusDone, nil
291+
} else if !errors.Is(err, storage.ErrNotExist) {
292+
r.log.Error("error while reading %s file", errF)
293+
}
294+
295+
_, err = r.stg.FileStat(partlyDoneF)
296+
if err == nil {
297+
return defs.StatusPartlyDone, nil
298+
} else if !errors.Is(err, storage.ErrNotExist) {
299+
r.log.Error("error while reading %s file", errF)
300+
}
301+
302+
err = r.checkHB(hbF)
303+
if err != nil {
304+
return defs.StatusError, errors.Wrap(err, "check hb for cluster")
305+
}
306+
307+
}
308+
return defs.StatusError, errors.New("wait cluster status")
309+
}
310+
244311
func (r *PhysRestore) flush(ctx context.Context) error {
245312
r.log.Debug("shutdown server")
246313
rsStat, err := topo.GetReplsetStatus(ctx, r.node)
@@ -289,15 +356,93 @@ func (r *PhysRestore) flush(ctx context.Context) error {
289356
}
290357
}
291358

292-
r.log.Debug("remove old data")
293-
err = removeAll(r.dbpath, r.log)
359+
err = r.migrateDBDirToFallbackDir()
360+
if err != nil {
361+
return errors.Wrapf(err, "move files to fallback path")
362+
}
363+
364+
return nil
365+
}
366+
367+
// migrateDBDirToFallbackDir moves content of dbPath dir into fallback dir.
368+
// It also removes old fallback dir, and creates new with the same perms.
369+
func (r *PhysRestore) migrateDBDirToFallbackDir() error {
370+
dbpath := filepath.Clean(r.dbpath)
371+
fallbackPath := filepath.Join(dbpath, fallbackDir)
372+
r.log.Debug("dbpath: %s, fallbackPath: %s", dbpath, fallbackPath)
373+
374+
err := os.RemoveAll(fallbackPath)
375+
if err != nil {
376+
return errors.Wrap(err, "remove fallback db path")
377+
}
378+
379+
r.log.Debug("create %s", fallbackPath)
380+
info, err := os.Stat(dbpath)
381+
if err != nil {
382+
return errors.Wrap(err, "stat")
383+
}
384+
err = os.MkdirAll(fallbackPath, info.Mode())
385+
if err != nil {
386+
return errors.Wrapf(err, "creating dir %s", fallbackPath)
387+
}
388+
389+
r.log.Info("move data files from %s to %s", r.dbpath, fallbackDir)
390+
err = r.moveToFallback()
294391
if err != nil {
295-
return errors.Wrapf(err, "flush dbpath %s", r.dbpath)
392+
return errors.Wrapf(err, "fail to move to %s", fallbackPath)
296393
}
297394

298395
return nil
299396
}
300397

398+
// migrateFromFallbackDirToDBDir wipe up dbpath dir and
399+
// moves all content from fallback path.
400+
func (r *PhysRestore) migrateFromFallbackDirToDBDir() error {
401+
r.log.Debug("clean-up dbpath")
402+
err := removeAll(r.dbpath, []string{fallbackDir}, r.log)
403+
if err != nil {
404+
r.log.Error("flush dbpath %s: %v", r.dbpath, err)
405+
return errors.Wrap(err, "remove all from dbpath")
406+
}
407+
408+
r.log.Info("move data files from %s to %s", fallbackDir, r.dbpath)
409+
err = r.moveFromFallback()
410+
if err != nil {
411+
r.log.Error("moving from %s: %v", fallbackDir, err)
412+
return errors.Wrapf(err, "move from %s", fallbackDir)
413+
}
414+
415+
return nil
416+
}
417+
418+
// moveFromFallback moves all files/dirs from fallback dir to dbpath dir.
419+
func (r *PhysRestore) moveFromFallback() error {
420+
return moveAll(
421+
path.Join(r.dbpath, fallbackDir),
422+
r.dbpath,
423+
nil,
424+
r.log,
425+
)
426+
}
427+
428+
// moveToFallback moves all files/dirs except fallback dir from dbpath to fallback dir
429+
func (r *PhysRestore) moveToFallback() error {
430+
return moveAll(
431+
r.dbpath,
432+
path.Join(r.dbpath, fallbackDir),
433+
[]string{fallbackDir},
434+
r.log,
435+
)
436+
}
437+
438+
// removeFallback removes fallback dir
439+
func (r *PhysRestore) removeFallback() error {
440+
dbpath := filepath.Clean(r.dbpath)
441+
fallbackPath := filepath.Join(dbpath, fallbackDir)
442+
err := os.RemoveAll(fallbackPath)
443+
return errors.Wrap(err, "remove fallback db path")
444+
}
445+
301446
func nodeShutdown(ctx context.Context, m *mongo.Client) error {
302447
err := m.Database("admin").RunCommand(ctx, bson.D{{"shutdown", 1}}).Err()
303448
if err == nil || strings.Contains(err.Error(), "socket was unexpectedly closed") {
@@ -441,7 +586,7 @@ func (r *PhysRestore) toState(status defs.Status) (_ defs.Status, err error) {
441586

442587
if r.nodeInfo.IsPrimary || status == defs.StatusDone {
443588
r.log.Info("waiting for `%s` status in rs %v", status, r.syncPathPeers)
444-
cstat, err := r.waitFiles(status, copyMap(r.syncPathPeers), false)
589+
cstat, err := r.waitFiles(status, maps.Clone(r.syncPathPeers), false)
445590
if err != nil {
446591
return defs.StatusError, errors.Wrap(err, "wait for nodes in rs")
447592
}
@@ -454,7 +599,7 @@ func (r *PhysRestore) toState(status defs.Status) (_ defs.Status, err error) {
454599

455600
if r.nodeInfo.IsClusterLeader() || status == defs.StatusDone {
456601
r.log.Info("waiting for shards %v", r.syncPathShards)
457-
cstat, err := r.waitFiles(status, copyMap(r.syncPathShards), true)
602+
cstat, err := r.waitFiles(status, maps.Clone(r.syncPathShards), true)
458603
if err != nil {
459604
return defs.StatusError, errors.Wrap(err, "wait for shards")
460605
}
@@ -525,13 +670,37 @@ func (n nodeError) Error() string {
525670
return fmt.Sprintf("%s failed: %s", n.node, n.msg)
526671
}
527672

528-
func copyMap[K comparable, V any](m map[K]V) map[K]V {
529-
cp := make(map[K]V)
530-
for k, v := range m {
531-
cp[k] = v
673+
// checkForRSLevelErr checks if all nodes have an error,
674+
// and in that case true is returned.
675+
// If any node doesn't have error, false is returned.
676+
func (r *PhysRestore) checkForRSLevelErr() bool {
677+
for f := range r.syncPathPeers {
678+
errFile := f + "." + string(defs.StatusError)
679+
_, err := r.stg.FileStat(errFile)
680+
if errors.Is(err, storage.ErrNotExist) {
681+
return false
682+
}
683+
if err != nil {
684+
r.log.Error("error while checking file %s: %v", errFile, err)
685+
}
686+
// error file is found
532687
}
688+
return true
689+
}
533690

534-
return cp
691+
// checkForClusterLevelErr checks if any RS (shard) has an error.
692+
// It returns true if at least one RS has error, otherwise false.
693+
func (r *PhysRestore) checkForClusterLevelErr() bool {
694+
for f := range r.syncPathShards {
695+
errFile := f + "." + string(defs.StatusError)
696+
_, err := r.stg.FileStat(errFile)
697+
if err == nil {
698+
return true
699+
} else if !errors.Is(err, storage.ErrNotExist) {
700+
r.log.Error("error while checking file %s: %v", errFile, err)
701+
}
702+
}
703+
return false
535704
}
536705

537706
func (r *PhysRestore) waitFiles(
@@ -756,7 +925,7 @@ func (r *PhysRestore) Snapshot(
756925
// set failed status of node on error, but
757926
// don't mark node as failed after the local restore succeed
758927
if err != nil && !progress.is(restoreDone) && !errors.Is(err, ErrNoDataForShard) {
759-
r.MarkFailed(meta, err, !progress.is(restoreStared))
928+
r.MarkFailed(meta, err)
760929
}
761930

762931
r.close(err == nil, progress.is(restoreStared) && !progress.is(restoreDone))
@@ -1650,7 +1819,7 @@ func (r *PhysRestore) agreeCommonRestoreTS() (primitive.Timestamp, error) {
16501819
}
16511820

16521821
if r.nodeInfo.IsClusterLeader() {
1653-
_, err := r.waitFiles(defs.StatusExtTS, copyMap(r.syncPathShards), true)
1822+
_, err := r.waitFiles(defs.StatusExtTS, maps.Clone(r.syncPathShards), true)
16541823
if err != nil {
16551824
return ts, errors.Wrap(err, "wait for shards timestamp")
16561825
}
@@ -1699,7 +1868,7 @@ func (r *PhysRestore) setcommittedTxn(_ context.Context, txn []phys.RestoreTxn)
16991868
}
17001869

17011870
func (r *PhysRestore) getcommittedTxn(context.Context) (map[string]primitive.Timestamp, error) {
1702-
shards := copyMap(r.syncPathShards)
1871+
shards := maps.Clone(r.syncPathShards)
17031872
txn := make(map[string]primitive.Timestamp)
17041873
for len(shards) > 0 {
17051874
for f := range shards {
@@ -1787,8 +1956,6 @@ func tryConn(port int, logpath string) (*mongo.Client, error) {
17871956
return nil, errors.Errorf("failed to connect after %d tries: %v", tryConnCount, err)
17881957
}
17891958

1790-
const internalMongodLog = "pbm.restore.log"
1791-
17921959
func (r *PhysRestore) startMongo(opts ...string) error {
17931960
if r.tmpConf != nil {
17941961
opts = append(opts, []string{"-f", r.tmpConf.Name()}...)
@@ -2309,7 +2476,7 @@ func (r *PhysRestore) checkMongod(needVersion string) (version string, err error
23092476
}
23102477

23112478
// MarkFailed sets the restore and rs state as failed with the given message
2312-
func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
2479+
func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error) {
23132480
var nerr nodeError
23142481
if errors.As(e, &nerr) {
23152482
e = nerr
@@ -2332,14 +2499,16 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
23322499
// At some point, every node will try to set an rs and cluster state
23332500
// (in `toState` method).
23342501
// Here we are not aware of partlyDone etc so leave it to the `toState`.
2335-
if r.nodeInfo.IsPrimary && markCluster {
2502+
if r.checkForRSLevelErr() {
2503+
r.log.Debug("set error on rs level")
23362504
serr := util.RetryableWrite(r.stg,
23372505
r.syncPathRS+"."+string(defs.StatusError), errStatus(e))
23382506
if serr != nil {
23392507
r.log.Error("MarkFailed: write replset error state `%v`: %v", e, serr)
23402508
}
23412509
}
2342-
if r.nodeInfo.IsClusterLeader() && markCluster {
2510+
if r.nodeInfo.IsLeader() && r.checkForClusterLevelErr() {
2511+
r.log.Debug("set error on cluster level")
23432512
serr := util.RetryableWrite(r.stg,
23442513
r.syncPathCluster+"."+string(defs.StatusError), errStatus(e))
23452514
if serr != nil {
@@ -2348,7 +2517,33 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
23482517
}
23492518
}
23502519

2351-
func removeAll(dir string, l log.LogEvent) error {
2520+
// moveAll moves fromDir content (files and dirs) to toDir content.
2521+
// It ignores all files/dirs specified within toIgnore slice.
2522+
func moveAll(fromDir, toDir string, toIgnore []string, l log.LogEvent) error {
2523+
d, err := os.Open(fromDir)
2524+
if err != nil {
2525+
return errors.Wrap(err, "open dir")
2526+
}
2527+
defer d.Close()
2528+
2529+
names, err := d.Readdirnames(-1)
2530+
if err != nil {
2531+
return errors.Wrap(err, "read file names")
2532+
}
2533+
for _, n := range names {
2534+
if slices.Contains(toIgnore, n) {
2535+
continue
2536+
}
2537+
err = os.Rename(filepath.Join(fromDir, n), filepath.Join(toDir, n))
2538+
if err != nil {
2539+
return errors.Wrapf(err, "move %s", n)
2540+
}
2541+
l.Debug("move to %s dir: %s", toDir, n)
2542+
}
2543+
return nil
2544+
}
2545+
2546+
func removeAll(dir string, toIgnore []string, l log.LogEvent) error {
23522547
d, err := os.Open(dir)
23532548
if err != nil {
23542549
return errors.Wrap(err, "open dir")
@@ -2360,7 +2555,7 @@ func removeAll(dir string, l log.LogEvent) error {
23602555
return errors.Wrap(err, "read file names")
23612556
}
23622557
for _, n := range names {
2363-
if n == internalMongodLog {
2558+
if n == internalMongodLog || slices.Contains(toIgnore, n) {
23642559
continue
23652560
}
23662561
err = os.RemoveAll(filepath.Join(dir, n))

0 commit comments

Comments (0)