@@ -235,13 +235,72 @@ func (r *PhysRestore) close(noerr, cleanup bool) {
235
235
r .log .Warning ("remove file <%s>: %v" , backup .FilelistName , err )
236
236
}
237
237
} else if cleanup { // clean-up dbpath on err if needed
238
- r .migrateFromFallbackDirToDbDir ()
238
+ r .log .Debug ("wait for cluster status" )
239
+ cStatus , err := r .waitClusterStatus ()
240
+ if err != nil {
241
+ r .log .Warning ("waiting for cluster status during cleanup: %v" , err )
242
+ }
243
+
244
+ if cStatus == defs .StatusError {
245
+ err := r .migrateFromFallbackDirToDbDir ()
246
+ if err != nil {
247
+ r .log .Error ("migrate from fallback dir: %v" , err )
248
+ }
249
+ } else { // cluster status is done or partlyDone
250
+ r .log .Debug ("clean-up dbpath" )
251
+ err := removeAll (r .dbpath , r .log )
252
+ if err != nil {
253
+ r .log .Error ("flush dbpath %s: %v" , r .dbpath , err )
254
+ }
255
+ }
239
256
}
240
257
if r .stopHB != nil {
241
258
close (r .stopHB )
242
259
}
243
260
}
244
261
262
+ // waitClusterStatus blocks until cluster status file is set on one of final statuses.
263
+ // It also checks HB to see if cluster is stucked.
264
+ func (r * PhysRestore ) waitClusterStatus () (defs.Status , error ) {
265
+ errF := fmt .Sprintf ("%s.%s" , r .syncPathCluster , defs .StatusError )
266
+ doneF := fmt .Sprintf ("%s.%s" , r .syncPathCluster , defs .StatusDone )
267
+ partlyDoneF := fmt .Sprintf ("%s.%s" , r .syncPathCluster , defs .StatusPartlyDone )
268
+ hbF := fmt .Sprintf ("%s.%s" , r .syncPathCluster , syncHbSuffix )
269
+
270
+ tk := time .NewTicker (time .Second * 5 )
271
+ defer tk .Stop ()
272
+
273
+ for range tk .C {
274
+ _ , err := r .stg .FileStat (errF )
275
+ if err == nil {
276
+ return defs .StatusError , nil
277
+ } else if ! errors .Is (err , storage .ErrNotExist ) {
278
+ r .log .Error ("error while reading %s file" , errF )
279
+ }
280
+
281
+ _ , err = r .stg .FileStat (doneF )
282
+ if err == nil {
283
+ return defs .StatusDone , nil
284
+ } else if ! errors .Is (err , storage .ErrNotExist ) {
285
+ r .log .Error ("error while reading %s file" , errF )
286
+ }
287
+
288
+ _ , err = r .stg .FileStat (partlyDoneF )
289
+ if err == nil {
290
+ return defs .StatusPartlyDone , nil
291
+ } else if ! errors .Is (err , storage .ErrNotExist ) {
292
+ r .log .Error ("error while reading %s file" , errF )
293
+ }
294
+
295
+ err = r .checkHB (hbF )
296
+ if err != nil {
297
+ return defs .StatusError , errors .Wrap (err , "check hb for cluster" )
298
+ }
299
+
300
+ }
301
+ return defs .StatusError , errors .New ("wait cluster status" )
302
+ }
303
+
245
304
func (r * PhysRestore ) flush (ctx context.Context ) error {
246
305
r .log .Debug ("shutdown server" )
247
306
rsStat , err := topo .GetReplsetStatus (ctx , r .node )
0 commit comments