Skip to content

Commit 0b9c7bd

Browse files
authored
REP-5218 Roll back tasks as needed when restarting. (#35)
The worker threads look for status=added tasks when finding work to do. When it grabs a task it sets status=processing; thus, no other threads will try to grab that task. If the verifier crashes, though, any tasks that were status=processing need to be reset; otherwise no worker thread will ever pick them up, and the verifier will hang. This changeset implements that reset of status=processing tasks so that the verifier will be able to finish after a restart. Specifically: - If the primary task is found `processing`, this means that the initial listing of namespaces was interrupted. Thus, we now reset the primary task to `added`, and all other tasks are deleted. - If any collection-verification tasks are found `processing`, that means we never finished writing out the collection’s partitions and checking its metadata. Any existing partition tasks are unusuable, so they get deleted, and the collection-verification task is reset to `added`. - Any remaining `processing` partition tasks are reset to `added`. The use of transactions to do these rollbacks means that the verifier’s metadata can no longer be a standalone mongod.
1 parent 3f42f7d commit 0b9c7bd

File tree

5 files changed

+405
-18
lines changed

5 files changed

+405
-18
lines changed

internal/verifier/check.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,17 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
153153
if err != nil {
154154
return err
155155
}
156+
157+
err = verifier.doInMetaTransaction(
158+
ctx,
159+
func(ctx context.Context, sCtx mongo.SessionContext) error {
160+
return verifier.ResetInProgressTasks(sCtx)
161+
},
162+
)
163+
if err != nil {
164+
return errors.Wrap(err, "failed to reset any in-progress tasks")
165+
}
166+
156167
verifier.logger.Debug().Msg("Starting Check")
157168

158169
verifier.phase = Check

internal/verifier/reset.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package verifier
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"go.mongodb.org/mongo-driver/bson"
6+
"go.mongodb.org/mongo-driver/mongo"
7+
)
8+
9+
var defaultTaskUpdate = bson.M{
10+
"$set": bson.M{"status": verificationTaskAdded},
11+
"$unset": bson.M{"begin_time": 1},
12+
}
13+
14+
func (verifier *Verifier) ResetInProgressTasks(ctx mongo.SessionContext) error {
15+
didReset, err := verifier.handleIncompletePrimary(ctx)
16+
17+
if err == nil {
18+
if didReset {
19+
return nil
20+
}
21+
22+
err = verifier.resetCollectionTasksIfNeeded(ctx)
23+
}
24+
25+
if err == nil {
26+
err = verifier.resetPartitionTasksIfNeeded(ctx)
27+
}
28+
29+
return err
30+
}
31+
32+
func (verifier *Verifier) handleIncompletePrimary(ctx mongo.SessionContext) (bool, error) {
33+
taskColl := verifier.verificationTaskCollection()
34+
35+
cursor, err := taskColl.Find(
36+
ctx,
37+
bson.M{
38+
"type": verificationTaskPrimary,
39+
"status": bson.M{"$ne": verificationTaskCompleted},
40+
},
41+
)
42+
if err != nil {
43+
return false, errors.Wrapf(err, "failed to fetch incomplete %#q task", verificationTaskPrimary)
44+
}
45+
46+
var incompletePrimaries []VerificationTask
47+
err = cursor.All(ctx, &incompletePrimaries)
48+
if err != nil {
49+
return false, errors.Wrapf(err, "failed to read incomplete %#q task", verificationTaskPrimary)
50+
}
51+
52+
switch len(incompletePrimaries) {
53+
case 0:
54+
// Nothing to do.
55+
case 1:
56+
// Invariant: task status should be “added”.
57+
if incompletePrimaries[0].Status != verificationTaskAdded {
58+
verifier.logger.Panic().
59+
Interface("task", incompletePrimaries[0]).
60+
Msg("Primary task status has invalid state.")
61+
}
62+
63+
verifier.logger.Info().
64+
Msg("Previous verifier run left primary task incomplete. Deleting non-primary tasks.")
65+
66+
deleted, err := taskColl.DeleteMany(
67+
ctx,
68+
bson.M{
69+
"type": bson.M{
70+
"$ne": verificationTaskPrimary,
71+
},
72+
},
73+
)
74+
if err != nil {
75+
return false, errors.Wrapf(err, "failed to delete non-%#q tasks", verificationTaskPrimary)
76+
}
77+
78+
verifier.logger.Info().
79+
Int64("deletedTasksCount", deleted.DeletedCount).
80+
Msg("Found and deleted non-primary tasks.")
81+
82+
return true, nil
83+
default:
84+
verifier.logger.Panic().
85+
Interface("tasks", incompletePrimaries).
86+
Msg("Found multiple incomplete primary tasks; there should only be 1.")
87+
}
88+
89+
return false, nil
90+
}
91+
92+
func (verifier *Verifier) resetCollectionTasksIfNeeded(ctx mongo.SessionContext) error {
93+
taskColl := verifier.verificationTaskCollection()
94+
95+
cursor, err := taskColl.Find(
96+
ctx,
97+
bson.M{
98+
"type": verificationTaskVerifyCollection,
99+
"status": verificationTaskProcessing,
100+
},
101+
)
102+
if err != nil {
103+
return errors.Wrapf(err, "failed to find incomplete %#q tasks", verificationTaskVerifyCollection)
104+
}
105+
var incompleteCollTasks []VerificationTask
106+
err = cursor.All(ctx, &incompleteCollTasks)
107+
if err != nil {
108+
return errors.Wrapf(err, "failed to read incomplete %#q tasks", verificationTaskVerifyCollection)
109+
}
110+
111+
if len(incompleteCollTasks) > 0 {
112+
verifier.logger.Info().
113+
Int("count", len(incompleteCollTasks)).
114+
Msg("Previous verifier run left collection-level verification task(s) pending. Resetting.")
115+
}
116+
117+
for _, task := range incompleteCollTasks {
118+
_, err := taskColl.DeleteMany(
119+
ctx,
120+
bson.M{
121+
"type": verificationTaskVerifyDocuments,
122+
"query_filter.namespace": task.QueryFilter.Namespace,
123+
},
124+
)
125+
if err != nil {
126+
return errors.Wrapf(err, "failed to delete namespace %#q's %#q tasks", task.QueryFilter.Namespace, verificationTaskVerifyDocuments)
127+
}
128+
129+
_, err = taskColl.UpdateOne(
130+
ctx,
131+
bson.M{
132+
"type": verificationTaskVerifyCollection,
133+
"query_filter.namespace": task.QueryFilter.Namespace,
134+
},
135+
defaultTaskUpdate,
136+
)
137+
if err != nil {
138+
return errors.Wrapf(err, "failed to reset namespace %#q's %#q task", task.QueryFilter.Namespace, verificationTaskVerifyCollection)
139+
}
140+
}
141+
142+
return nil
143+
}
144+
145+
func (verifier *Verifier) resetPartitionTasksIfNeeded(ctx mongo.SessionContext) error {
146+
taskColl := verifier.verificationTaskCollection()
147+
148+
_, err := taskColl.UpdateMany(
149+
ctx,
150+
bson.M{
151+
"type": verificationTaskVerifyDocuments,
152+
"status": verificationTaskProcessing,
153+
},
154+
defaultTaskUpdate,
155+
)
156+
if err != nil {
157+
return errors.Wrapf(err, "failed to reset in-progress %#q tasks", verificationTaskVerifyDocuments)
158+
}
159+
160+
return nil
161+
}

