@@ -221,7 +221,9 @@ func peekTmpPort(current int) (int, error) {
221
221
// close cleans up all temp restore files.
222
222
// Based on cluster status it does cleanup of the node's db path or it
223
223
// applies fallback sync recovery strategy.
224
- func (r * PhysRestore ) close (noerr bool , progress nodeStatus ) {
224
+ //
225
+ //nolint:nonamedreturns
226
+ func (r * PhysRestore ) close (noerr bool , progress nodeStatus ) (err error ) {
225
227
if r .tmpConf != nil {
226
228
r .log .Debug ("rm tmp conf" )
227
229
err := os .Remove (r .tmpConf .Name ())
@@ -251,52 +253,72 @@ func (r *PhysRestore) close(noerr bool, progress nodeStatus) {
251
253
r .log .Warning ("waiting for cluster status during cleanup: %v" , err )
252
254
}
253
255
254
- // resolve and exec cleanup
255
- cleanup := r .resolveCleanupStrategy (cStatus , progress )
256
- cleanup ()
256
+ defer func () {
257
+ if r .fallback {
258
+ // free space by just deleting fallback dir
259
+ err := r .removeFallback ()
260
+ if err != nil {
261
+ r .log .Error ("flush fallback: %v" , err )
262
+ }
263
+ }
257
264
258
- if r .fallback {
259
- // free space by just deleting fallback dir in any other case
260
- err = r .removeFallback ()
261
- if err != nil {
262
- r .log .Error ("flush fallback: %v" , err )
265
+ if r .stopHB != nil {
266
+ close (r .stopHB )
263
267
}
264
- }
268
+ }()
265
269
266
- if r .stopHB != nil {
267
- close (r .stopHB )
270
+ // resolve and exec cleanup
271
+ cleanup := r .resolveCleanupStrategy (cStatus , progress )
272
+ err = cleanup ()
273
+ if err != nil {
274
+ return errors .Wrap (err , "exec cleanup strategy" )
268
275
}
276
+
277
+ return nil
269
278
}
270
279
271
280
// doFullCleanup is node's cleanup strategy for deleting the whole dbpath.
272
- func (r * PhysRestore ) doFullCleanup () {
281
+ func (r * PhysRestore ) doFullCleanup () error {
273
282
r .log .Warning ("apply full cleanup strategy" )
274
283
err := removeAll (r .dbpath , r .log , getInternalLogFileSkipRule ())
275
284
if err != nil {
276
- r . log . Error ( "flush dbpath %s: %v " , r .dbpath , err )
285
+ return errors . Wrapf ( err , "flush dbpath %s" , r .dbpath )
277
286
}
287
+ return nil
278
288
}
279
289
280
290
// doFallbackCleanup is node's cleanup strategy for recovering dbpath
281
291
// from fallbacksync dir.
282
- func (r * PhysRestore ) doFallbackCleanup () {
292
+ // Mark cluster with fallback error.
293
+ func (r * PhysRestore ) doFallbackCleanup () error {
283
294
r .log .Warning ("apply fallback cleanup strategy: using db data from %s" , fallbackDir )
284
295
err := r .migrateFromFallbackDirToDBDir ()
285
296
if err != nil {
286
- r . log . Error ( "migrate from fallback dir: %v" , err )
297
+ return errors . Wrap ( err , "migrate from fallback dir" )
287
298
}
299
+
300
+ if r .nodeInfo .IsClusterLeader () {
301
+ err = r .MarkAsFallback ()
302
+ if err != nil {
303
+ return errors .Wrap (err , "mark meta as fallback" )
304
+ }
305
+ }
306
+
307
+ // cleanup went file, bug follback represents error in case of restore
308
+ return errors .New ("fallback applyed" )
288
309
}
289
310
290
- func (r * PhysRestore ) skipCleanup () {
311
+ func (r * PhysRestore ) skipCleanup () error {
291
312
r .log .Debug ("no cleanup strategy to apply" )
313
+ return nil
292
314
}
293
315
294
316
// resolveCleanupStrategy returns cleanup strategy based on config parameters, cluster status
295
317
// and node progress.
296
318
func (r * PhysRestore ) resolveCleanupStrategy (
297
319
clusterStatus defs.Status ,
298
320
progress nodeStatus ,
299
- ) func () {
321
+ ) func () error {
300
322
if progress .isDBPathUntouched () {
301
323
return r .skipCleanup
302
324
}
@@ -1057,7 +1079,11 @@ func (r *PhysRestore) Snapshot(
1057
1079
r .MarkFailed (err )
1058
1080
}
1059
1081
1060
- r .close (err == nil , progress )
1082
+ noerr := err == nil
1083
+ err = r .close (noerr , progress )
1084
+ if err != nil {
1085
+ err = errors .Wrap (err , "snapshot close" )
1086
+ }
1061
1087
}()
1062
1088
1063
1089
err = r .init (ctx , cmd .Name , opid , l )
@@ -2715,6 +2741,46 @@ func (r *PhysRestore) MarkFailed(e error) {
2715
2741
}
2716
2742
}
2717
2743
2744
+ func (r * PhysRestore ) MarkAsFallback () error {
2745
+ r .log .Debug ("set fallback error on cluster level" )
2746
+ err := util .RetryableWrite (r .stg ,
2747
+ r .syncPathCluster + "." + string (defs .StatusError ), errStatus (errors .New ("fallback is applied" )))
2748
+ if err != nil {
2749
+ r .log .Error ("write cluster error state for fallback: %v" , err )
2750
+ // lets try to create restore meta
2751
+ }
2752
+
2753
+ mf := filepath .Join (defs .PhysRestoresDir , r .name ) + ".json"
2754
+ _ , err = r .stg .FileStat (mf )
2755
+ if err != nil {
2756
+ return errors .Errorf ("file stat %s: %v" , mf , err )
2757
+ }
2758
+
2759
+ var meta * RestoreMeta
2760
+ src , err := r .stg .SourceReader (mf )
2761
+ if err != nil {
2762
+ return errors .Errorf ("get file %s: %v" , mf , err )
2763
+ }
2764
+ err = json .NewDecoder (src ).Decode (& meta )
2765
+ if err != nil {
2766
+ return errors .Wrap (err , "decode meta" )
2767
+ }
2768
+
2769
+ meta .Status = defs .StatusError
2770
+ meta .Error = "fallback strategy is applied"
2771
+
2772
+ buf , err := json .MarshalIndent (meta , "" , "\t " )
2773
+ if err != nil {
2774
+ return errors .Wrapf (err , "encode restore meta" )
2775
+ }
2776
+ err = util .RetryableWrite (r .stg , mf , buf )
2777
+ if err != nil {
2778
+ return errors .Wrap (err , "write restore meta" )
2779
+ }
2780
+
2781
+ return nil
2782
+ }
2783
+
2718
2784
// moveAll moves fromDir content (files and dirs) to toDir content.
2719
2785
// It ignores all files/dirs specified within toIgnore slice.
2720
2786
func moveAll (fromDir , toDir string , toIgnore []string , l log.LogEvent ) error {
0 commit comments