Skip to content

Commit 0558156

Browse files
committed
Fix RS and cluster status in case of error
1 parent fccb81b commit 0558156

File tree

1 file changed

+37
-4
lines changed

1 file changed

+37
-4
lines changed

pbm/restore/physical.go

Lines changed: 37 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -594,6 +594,39 @@ func (n nodeError) Error() string {
594594
return fmt.Sprintf("%s failed: %s", n.node, n.msg)
595595
}
596596

597+
// checkForRSLevelErr checks if all nodes have an error,
598+
// and in that case true is returned.
599+
// If any node doesn't have error, false is returned.
600+
func (r *PhysRestore) checkForRSLevelErr() bool {
601+
for f := range r.syncPathPeers {
602+
errFile := f + "." + string(defs.StatusError)
603+
_, err := r.stg.FileStat(errFile)
604+
if errors.Is(err, storage.ErrNotExist) {
605+
return false
606+
}
607+
if err != nil {
608+
r.log.Error("error while checking file %s: %v", errFile, err)
609+
}
610+
// error file is found
611+
}
612+
return true
613+
}
614+
615+
// checkForClusterLevelErr checks if any RS (shard) has an error.
616+
// It returns true if at least one RS has error, otherwise false.
617+
func (r *PhysRestore) checkForClusterLevelErr() bool {
618+
for f := range r.syncPathShards {
619+
errFile := f + "." + string(defs.StatusError)
620+
_, err := r.stg.FileStat(errFile)
621+
if err == nil {
622+
return true
623+
} else if !errors.Is(err, storage.ErrNotExist) {
624+
r.log.Error("error while checking file %s: %v", errFile, err)
625+
}
626+
}
627+
return false
628+
}
629+
597630
func (r *PhysRestore) waitFiles(
598631
status defs.Status,
599632
objs map[string]struct{},
@@ -816,7 +849,7 @@ func (r *PhysRestore) Snapshot(
816849
// set failed status of node on error, but
817850
// don't mark node as failed after the local restore succeed
818851
if err != nil && !progress.is(restoreDone) && !errors.Is(err, ErrNoDataForShard) {
819-
r.MarkFailed(meta, err, !progress.is(restoreStared))
852+
r.MarkFailed(meta, err)
820853
}
821854

822855
r.close(err == nil, progress.is(restoreStared) && !progress.is(restoreDone))
@@ -2367,7 +2400,7 @@ func (r *PhysRestore) checkMongod(needVersion string) (version string, err error
23672400
}
23682401

23692402
// MarkFailed sets the restore and rs state as failed with the given message
2370-
func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
2403+
func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error) {
23712404
var nerr nodeError
23722405
if errors.As(e, &nerr) {
23732406
e = nerr
@@ -2390,14 +2423,14 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
23902423
// At some point, every node will try to set an rs and cluster state
23912424
// (in `toState` method).
23922425
// Here we are not aware of partlyDone etc so leave it to the `toState`.
2393-
if r.nodeInfo.IsPrimary && markCluster {
2426+
if r.checkForRSLevelErr() {
23942427
serr := util.RetryableWrite(r.stg,
23952428
r.syncPathRS+"."+string(defs.StatusError), errStatus(e))
23962429
if serr != nil {
23972430
r.log.Error("MarkFailed: write replset error state `%v`: %v", e, serr)
23982431
}
23992432
}
2400-
if r.nodeInfo.IsClusterLeader() && markCluster {
2433+
if r.nodeInfo.IsLeader() && r.checkForClusterLevelErr() {
24012434
serr := util.RetryableWrite(r.stg,
24022435
r.syncPathCluster+"."+string(defs.StatusError), errStatus(e))
24032436
if serr != nil {

0 commit comments

Comments
 (0)