internal/verifier/reset_test.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package verifier
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
"github.com/10gen/migration-verifier/internal/partitions"
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
"go.mongodb.org/mongo-driver/mongo/options"
11+
)
12+
13+
func (suite *MultiDataVersionTestSuite) TestResetPrimaryTask() {
14+
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
15+
16+
created, err := verifier.CheckIsPrimary()
17+
suite.Require().NoError(err)
18+
suite.Require().True(created)
19+
20+
_, err = verifier.InsertCollectionVerificationTask("foo.bar")
21+
suite.Require().NoError(err)
22+
23+
ctx := context.Background()
24+
25+
err = verifier.doInMetaTransaction(
26+
ctx,
27+
func(_ context.Context, ctx mongo.SessionContext) error {
28+
return verifier.ResetInProgressTasks(ctx)
29+
},
30+
)
31+
suite.Require().NoError(err)
32+
33+
tasksColl := verifier.verificationTaskCollection()
34+
cursor, err := tasksColl.Find(ctx, bson.M{})
35+
suite.Require().NoError(err)
36+
var taskDocs []bson.M
37+
suite.Require().NoError(cursor.All(ctx, &taskDocs))
38+
39+
suite.Assert().Len(taskDocs, 1)
40+
}
41+
42+
func (suite *MultiDataVersionTestSuite) TestResetNonPrimaryTasks() {
43+
ctx := context.Background()
44+
45+
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
46+
47+
// Create a primary task, and set it to complete.
48+
created, err := verifier.CheckIsPrimary()
49+
suite.Require().NoError(err)
50+
suite.Require().True(created)
51+
52+
suite.Require().NoError(verifier.UpdatePrimaryTaskComplete())
53+
54+
ns1 := "foo.bar"
55+
ns2 := "qux.quux"
56+
57+
// Create a collection-verification task, and set it to processing.
58+
collTask, err := verifier.InsertCollectionVerificationTask(ns1)
59+
suite.Require().NoError(err)
60+
61+
collTask.Status = verificationTaskProcessing
62+
63+
suite.Require().NoError(
64+
verifier.UpdateVerificationTask(collTask),
65+
)
66+
67+
// Create three partition tasks with the same namespace as the
68+
// collection-verification task (status=[added, processing, completed]),
69+
// and another for a different namespace that’s completed.
70+
for _, taskParts := range []struct {
71+
Status verificationTaskStatus
72+
Namespace string
73+
}{
74+
{verificationTaskAdded, ns1},
75+
{verificationTaskProcessing, ns1},
76+
{verificationTaskCompleted, ns1},
77+
{verificationTaskAdded, ns2},
78+
{verificationTaskProcessing, ns2},
79+
{verificationTaskCompleted, ns2},
80+
} {
81+
task, err := verifier.InsertPartitionVerificationTask(
82+
&partitions.Partition{
83+
Ns: &partitions.Namespace{
84+
DB: strings.Split(taskParts.Namespace, ".")[0],
85+
Coll: strings.Split(taskParts.Namespace, ".")[1],
86+
},
87+
},
88+
nil,
89+
taskParts.Namespace,
90+
)
91+
suite.Require().NoError(err)
92+
93+
task.Status = taskParts.Status
94+
suite.Require().NoError(
95+
verifier.UpdateVerificationTask(task),
96+
)
97+
}
98+
99+
// Reset tasks
100+
err = verifier.doInMetaTransaction(
101+
ctx,
102+
func(_ context.Context, ctx mongo.SessionContext) error {
103+
return verifier.ResetInProgressTasks(ctx)
104+
},
105+
)
106+
suite.Require().NoError(err)
107+
108+
// Contents should just be the primary task and
109+
// the completed partition-level.
110+
tasksColl := verifier.verificationTaskCollection()
111+
cursor, err := tasksColl.Find(
112+
ctx,
113+
bson.M{},
114+
options.Find().SetSort(bson.D{
115+
{"type", 1},
116+
{"query_filter.namespace", 1},
117+
{"status", 1},
118+
}),
119+
)
120+
suite.Require().NoError(err)
121+
var taskDocs []VerificationTask
122+
suite.Require().NoError(cursor.All(ctx, &taskDocs))
123+
124+
suite.Require().Len(taskDocs, 5)
125+
126+
// The tasks that should remain are:
127+
128+
// the primary (completed)
129+
suite.Assert().Equal(
130+
verificationTaskPrimary,
131+
taskDocs[0].Type,
132+
)
133+
134+
// the 2 ns2 partition tasks that weren’t completed (both “added”)
135+
suite.Assert().Equal(
136+
verificationTaskVerifyDocuments,
137+
taskDocs[1].Type,
138+
)
139+
suite.Assert().Equal(
140+
verificationTaskAdded,
141+
taskDocs[1].Status,
142+
)
143+
suite.Assert().Equal(
144+
ns2,
145+
taskDocs[1].QueryFilter.Namespace,
146+
)
147+
148+
suite.Assert().Equal(
149+
verificationTaskVerifyDocuments,
150+
taskDocs[2].Type,
151+
)
152+
suite.Assert().Equal(
153+
verificationTaskAdded,
154+
taskDocs[2].Status,
155+
)
156+
suite.Assert().Equal(
157+
ns2,
158+
taskDocs[2].QueryFilter.Namespace,
159+
)
160+
161+
// the ns2 partition task that *was* completed
162+
suite.Assert().Equal(
163+
verificationTaskVerifyDocuments,
164+
taskDocs[3].Type,
165+
)
166+
suite.Assert().Equal(
167+
verificationTaskCompleted,
168+
taskDocs[3].Status,
169+
)
170+
suite.Assert().Equal(
171+
ns2,
172+
taskDocs[3].QueryFilter.Namespace,
173+
)
174+
175+
// ns1’s verify-collection task (added state)
176+
suite.Assert().Equal(
177+
verificationTaskVerifyCollection,
178+
taskDocs[4].Type,
179+
)
180+
suite.Assert().Equal(
181+
verificationTaskAdded,
182+
taskDocs[4].Status,
183+
)
184+
suite.Assert().Equal(
185+
ns1,
186+
taskDocs[4].QueryFilter.Namespace,
187+
)
188+
}

0 commit comments

Comments
 (0)