Skip to content

Commit 10857d3

Browse files
committed
Refactor and fix phys restore close logic
1 parent 96eac46 commit 10857d3

File tree

1 file changed

+18
-15
lines changed

1 file changed

+18
-15
lines changed

pbm/restore/physical.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,9 @@ func peekTmpPort(current int) (int, error) {
210210
return -1, errors.Errorf("can't find unused port in range [%d, %d]", current, current+rng)
211211
}
212212

213-
// Close releases object resources.
214-
// Should be run to avoid leaks.
213+
// close cleans up all temp restore files.
214+
// Based on error status it does cleanup of the node's db path or it
215+
// applies fallback sync recovery strategy.
215216
func (r *PhysRestore) close(noerr, cleanup bool) {
216217
if r.tmpConf != nil {
217218
r.log.Debug("rm tmp conf")
@@ -221,7 +222,8 @@ func (r *PhysRestore) close(noerr, cleanup bool) {
221222
}
222223
}
223224

224-
// if there is no error clean-up internal restore files, internal log file(s) should stay
225+
// if there is no error, clean-up internal restore files
226+
// internal log file(s) should stay in any case
225227
if noerr {
226228
extMeta := filepath.Join(r.dbpath,
227229
fmt.Sprintf(defs.ExternalRsMetaFile, util.MakeReverseRSMapFunc(r.rsMap)(r.nodeInfo.SetName)))
@@ -240,24 +242,24 @@ func (r *PhysRestore) close(noerr, cleanup bool) {
240242
if err != nil {
241243
r.log.Warning("waiting for cluster status during cleanup: %v", err)
242244
}
243-
if cStatus == defs.StatusError {
245+
if cStatus == defs.StatusError && cleanup {
244246
r.log.Warning("apply db data from %s", fallbackDir)
245247
err := r.migrateFromFallbackDirToDBDir()
246248
if err != nil {
247249
r.log.Error("migrate from fallback dir: %v", err)
248250
}
249-
} else if cleanup { // clean-up dbpath on err if needed (cluster is done or partlyDone)
251+
} else if (cStatus == defs.StatusDone || cStatus == defs.StatusPartlyDone) && cleanup {
250252
r.log.Debug("clean-up dbpath")
251253
err := removeAll(r.dbpath, r.log, getInternalLogFileSkipRule())
252254
if err != nil {
253255
r.log.Error("flush dbpath %s: %v", r.dbpath, err)
254256
}
255-
} else { // free space by just deleting fallback dir in any other case
256-
r.log.Debug("remove fallback dir")
257-
err := r.removeFallback()
258-
if err != nil {
259-
r.log.Error("flush fallback: %v", err)
260-
}
257+
}
258+
259+
// free space by just deleting fallback dir in any other case
260+
err = r.removeFallback()
261+
if err != nil {
262+
r.log.Error("flush fallback: %v", err)
261263
}
262264

263265
if r.stopHB != nil {
@@ -440,6 +442,7 @@ func (r *PhysRestore) moveToFallback() error {
440442

441443
// removeFallback removes fallback dir
442444
func (r *PhysRestore) removeFallback() error {
445+
r.log.Debug("remove fallback dir")
443446
dbpath := filepath.Clean(r.dbpath)
444447
fallbackPath := filepath.Join(dbpath, fallbackDir)
445448
err := os.RemoveAll(fallbackPath)
@@ -853,7 +856,6 @@ const (
853856
restoreDone
854857
)
855858

856-
func (n nodeStatus) is(s nodeStatus) bool { return n&s != 0 }
857859
// isForCleanup returns true when internal node state if so
858860
// any sort of clean-up: full cleanup or fallbacksync.
859861
// Status indicates that content of db path contains state
@@ -972,11 +974,12 @@ func (r *PhysRestore) Snapshot(
972974
defer func() {
973975
// set failed status of node on error, but
974976
// don't mark node as failed after the local restore succeed
975-
if err != nil && !progress.is(restoreDone) && !errors.Is(err, ErrNoDataForShard) {
976-
r.MarkFailed(meta, err)
977+
restoreFailed := progress.isFailed()
978+
if err != nil && !errors.Is(err, ErrNoDataForShard) && restoreFailed {
979+
r.MarkFailed(err)
977980
}
978981

979-
r.close(err == nil, progress.is(restoreStared) && !progress.is(restoreDone))
982+
r.close(err == nil, progress.isForCleanup())
980983
}()
981984

982985
err = r.init(ctx, cmd.Name, opid, l)

0 commit comments

Comments
 (0)