|
5 | 5 | "fmt" |
6 | 6 | "time" |
7 | 7 |
|
| 8 | + "github.com/10gen/migration-verifier/internal/logger" |
| 9 | + "github.com/10gen/migration-verifier/internal/retry" |
8 | 10 | mapset "github.com/deckarep/golang-set/v2" |
9 | 11 | "github.com/pkg/errors" |
10 | 12 | "go.mongodb.org/mongo-driver/bson" |
@@ -178,19 +180,27 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any |
178 | 180 | return err |
179 | 181 | } |
180 | 182 | } |
181 | | - err = verifier.AddMetaIndexes(ctx) |
182 | | - if err != nil { |
183 | | - return err |
184 | | - } |
185 | 183 |
|
186 | | - err = verifier.doInMetaTransaction( |
187 | | - ctx, |
188 | | - func(ctx context.Context, sCtx mongo.SessionContext) error { |
189 | | - return verifier.ResetInProgressTasks(sCtx) |
190 | | - }, |
191 | | - ) |
| 184 | + err = retry.Retry(ctx, verifier.logger, func(_ *retry.Info) error { |
| 185 | + err := verifier.AddMetaIndexes(ctx) |
| 186 | + if err != nil { |
| 187 | + return errors.Wrap(err, "failed to create verifier metadata's indexes") |
| 188 | + } |
| 189 | + |
| 190 | + err = verifier.doInMetaTransaction( |
| 191 | + ctx, |
| 192 | + func(ctx context.Context, sCtx mongo.SessionContext) error { |
| 193 | + return verifier.ResetInProgressTasks(sCtx) |
| 194 | + }, |
| 195 | + ) |
| 196 | + if err != nil { |
| 197 | + return errors.Wrap(err, "failed to reset any in-progress tasks") |
| 198 | + } |
| 199 | + |
| 200 | + return nil |
| 201 | + }) |
192 | 202 | if err != nil { |
193 | | - return errors.Wrap(err, "failed to reset any in-progress tasks") |
| 203 | + return err |
194 | 204 | } |
195 | 205 |
|
196 | 206 | verifier.logger.Debug().Msg("Starting Check") |
@@ -352,7 +362,7 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error { |
352 | 362 | ) |
353 | 363 | } |
354 | 364 | } |
355 | | - isPrimary, err := verifier.CheckIsPrimary(ctx) |
| 365 | + isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx) |
356 | 366 | if err != nil { |
357 | 367 | return err |
358 | 368 | } |
@@ -386,30 +396,40 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error { |
386 | 396 | return nil |
387 | 397 | } |
388 | 398 |
|
389 | | -func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection, taskType verificationTaskType, generation int) ([]VerificationTask, []VerificationTask, error) { |
| 399 | +func FetchFailedAndIncompleteTasks( |
| 400 | + ctx context.Context, |
| 401 | + logger *logger.Logger, |
| 402 | + coll *mongo.Collection, |
| 403 | + taskType verificationTaskType, |
| 404 | + generation int, |
| 405 | +) ([]VerificationTask, []VerificationTask, error) { |
390 | 406 | var FailedTasks, allTasks, IncompleteTasks []VerificationTask |
391 | 407 |
|
392 | | - cur, err := coll.Find(ctx, bson.D{ |
393 | | - bson.E{Key: "type", Value: taskType}, |
394 | | - bson.E{Key: "generation", Value: generation}, |
395 | | - }) |
396 | | - if err != nil { |
397 | | - return FailedTasks, IncompleteTasks, err |
398 | | - } |
| 408 | + err := retry.Retry(ctx, logger, func(_ *retry.Info) error { |
| 409 | + cur, err := coll.Find(ctx, bson.D{ |
| 410 | + bson.E{Key: "type", Value: taskType}, |
| 411 | + bson.E{Key: "generation", Value: generation}, |
| 412 | + }) |
| 413 | + if err != nil { |
| 414 | + return err |
| 415 | + } |
399 | 416 |
|
400 | | - err = cur.All(ctx, &allTasks) |
401 | | - if err != nil { |
402 | | - return FailedTasks, IncompleteTasks, err |
403 | | - } |
404 | | - for _, t := range allTasks { |
405 | | - if failedStatus.Contains(t.Status) { |
406 | | - FailedTasks = append(FailedTasks, t) |
407 | | - } else if t.Status != verificationTaskCompleted { |
408 | | - IncompleteTasks = append(IncompleteTasks, t) |
| 417 | + err = cur.All(ctx, &allTasks) |
| 418 | + if err != nil { |
| 419 | + return err |
409 | 420 | } |
410 | | - } |
| 421 | + for _, t := range allTasks { |
| 422 | + if failedStatus.Contains(t.Status) { |
| 423 | + FailedTasks = append(FailedTasks, t) |
| 424 | + } else if t.Status != verificationTaskCompleted { |
| 425 | + IncompleteTasks = append(IncompleteTasks, t) |
| 426 | + } |
| 427 | + } |
| 428 | + |
| 429 | + return nil |
| 430 | + }) |
411 | 431 |
|
412 | | - return FailedTasks, IncompleteTasks, nil |
| 432 | + return FailedTasks, IncompleteTasks, err |
413 | 433 | } |
414 | 434 |
|
415 | 435 | // work is the logic for an individual worker thread. |
|
0 commit comments