@@ -210,8 +210,9 @@ func peekTmpPort(current int) (int, error) {
 	return -1, errors.Errorf("can't find unused port in range [%d, %d]", current, current+rng)
 }
 
-// Close releases object resources.
-// Should be run to avoid leaks.
+// close cleans up all temp restore files.
+// Based on the error status, it either cleans up the node's db path or
+// applies the fallback sync recovery strategy.
 func (r *PhysRestore) close(noerr, cleanup bool) {
 	if r.tmpConf != nil {
 		r.log.Debug("rm tmp conf")
@@ -221,7 +222,8 @@ func (r *PhysRestore) close(noerr, cleanup bool) {
 		}
 	}
 
-	// if there is no error clean-up internal restore files, internal log file(s) should stay
+	// if there is no error, clean up internal restore files;
+	// internal log file(s) should stay in any case
 	if noerr {
 		extMeta := filepath.Join(r.dbpath,
 			fmt.Sprintf(defs.ExternalRsMetaFile, util.MakeReverseRSMapFunc(r.rsMap)(r.nodeInfo.SetName)))
@@ -240,24 +242,24 @@ func (r *PhysRestore) close(noerr, cleanup bool) {
 	if err != nil {
 		r.log.Warning("waiting for cluster status during cleanup: %v", err)
 	}
-	if cStatus == defs.StatusError {
+	if cStatus == defs.StatusError && cleanup {
 		r.log.Warning("apply db data from %s", fallbackDir)
 		err := r.migrateFromFallbackDirToDBDir()
 		if err != nil {
 			r.log.Error("migrate from fallback dir: %v", err)
 		}
-	} else if cleanup { // clean-up dbpath on err if needed (cluster is done or partlyDone)
+	} else if (cStatus == defs.StatusDone || cStatus == defs.StatusPartlyDone) && cleanup {
 		r.log.Debug("clean-up dbpath")
 		err := removeAll(r.dbpath, r.log, getInternalLogFileSkipRule())
 		if err != nil {
 			r.log.Error("flush dbpath %s: %v", r.dbpath, err)
 		}
-	} else { // free space by just deleting fallback dir in any other case
-		r.log.Debug("remove fallback dir")
-		err := r.removeFallback()
-		if err != nil {
-			r.log.Error("flush fallback: %v", err)
-		}
+	}
+
+	// free space by just deleting fallback dir in any other case
+	err = r.removeFallback()
+	if err != nil {
+		r.log.Error("flush fallback: %v", err)
 	}
 
 	if r.stopHB != nil {
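For reviewers skimming the hunk above: the reworked logic in close() now separates three outcomes. A minimal standalone sketch of the control flow (statuses and actions stubbed out for illustration; this is not the actual PBM code):

package main

import "fmt"

type Status string

const (
	StatusError      Status = "error"
	StatusDone       Status = "done"
	StatusPartlyDone Status = "partlyDone"
)

// cleanupFlow mirrors the decision order in the diff: fallback data is
// restored only on a cluster-wide error, dbpath is wiped only when the
// cluster finished, and the fallback dir is removed at the end regardless.
func cleanupFlow(cStatus Status, cleanup bool) {
	if cStatus == StatusError && cleanup {
		fmt.Println("apply db data from fallback dir")
	} else if (cStatus == StatusDone || cStatus == StatusPartlyDone) && cleanup {
		fmt.Println("clean up dbpath, keeping internal log files")
	}
	fmt.Println("remove fallback dir") // unconditional: frees disk space
}

func main() {
	cleanupFlow(StatusError, true)      // failed restore on a cleanup-eligible node
	cleanupFlow(StatusPartlyDone, true) // cluster finished: wipe this node's dbpath
	cleanupFlow(StatusDone, false)      // node not eligible: only fallback dir removal
}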
@@ -440,6 +442,7 @@ func (r *PhysRestore) moveToFallback() error {
 
 // removeFallback removes fallback dir
 func (r *PhysRestore) removeFallback() error {
+	r.log.Debug("remove fallback dir")
 	dbpath := filepath.Clean(r.dbpath)
 	fallbackPath := filepath.Join(dbpath, fallbackDir)
 	err := os.RemoveAll(fallbackPath)
@@ -853,7 +856,20 @@ const (
 	restoreDone
 )
 
-func (n nodeStatus) is(s nodeStatus) bool { return n&s != 0 }
+// isForCleanup returns true when the internal node state calls for
+// any sort of clean-up: full cleanup or fallback sync.
+// Such a status indicates that the db path contains state
+// which is not correct, and therefore it needs to be
+// discarded.
+func (n nodeStatus) isForCleanup() bool {
+	return n&restoreStared != 0 && n&restoreDone == 0
+}
+
+// isFailed returns true when the internal node state didn't reach
+// the done state, no matter whether the restore was started or not.
+func (n nodeStatus) isFailed() bool {
+	return n&restoreDone == 0
+}
 
 // log buffer that will dump content to the storage on restore
 // finish (whether it's successful or not). It also dumps content
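To make the new helpers' semantics concrete, here is a hypothetical truth table as a runnable snippet. It assumes restoreStared and restoreDone are distinct bits, consistent with the bitwise checks in the diff (restoreStared is spelled exactly as in the source):

package main

import "fmt"

type nodeStatus int

const (
	restoreStared nodeStatus = 1 << iota // identifier spelled this way in the source
	restoreDone
)

func (n nodeStatus) isForCleanup() bool { return n&restoreStared != 0 && n&restoreDone == 0 }
func (n nodeStatus) isFailed() bool     { return n&restoreDone == 0 }

func main() {
	for _, n := range []nodeStatus{0, restoreStared, restoreStared | restoreDone} {
		fmt.Printf("status=%02b isFailed=%-5v isForCleanup=%v\n", n, n.isFailed(), n.isForCleanup())
	}
	// status=00 isFailed=true  isForCleanup=false -- never started: fail, but nothing to discard
	// status=01 isFailed=true  isForCleanup=true  -- started but unfinished: dbpath is dirty
	// status=11 isFailed=false isForCleanup=false -- local restore reached done
}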
@@ -958,11 +974,12 @@ func (r *PhysRestore) Snapshot(
 	defer func() {
 		// set failed status of node on error, but
 		// don't mark node as failed after the local restore succeed
-		if err != nil && !progress.is(restoreDone) && !errors.Is(err, ErrNoDataForShard) {
-			r.MarkFailed(meta, err)
+		restoreFailed := progress.isFailed()
+		if err != nil && !errors.Is(err, ErrNoDataForShard) && restoreFailed {
+			r.MarkFailed(err)
 		}
 
-		r.close(err == nil, progress.is(restoreStared) && !progress.is(restoreDone))
+		r.close(err == nil, progress.isForCleanup())
 	}()
 
 	err = r.init(ctx, cmd.Name, opid, l)
@@ -2525,20 +2542,7 @@ func (r *PhysRestore) checkMongod(needVersion string) (version string, err error
 }
 
 // MarkFailed sets the restore and rs state as failed with the given message
-func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error) {
-	var nerr nodeError
-	if errors.As(e, &nerr) {
-		e = nerr
-		meta.Replsets = []RestoreReplset{{
-			Name:   nerr.node,
-			Status: defs.StatusError,
-			Error:  nerr.msg,
-		}}
-	} else if len(meta.Replsets) > 0 {
-		meta.Replsets[0].Status = defs.StatusError
-		meta.Replsets[0].Error = e.Error()
-	}
-
+func (r *PhysRestore) MarkFailed(e error) {
 	err := util.RetryableWrite(r.stg,
 		r.syncPathNode+"."+string(defs.StatusError), errStatus(e))
 	if err != nil {
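With the replset bookkeeping removed, MarkFailed reduces to persisting the error marker via util.RetryableWrite. A hedged sketch of what such a retrying write looks like (a generic stand-in with assumed retry semantics, not PBM's actual util.RetryableWrite signature or policy):

package main

import (
	"fmt"
	"time"
)

// retryableWrite is a hypothetical stand-in: it retries a storage write
// a few times with a fixed pause, returning the last error on exhaustion.
func retryableWrite(write func() error) error {
	var err error
	for attempt := 1; attempt <= 5; attempt++ {
		if err = write(); err == nil {
			return nil
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("write failed after retries: %w", err)
}

func main() {
	// Emulates writing an error-status marker file to storage.
	err := retryableWrite(func() error {
		fmt.Println("put status file to storage")
		return nil
	})
	if err != nil {
		fmt.Println(err)
	}
}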