@@ -6,42 +6,64 @@ import (
66 "go.mongodb.org/mongo-driver/mongo"
77)
88
9+ var defaultTaskUpdate = bson.M {
10+ "$set" : bson.M {"status" : verificationTaskAdded },
11+ "$unset" : bson.M {"begin_time" : 1 },
12+ }
13+
914func (verifier * Verifier ) ResetInProgressTasks (ctx mongo.SessionContext ) error {
10- err := verifier .resetPrimaryTaskIfNeeded (ctx )
15+ didReset , err := verifier .handleIncompletePrimary (ctx )
16+
17+ if err == nil {
18+ if didReset {
19+ return nil
20+ }
1121
12- if err != nil {
1322 err = verifier .resetCollectionTasksIfNeeded (ctx )
1423 }
1524
16- if err ! = nil {
25+ if err = = nil {
1726 err = verifier .resetPartitionTasksIfNeeded (ctx )
1827 }
1928
2029 return err
2130}
2231
23- func (verifier * Verifier ) resetPrimaryTaskIfNeeded (ctx mongo.SessionContext ) error {
32+ func (verifier * Verifier ) handleIncompletePrimary (ctx mongo.SessionContext ) ( bool , error ) {
2433 taskColl := verifier .verificationTaskCollection ()
2534
26- incompletePrimary , err := taskColl .CountDocuments (
35+ cursor , err := taskColl .Find (
2736 ctx ,
2837 bson.M {
2938 "type" : verificationTaskPrimary ,
30- "status" : verificationTaskProcessing ,
39+ "status" : bson. M { "$ne" : verificationTaskCompleted } ,
3140 },
3241 )
3342 if err != nil {
34- return errors .Wrapf (err , "failed to count incomplete %#q tasks " , verificationTaskPrimary )
43+ return false , errors .Wrapf (err , "failed to fetch incomplete %#q task " , verificationTaskPrimary )
3544 }
3645
37- switch incompletePrimary {
46+ var incompletePrimaries []VerificationTask
47+ err = cursor .All (ctx , & incompletePrimaries )
48+ if err != nil {
49+ return false , errors .Wrapf (err , "failed to read incomplete %#q task" , verificationTaskPrimary )
50+ }
51+
52+ switch len (incompletePrimaries ) {
3853 case 0 :
3954 // Nothing to do.
4055 case 1 :
56+ // Invariant: task status should be “added”.
57+ if incompletePrimaries [0 ].Status != verificationTaskAdded {
58+ verifier .logger .Panic ().
59+ Interface ("task" , incompletePrimaries [0 ]).
60+ Msg ("Primary task status has invalid state." )
61+ }
62+
4163 verifier .logger .Info ().
42- Msg ("Previous verifier run left primary task incomplete. Resetting ." )
64+ Msg ("Previous verifier run left primary task incomplete. Deleting non-primary tasks ." )
4365
44- _ , err := taskColl .DeleteMany (
66+ deleted , err := taskColl .DeleteMany (
4567 ctx ,
4668 bson.M {
4769 "type" : bson.M {
@@ -50,30 +72,21 @@ func (verifier *Verifier) resetPrimaryTaskIfNeeded(ctx mongo.SessionContext) err
5072 },
5173 )
5274 if err != nil {
53- return errors .Wrapf (err , "failed to delete non-%#q tasks" , verificationTaskPrimary )
75+ return false , errors .Wrapf (err , "failed to delete non-%#q tasks" , verificationTaskPrimary )
5476 }
5577
56- _ , err = taskColl .UpdateOne (
57- ctx ,
58- bson.M {
59- "type" : verificationTaskPrimary ,
60- },
61- bson.M {
62- "$set" : bson.M {"status" : verificationTaskAdded },
63- },
64- )
65- if err != nil {
66- return errors .Wrapf (err , "failed to reset %#q task" , verificationTaskPrimary )
67- }
78+ verifier .logger .Info ().
79+ Int64 ("deletedTasksCount" , deleted .DeletedCount ).
80+ Msg ("Found and deleted non-primary tasks." )
6881
69- return nil
82+ return true , nil
7083 default :
7184 verifier .logger .Panic ().
72- Int64 ( "incompletePrimary " , incompletePrimary ).
85+ Interface ( "tasks " , incompletePrimaries ).
7386 Msg ("Found multiple incomplete primary tasks; there should only be 1." )
7487 }
7588
76- return nil
89+ return false , nil
7790}
7891
7992func (verifier * Verifier ) resetCollectionTasksIfNeeded (ctx mongo.SessionContext ) error {
@@ -95,12 +108,18 @@ func (verifier *Verifier) resetCollectionTasksIfNeeded(ctx mongo.SessionContext)
95108 return errors .Wrapf (err , "failed to read incomplete %#q tasks" , verificationTaskVerifyCollection )
96109 }
97110
111+ if len (incompleteCollTasks ) > 0 {
112+ verifier .logger .Info ().
113+ Int ("count" , len (incompleteCollTasks )).
114+ Msg ("Previous verifier run left collection-level verification task(s) pending. Resetting." )
115+ }
116+
98117 for _ , task := range incompleteCollTasks {
99118 _ , err := taskColl .DeleteMany (
100119 ctx ,
101120 bson.M {
102- "type" : verificationTaskVerifyDocuments ,
103- "query_filter.partition. namespace" : task .QueryFilter .Namespace ,
121+ "type" : verificationTaskVerifyDocuments ,
122+ "query_filter.namespace" : task .QueryFilter .Namespace ,
104123 },
105124 )
106125 if err != nil {
@@ -113,10 +132,7 @@ func (verifier *Verifier) resetCollectionTasksIfNeeded(ctx mongo.SessionContext)
113132 "type" : verificationTaskVerifyCollection ,
114133 "query_filter.namespace" : task .QueryFilter .Namespace ,
115134 },
116- bson.M {
117- "$set" : bson.M {"status" : verificationTaskAdded },
118- // TODO unset
119- },
135+ defaultTaskUpdate ,
120136 )
121137 if err != nil {
122138 return errors .Wrapf (err , "failed to reset namespace %#q's %#q task" , task .QueryFilter .Namespace , verificationTaskVerifyCollection )
@@ -135,10 +151,7 @@ func (verifier *Verifier) resetPartitionTasksIfNeeded(ctx mongo.SessionContext)
135151 "type" : verificationTaskVerifyDocuments ,
136152 "status" : verificationTaskProcessing ,
137153 },
138- bson.M {
139- "$set" : bson.M {"status" : verificationTaskAdded },
140- // TODO unset
141- },
154+ defaultTaskUpdate ,
142155 )
143156 if err != nil {
144157 return errors .Wrapf (err , "failed to reset in-progress %#q tasks" , verificationTaskVerifyDocuments )
0 commit comments