From 9403332bf35264905b896dee54264bd67abba1df Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 14:47:21 -0500 Subject: [PATCH 1/7] fix error handling --- internal/verifier/check.go | 146 +++++++++++------ internal/verifier/migration_verifier.go | 159 ++++++++++++------- internal/verifier/migration_verifier_test.go | 30 ++-- internal/verifier/recheck.go | 3 +- internal/verifier/recheck_test.go | 3 +- internal/verifier/reset_test.go | 6 +- internal/verifier/verification_task.go | 7 +- 7 files changed, 223 insertions(+), 131 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index a9307d55..51c23408 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -3,13 +3,13 @@ package verifier import ( "context" "fmt" - "sync" "time" mapset "github.com/deckarep/golang-set/v2" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "golang.org/x/sync/errgroup" ) type GenerationStatus string @@ -57,19 +57,14 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context) error { return nil } -func (verifier *Verifier) CheckWorker(ctx context.Context) error { - verifier.logger.Debug().Msgf("Starting %d verification workers", verifier.numWorkers) - ctx, cancel := context.WithCancel(ctx) - - wg := sync.WaitGroup{} - for i := 0; i < verifier.numWorkers; i++ { - wg.Add(1) - go verifier.Work(ctx, i, &wg) - time.Sleep(10 * time.Millisecond) - } - +func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { generation := verifier.generation + verifier.logger.Debug(). + Int("generation", generation). + Int("workersCount", verifier.numWorkers). + Msgf("Starting verification worker threads.") + // Since we do a progress report right at the start we don’t need // this to go in non-debug output. 
startLabel := fmt.Sprintf("Starting check generation %d", generation) @@ -81,44 +76,75 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error { verifier.writeStringBuilder(genStartReport) - waitForTaskCreation := 0 + eg, ctx := errgroup.WithContext(ctxIn) - for { + // If the change stream fails, everything should stop. + eg.Go(func() error { select { case err := <-verifier.changeStreamErrChan: - cancel() return errors.Wrap(err, "change stream failed") case <-ctx.Done(): - cancel() return nil - default: } + }) - verificationStatus, err := verifier.GetVerificationStatus() - if err != nil { - verifier.logger.Error().Msgf("Failed getting verification status: %v", err) - } + // Start the worker threads. + for i := 0; i < verifier.numWorkers; i++ { + eg.Go(func() error { + return errors.Wrapf( + verifier.Work(ctx, i), + "worker %d failed", + i, + ) + }) + time.Sleep(10 * time.Millisecond) + } - if waitForTaskCreation%2 == 0 { - if generation > 0 || verifier.gen0PendingCollectionTasks.Load() == 0 { - verifier.PrintVerificationSummary(ctx, GenerationInProgress) + waitForTaskCreation := 0 + + eg.Go(func() error { + for { + verificationStatus, err := verifier.GetVerificationStatus(ctx) + if err != nil { + return errors.Wrapf( + err, + "failed to retrieve status of generation %d's tasks", + generation, + ) } - } - //wait for task to be created, if none of the tasks found. 
- if verificationStatus.AddedTasks > 0 || verificationStatus.ProcessingTasks > 0 || verificationStatus.RecheckTasks > 0 { - waitForTaskCreation++ - time.Sleep(verifier.verificationStatusCheckInterval) - } else { - verifier.PrintVerificationSummary(ctx, GenerationComplete) - verifier.logger.Debug().Msg("Verification tasks complete") - cancel() - wg.Wait() - break + if waitForTaskCreation%2 == 0 { + if generation > 0 || verifier.gen0PendingCollectionTasks.Load() == 0 { + verifier.PrintVerificationSummary(ctx, GenerationInProgress) + } + } + + // The generation continues as long as >=1 task for this generation is + // “added” or “processing”. + if verificationStatus.AddedTasks > 0 || verificationStatus.ProcessingTasks > 0 { + waitForTaskCreation++ + time.Sleep(verifier.verificationStatusCheckInterval) + continue + } else { + verifier.PrintVerificationSummary(ctx, GenerationComplete) + return nil + } } + }) + + err := eg.Wait() + + if err != nil { + verifier.logger.Debug(). + Int("generation", generation). 
+ Msgf("Check finished.") } - verifier.logger.Debug().Msgf("Check generation %d finished", generation) - return nil + + return errors.Wrapf( + err, + "check generation %d failed", + generation, + ) } func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any, testChan ...chan struct{}) error { @@ -179,10 +205,14 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any return errors.Wrap(err, "failed to start change stream on source") } } - // Log out the verification status when initially booting up so it's easy to see the current state - verificationStatus, err := verifier.GetVerificationStatus() + + // Log the verification status when initially booting up so it's easy to see the current state + verificationStatus, err := verifier.GetVerificationStatus(ctx) if err != nil { - verifier.logger.Error().Msgf("Failed getting verification status: %v", err) + return errors.Wrapf( + err, + "failed to retrieve verification status", + ) } else { verifier.logger.Debug().Msgf("Initial verification phase: %+v", verificationStatus) } @@ -259,8 +289,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any err = verifier.ClearRecheckDocs(ctx) if err != nil { - verifier.logger.Error().Msgf("Failed trying to clear out old recheck docs, continuing: %v", - err) + verifier.logger.Warn(). + Err(err). + Msg("Failed to clear out old recheck docs. (This is probably unimportant.)") } verifier.mux.Unlock() } @@ -368,9 +399,8 @@ func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection, return FailedTasks, IncompleteTasks, nil } -func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.WaitGroup) { - defer wg.Done() - +// Work is the logic for an individual worker thread. +func (verifier *Verifier) Work(ctx context.Context, workerNum int) error { verifier.logger.Debug(). Int("workerNum", workerNum). 
Msg("Worker started.") @@ -382,9 +412,9 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait for { select { case <-ctx.Done(): - return + return ctx.Err() default: - task, err := verifier.FindNextVerifyTaskAndUpdate() + task, err := verifier.FindNextVerifyTaskAndUpdate(ctx) if errors.Is(err, mongo.ErrNoDocuments) { duration := verifier.workerSleepDelayMillis * time.Millisecond @@ -399,20 +429,32 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait continue } else if err != nil { - panic(err) + return errors.Wrap( + err, + "failed to seek next task", + ) } - if task.Type == verificationTaskVerifyCollection { - verifier.ProcessCollectionVerificationTask(ctx, workerNum, task) + switch task.Type { + case verificationTaskVerifyCollection: + err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task) + if err != nil { + return err + } if task.Generation == 0 { newVal := verifier.gen0PendingCollectionTasks.Add(-1) if newVal == 0 { verifier.PrintVerificationSummary(ctx, Gen0MetadataAnalysisComplete) } } - } else { - verifier.ProcessVerifyTask(workerNum, task) + case verificationTaskVerifyDocuments: + err := verifier.ProcessVerifyTask(ctx, workerNum, task) + if err != nil { + return err + } } } } + + return nil } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 485b59c4..f76bbb11 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -153,7 +153,6 @@ type VerificationStatus struct { FailedTasks int `json:"failedTasks"` CompletedTasks int `json:"completedTasks"` MetadataMismatchTasks int `json:"metadataMismatchTasks"` - RecheckTasks int `json:"recheckTasks"` } // VerificationResult holds the Verification Results. 
@@ -584,7 +583,7 @@ func (verifier *Verifier) compareOneDocument(srcClientDoc, dstClientDoc bson.Raw }}, nil } -func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTask) { +func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, task *VerificationTask) error { start := time.Now() verifier.logger.Debug(). @@ -603,7 +602,7 @@ func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTas }() problems, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments( - context.Background(), + ctx, task, ) @@ -661,7 +660,7 @@ func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTas // Create a task for the next generation to recheck the // mismatched & missing docs. - err := verifier.InsertFailedCompareRecheckDocs(task.QueryFilter.Namespace, idsToRecheck, dataSizes) + err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes) if err != nil { verifier.logger.Error(). Err(err). @@ -674,14 +673,16 @@ func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTas } } - err = verifier.UpdateVerificationTask(task) - if err != nil { + updateErr := verifier.UpdateVerificationTask(ctx, task) + if updateErr != nil { verifier.logger.Error(). - Err(err). + Err(updateErr). Int("workerNum", workerNum). Interface("task", task.PrimaryKey). 
Msg("Failed to update task status.") } + + return err } func (verifier *Verifier) logChunkInfo(ctx context.Context, namespaceAndUUID *uuidutil.NamespaceAndUUID) { @@ -951,18 +952,43 @@ func (verifier *Verifier) doIndexSpecsMatch(ctx context.Context, srcSpec bson.Ra return false, errors.Errorf("weirdly received %d matching index docs (should be 0 or 1)", count) } -func (verifier *Verifier) ProcessCollectionVerificationTask(ctx context.Context, workerNum int, task *VerificationTask) { - verifier.logger.Debug().Msgf("[Worker %d] Processing collection", workerNum) - verifier.verifyMetadataAndPartitionCollection(ctx, workerNum, task) - err := verifier.UpdateVerificationTask(task) +func (verifier *Verifier) ProcessCollectionVerificationTask( + ctx context.Context, + workerNum int, + task *VerificationTask, +) error { + verifier.logger.Debug(). + Int("workerNum", workerNum). + Interface("task", task.PrimaryKey). + Str("namespace", task.QueryFilter.Namespace). + Msg("Processing collection.") + + err := verifier.verifyMetadataAndPartitionCollection(ctx, workerNum, task) if err != nil { - verifier.logger.Error().Msgf("Failed updating verification status: %v", err) + return errors.Wrapf( + err, + "failed to process collection %#q for task %s", + task.QueryFilter.Namespace, + task.PrimaryKey, + ) } + + return errors.Wrapf( + verifier.UpdateVerificationTask(ctx, task), + "failed to update verification task %s's status", + task.PrimaryKey, + ) } func (verifier *Verifier) markCollectionFailed(workerNum int, task *VerificationTask, cluster string, namespace string, err error) { task.Status = verificationTaskFailed - verifier.logger.Error().Msgf("[Worker %d] Unable to read metadata for collection %s from cluster %s: %+v", workerNum, namespace, cluster, err) + verifier.logger.Error(). + Int("workerNum", workerNum). + Interface("task", task.PrimaryKey). + Str("namespace", namespace). + Err(err). 
+ Msg("Failed to read collection metadata.") + task.FailedDocs = append(task.FailedDocs, VerificationResult{ NameSpace: namespace, Cluster: cluster, @@ -1102,36 +1128,41 @@ func (verifier *Verifier) verifyIndexes( return results, nil } -func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Context, workerNum int, task *VerificationTask) { +func (verifier *Verifier) verifyMetadataAndPartitionCollection( + ctx context.Context, + workerNum int, + task *VerificationTask, +) error { srcColl := verifier.srcClientCollection(task) dstColl := verifier.dstClientCollection(task) srcNs := FullName(srcColl) dstNs := FullName(dstColl) - srcSpecOpt, srcErr := util.GetCollectionSpecIfExists(ctx, srcColl) - if srcErr != nil { - verifier.markCollectionFailed(workerNum, task, ClusterSource, srcNs, srcErr) - } - - dstSpecOpt, dstErr := util.GetCollectionSpecIfExists(ctx, dstColl) - if dstErr != nil { - verifier.markCollectionFailed(workerNum, task, ClusterTarget, dstNs, dstErr) + srcSpecOpt, err := util.GetCollectionSpecIfExists(ctx, srcColl) + if err != nil { + return errors.Wrapf( + err, + "failed to fetch %#q's specification on source", + FullName(srcColl), + ) } - if srcErr != nil || dstErr != nil { - return + dstSpecOpt, err := util.GetCollectionSpecIfExists(ctx, dstColl) + if err != nil { + return errors.Wrapf( + err, + "failed to fetch %#q's specification on destination", + FullName(srcColl), + ) } - insertFailedCollection := func() { + insertFailedCollection := func() error { _, err := verifier.InsertFailedCollectionVerificationTask(srcNs) - if err != nil { - verifier.logger.Fatal(). - Int("workerNum", workerNum). - Str("srcNamespace", srcNs). - Str("dstNamespace", dstNs). - Err(err). 
- Msg("Failed to persist collection verification task.") - } + return errors.Wrapf( + err, + "failed to persist metadata mismatch for collection %#q", + srcNs, + ) } srcSpec, hasSrcSpec := srcSpecOpt.Get() @@ -1147,19 +1178,24 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Conte // This counts as success. task.Status = verificationTaskCompleted - return + return nil } + task.Status = verificationTaskFailed // Fall through here; comparing the collection specifications will produce the correct // failure output. } specificationProblems, verifyData := verifier.compareCollectionSpecifications(srcNs, dstNs, srcSpecOpt, dstSpecOpt) if specificationProblems != nil { - insertFailedCollection() + err := insertFailedCollection() + if err != nil { + return err + } + task.FailedDocs = specificationProblems if !verifyData { task.Status = verificationTaskFailed - return + return nil } task.Status = verificationTaskMetadataMismatch } @@ -1170,37 +1206,46 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Conte } else { task.Status = verificationTaskCompleted } - return + return nil } indexProblems, err := verifier.verifyIndexes(ctx, srcColl, dstColl, srcSpec.IDIndex, dstSpec.IDIndex) if err != nil { - task.Status = verificationTaskFailed - verifier.logger.Error(). - Int("workerNum", workerNum). - Str("namespace", srcNs). - Err(err). - Msgf("Failed to compare collection indexes.") - - return + return errors.Wrapf( + err, + "failed to compare namespace %#q's indexes", + srcNs, + ) } if indexProblems != nil { if specificationProblems == nil { // don't insert a failed collection unless we did not insert one above - insertFailedCollection() + err = insertFailedCollection() + if err != nil { + return err + } } task.FailedDocs = append(task.FailedDocs, indexProblems...) 
task.Status = verificationTaskMetadataMismatch } + // We’ve confirmed that the collection metadata (including indices and shard keys) + // matches between source & destination. Now we can partition the collection. + if task.Generation == 0 { partitions, shardKeys, docsCount, bytesCount, err := verifier.partitionAndInspectNamespace(ctx, srcNs) if err != nil { - task.Status = verificationTaskFailed - verifier.logger.Error().Msgf("[Worker %d] Error partitioning collection: %+v", workerNum, err) - return + return errors.Wrapf( + err, + "failed to partition collection %#q", + srcNs, + ) } - verifier.logger.Debug().Msgf("[Worker %d] split collection “%s” into %d partitions", workerNum, srcNs, len(partitions)) + verifier.logger.Debug(). + Int("workerNum", workerNum). + Str("namespace", srcNs). + Int("partitionsCount", len(partitions)). + Msg("Divided collection into partitions.") task.SourceDocumentCount = docsCount task.SourceByteCount = bytesCount @@ -1208,8 +1253,11 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Conte for _, partition := range partitions { _, err := verifier.InsertPartitionVerificationTask(partition, shardKeys, dstNs) if err != nil { - task.Status = verificationTaskFailed - verifier.logger.Error().Msgf("[Worker %d] Error inserting verifier tasks: %+v", workerNum, err) + return errors.Wrapf( + err, + "failed to insert a partition task for namespace %#q", + srcNs, + ) } } } @@ -1217,10 +1265,11 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Conte if task.Status == verificationTaskProcessing { task.Status = verificationTaskCompleted } + + return nil } -func (verifier *Verifier) GetVerificationStatus() (*VerificationStatus, error) { - ctx := context.Background() +func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) { taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() @@ -1358,7 +1407,7 @@ func (verifier 
*Verifier) StartServer() error { } func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { - status, err := verifier.GetVerificationStatus() + status, err := verifier.GetVerificationStatus(ctx) if err != nil { return Progress{Error: err}, err } diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 6c83e3db..61899268 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -244,10 +244,10 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { task2.SourceDocumentCount = 900 task2.SourceByteCount = 9_000 - err = verifier.UpdateVerificationTask(task2) + err = verifier.UpdateVerificationTask(ctx, task2) suite.Require().NoError(err) - err = verifier.UpdateVerificationTask(task1) + err = verifier.UpdateVerificationTask(ctx, task1) suite.Require().NoError(err) stats, err = verifier.GetNamespaceStatistics(ctx) @@ -323,7 +323,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { // Now set one task to status=processing task1parts[0].Status = verificationTaskProcessing - err = verifier.UpdateVerificationTask(task1parts[0]) + err = verifier.UpdateVerificationTask(ctx, task1parts[0]) suite.Require().NoError(err) stats, err = verifier.GetNamespaceStatistics(ctx) @@ -359,10 +359,10 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { task2parts[1].SourceDocumentCount = task2.SourceDocumentCount / 2 task2parts[1].SourceByteCount = task2.SourceByteCount / 2 - err = verifier.UpdateVerificationTask(task2parts[0]) + err = verifier.UpdateVerificationTask(ctx, task2parts[0]) suite.Require().NoError(err) - err = verifier.UpdateVerificationTask(task2parts[1]) + err = verifier.UpdateVerificationTask(ctx, task2parts[1]) suite.Require().NoError(err) stats, err = verifier.GetNamespaceStatistics(ctx) @@ -394,11 +394,11 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { func 
(suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { ctx := suite.Context() verifier := suite.BuildVerifier() - err := verifier.InsertFailedCompareRecheckDocs("foo.bar", []interface{}{42}, []int{100}) + err := verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar", []interface{}{42}, []int{100}) suite.Require().NoError(err) - err = verifier.InsertFailedCompareRecheckDocs("foo.bar", []interface{}{43, 44}, []int{100, 100}) + err = verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar", []interface{}{43, 44}, []int{100, 100}) suite.Require().NoError(err) - err = verifier.InsertFailedCompareRecheckDocs("foo.bar2", []interface{}{42}, []int{100}) + err = verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar2", []interface{}{42}, []int{100}) suite.Require().NoError(err) event := ParsedEvent{ DocKey: DocKey{ID: int32(55)}, @@ -1197,7 +1197,7 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { func (suite *IntegrationTestSuite) TestVerificationStatus() { verifier := suite.BuildVerifier() - ctx := context.Background() + ctx := suite.Context() metaColl := verifier.verificationDatabase().Collection(verificationTasksCollection) _, err := metaColl.InsertMany(ctx, []interface{}{ @@ -1209,7 +1209,7 @@ func (suite *IntegrationTestSuite) TestVerificationStatus() { }) suite.Require().NoError(err) - status, err := verifier.GetVerificationStatus() + status, err := verifier.GetVerificationStatus(ctx) suite.Require().NoError(err) suite.Equal(1, status.AddedTasks, "added tasks not equal") suite.Equal(1, status.ProcessingTasks, "processing tasks not equal") @@ -1301,7 +1301,7 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() { runner := RunVerifierCheck(ctx, suite.T(), verifier) waitForTasks := func() *VerificationStatus { - status, err := verifier.GetVerificationStatus() + status, err := verifier.GetVerificationStatus(ctx) suite.Require().NoError(err) for status.TotalTasks == 0 && verifier.generation < 50 { @@ -1312,7 +1312,7 @@ func 
(suite *IntegrationTestSuite) TestGenerationalRechecking() { time.Sleep(delay) suite.Require().NoError(runner.StartNextGeneration()) suite.Require().NoError(runner.AwaitGenerationEnd()) - status, err = verifier.GetVerificationStatus() + status, err = verifier.GetVerificationStatus(ctx) suite.Require().NoError(err) } return status @@ -1417,7 +1417,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { }() waitForTasks := func() *VerificationStatus { - status, err := verifier.GetVerificationStatus() + status, err := verifier.GetVerificationStatus(ctx) suite.Require().NoError(err) for status.TotalTasks == 0 && verifier.generation < 50 { @@ -1428,7 +1428,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { time.Sleep(delay) checkContinueChan <- struct{}{} <-checkDoneChan - status, err = verifier.GetVerificationStatus() + status, err = verifier.GetVerificationStatus(ctx) suite.Require().NoError(err) } return status @@ -1538,7 +1538,7 @@ func (suite *IntegrationTestSuite) TestBackgroundInIndexSpec() { runner := RunVerifierCheck(ctx, suite.T(), verifier) suite.Require().NoError(runner.AwaitGenerationEnd()) - status, err := verifier.GetVerificationStatus() + status, err := verifier.GetVerificationStatus(ctx) suite.Require().NoError(err) suite.Assert().Zero( status.MetadataMismatchTasks, diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 0f7ad651..9a02d931 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -44,6 +44,7 @@ type RecheckDoc struct { // InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check. 
func (verifier *Verifier) InsertFailedCompareRecheckDocs( + ctx context.Context, namespace string, documentIDs []interface{}, dataSizes []int) error { dbName, collName := SplitNamespace(namespace) @@ -59,7 +60,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( Msg("Persisting rechecks for mismatched or missing documents.") return verifier.insertRecheckDocs( - context.Background(), + ctx, dbNames, collNames, documentIDs, diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index e4b14b50..db821781 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -12,10 +12,11 @@ import ( func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { verifier := suite.BuildVerifier() - ctx := context.Background() + ctx := suite.Context() suite.Require().NoError( verifier.InsertFailedCompareRecheckDocs( + ctx, "the.namespace", []any{"theDocID"}, []int{1234}, diff --git a/internal/verifier/reset_test.go b/internal/verifier/reset_test.go index f33ce6dd..9acfe7ed 100644 --- a/internal/verifier/reset_test.go +++ b/internal/verifier/reset_test.go @@ -40,7 +40,7 @@ func (suite *IntegrationTestSuite) TestResetPrimaryTask() { } func (suite *IntegrationTestSuite) TestResetNonPrimaryTasks() { - ctx := context.Background() + ctx := suite.Context() verifier := suite.BuildVerifier() @@ -61,7 +61,7 @@ func (suite *IntegrationTestSuite) TestResetNonPrimaryTasks() { collTask.Status = verificationTaskProcessing suite.Require().NoError( - verifier.UpdateVerificationTask(collTask), + verifier.UpdateVerificationTask(ctx, collTask), ) // Create three partition tasks with the same namespace as the @@ -92,7 +92,7 @@ func (suite *IntegrationTestSuite) TestResetNonPrimaryTasks() { task.Status = taskParts.Status suite.Require().NoError( - verifier.UpdateVerificationTask(task), + verifier.UpdateVerificationTask(ctx, task), ) } diff --git a/internal/verifier/verification_task.go b/internal/verifier/verification_task.go index 
dc1f4af9..c35ba111 100644 --- a/internal/verifier/verification_task.go +++ b/internal/verifier/verification_task.go @@ -181,7 +181,7 @@ func (verifier *Verifier) InsertDocumentRecheckTask(ids []interface{}, dataSize return err } -func (verifier *Verifier) FindNextVerifyTaskAndUpdate() (*VerificationTask, error) { +func (verifier *Verifier) FindNextVerifyTaskAndUpdate(ctx context.Context) (*VerificationTask, error) { var verificationTask = VerificationTask{} filter := bson.M{ "$and": bson.A{ @@ -209,12 +209,11 @@ func (verifier *Verifier) FindNextVerifyTaskAndUpdate() (*VerificationTask, erro // We want “verifyCollection” tasks before “verify”(-document) ones. opts.SetSort(bson.M{"type": -1}) - err := coll.FindOneAndUpdate(context.Background(), filter, updates, opts).Decode(&verificationTask) + err := coll.FindOneAndUpdate(ctx, filter, updates, opts).Decode(&verificationTask) return &verificationTask, err } -func (verifier *Verifier) UpdateVerificationTask(task *VerificationTask) error { - var ctx = context.Background() +func (verifier *Verifier) UpdateVerificationTask(ctx context.Context, task *VerificationTask) error { updateFields := bson.M{ "$set": bson.M{ "status": task.Status, From a681dbff05f2f28059a21b4d3f78f6c9e23965ae Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 15:14:45 -0500 Subject: [PATCH 2/7] fix tests --- internal/verifier/check.go | 85 +++++++++---------- internal/verifier/migration_verifier.go | 15 ---- internal/verifier/migration_verifier_test.go | 86 ++++++++++++++------ 3 files changed, 106 insertions(+), 80 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 51c23408..1d862c94 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -76,7 +76,8 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { verifier.writeStringBuilder(genStartReport) - eg, ctx := errgroup.WithContext(ctxIn) + cancelableCtx, canceler := context.WithCancelCause(ctxIn) + eg, ctx 
:= errgroup.WithContext(cancelableCtx) // If the change stream fails, everything should stop. eg.Go(func() error { @@ -102,6 +103,8 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { waitForTaskCreation := 0 + succeededErr := errors.Errorf("generation %d finished", generation) + eg.Go(func() error { for { verificationStatus, err := verifier.GetVerificationStatus(ctx) @@ -127,6 +130,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { continue } else { verifier.PrintVerificationSummary(ctx, GenerationComplete) + canceler(succeededErr) return nil } } @@ -134,6 +138,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { err := eg.Wait() + if errors.Is(err, succeededErr) { + err = nil + } + if err != nil { verifier.logger.Debug(). Int("generation", generation). @@ -410,51 +418,46 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int) error { Msg("Worker finished.") for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - task, err := verifier.FindNextVerifyTaskAndUpdate(ctx) - if errors.Is(err, mongo.ErrNoDocuments) { - duration := verifier.workerSleepDelayMillis * time.Millisecond - - if duration > 0 { - verifier.logger.Debug(). - Int("workerNum", workerNum). - Stringer("duration", duration). - Msg("No tasks found. Sleeping.") - - time.Sleep(duration) - } + task, err := verifier.FindNextVerifyTaskAndUpdate(ctx) + if errors.Is(err, mongo.ErrNoDocuments) { + duration := verifier.workerSleepDelayMillis * time.Millisecond - continue - } else if err != nil { - return errors.Wrap( - err, - "failed to seek next task", - ) + if duration > 0 { + verifier.logger.Debug(). + Int("workerNum", workerNum). + Stringer("duration", duration). + Msg("No tasks found. 
Sleeping.") + + time.Sleep(duration) } - switch task.Type { - case verificationTaskVerifyCollection: - err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task) - if err != nil { - return err - } - if task.Generation == 0 { - newVal := verifier.gen0PendingCollectionTasks.Add(-1) - if newVal == 0 { - verifier.PrintVerificationSummary(ctx, Gen0MetadataAnalysisComplete) - } - } - case verificationTaskVerifyDocuments: - err := verifier.ProcessVerifyTask(ctx, workerNum, task) - if err != nil { - return err + continue + } else if errors.Is(err, context.Canceled) { + return nil + } else if err != nil { + return errors.Wrap( + err, + "failed to seek next task", + ) + } + + switch task.Type { + case verificationTaskVerifyCollection: + err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task) + if err != nil { + return err + } + if task.Generation == 0 { + newVal := verifier.gen0PendingCollectionTasks.Add(-1) + if newVal == 0 { + verifier.PrintVerificationSummary(ctx, Gen0MetadataAnalysisComplete) } } + case verificationTaskVerifyDocuments: + err := verifier.ProcessVerifyTask(ctx, workerNum, task) + if err != nil { + return err + } } } - - return nil } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index f76bbb11..ba7aea99 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -980,21 +980,6 @@ func (verifier *Verifier) ProcessCollectionVerificationTask( ) } -func (verifier *Verifier) markCollectionFailed(workerNum int, task *VerificationTask, cluster string, namespace string, err error) { - task.Status = verificationTaskFailed - verifier.logger.Error(). - Int("workerNum", workerNum). - Interface("task", task.PrimaryKey). - Str("namespace", namespace). - Err(err). 
- Msg("Failed to read collection metadata.") - - task.FailedDocs = append(task.FailedDocs, VerificationResult{ - NameSpace: namespace, - Cluster: cluster, - Details: Failed + fmt.Sprintf(" %v", err)}) -} - func getIndexesMap( ctx context.Context, coll *mongo.Collection, diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 61899268..100e6d76 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -671,7 +671,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() { QueryFilter: QueryFilter{ Namespace: "testDb.sameView", To: "testDb.sameView"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskCompleted, task.Status) suite.Nil(task.FailedDocs) @@ -685,7 +687,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() { QueryFilter: QueryFilter{ Namespace: "testDb.wrongColl", To: "testDb.wrongColl"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal(task.FailedDocs[0].Field, "Options.viewOn") @@ -703,7 +707,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() { QueryFilter: QueryFilter{ Namespace: "testDb.wrongPipeline", To: "testDb.wrongPipeline"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal(task.FailedDocs[0].Field, "Options.pipeline") @@ -726,7 +732,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() { QueryFilter: QueryFilter{ Namespace: 
"testDb.missingOptionsSrc", To: "testDb.missingOptionsSrc"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal(task.FailedDocs[0].Field, "Options.collation") @@ -744,7 +752,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() { QueryFilter: QueryFilter{ Namespace: "testDb.missingOptionsDst", To: "testDb.missingOptionsDst"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal(task.FailedDocs[0].Field, "Options.collation") @@ -762,7 +772,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() { QueryFilter: QueryFilter{ Namespace: "testDb.differentOptions", To: "testDb.differentOptions"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal(task.FailedDocs[0].Field, "Options.collation") @@ -783,7 +795,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() { QueryFilter: QueryFilter{ Namespace: "testDb.testColl", To: "testDb.testColl"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) suite.Equal(1, len(task.FailedDocs)) suite.Equal(task.FailedDocs[0].Details, Missing) @@ -798,7 +812,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() { QueryFilter: QueryFilter{ Namespace: "testDb.testColl", To: "testDb.testCollTo"}} - 
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) suite.Equal(1, len(task.FailedDocs)) suite.Equal(task.FailedDocs[0].Details, Missing) @@ -813,7 +829,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() { QueryFilter: QueryFilter{ Namespace: "testDb.destOnlyColl", To: "testDb.destOnlyColl"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) suite.Equal(1, len(task.FailedDocs)) suite.Equal(task.FailedDocs[0].Details, Missing) @@ -830,7 +848,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() { QueryFilter: QueryFilter{ Namespace: "testDb.viewOnSrc", To: "testDb.viewOnSrc"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) suite.Equal(1, len(task.FailedDocs)) suite.Equal(task.FailedDocs[0].Field, "Type") @@ -847,7 +867,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() { QueryFilter: QueryFilter{ Namespace: "testDb.cappedOnDst", To: "testDb.cappedOnDst"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskFailed, task.Status) // Capped and size should differ var wrongFields []string @@ -864,7 +886,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() { QueryFilter: QueryFilter{ Namespace: "testDb.testColl", To: "testDb.testColl"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) 
suite.Equal(verificationTaskCompleted, task.Status) // Neither collection exists success case @@ -873,7 +897,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() { QueryFilter: QueryFilter{ Namespace: "testDb.testCollDNE", To: "testDb.testCollDNE"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskCompleted, task.Status) } @@ -900,7 +926,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { To: "testDb.testColl1", }, } - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskMetadataMismatch, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal(srcIndexNames[1], task.FailedDocs[0].ID) @@ -926,7 +954,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { QueryFilter: QueryFilter{ Namespace: "testDb.testColl2", To: "testDb.testColl2"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskMetadataMismatch, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal(dstIndexNames[1], task.FailedDocs[0].ID) @@ -952,7 +982,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { QueryFilter: QueryFilter{ Namespace: "testDb.testColl3", To: "testDb.testColl3"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskMetadataMismatch, task.Status) if suite.Equal(2, len(task.FailedDocs)) { sort.Slice(task.FailedDocs, func(i, j int) bool { @@ -987,7 +1019,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { QueryFilter: QueryFilter{ Namespace: "testDb.testColl4", 
To: "testDb.testColl4"}} - verifier.verifyMetadataAndPartitionCollection(ctx, 1, task) + suite.Require().NoError( + verifier.verifyMetadataAndPartitionCollection(ctx, 1, task), + ) suite.Equal(verificationTaskMetadataMismatch, task.Status) if suite.Equal(1, len(task.FailedDocs)) { suite.Equal("wrong", task.FailedDocs[0].ID) @@ -1281,16 +1315,19 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() { } func (suite *IntegrationTestSuite) TestGenerationalRechecking() { + dbname1 := suite.DBNameForTest("1") + dbname2 := suite.DBNameForTest("2") + zerolog.SetGlobalLevel(zerolog.DebugLevel) verifier := suite.BuildVerifier() - verifier.SetSrcNamespaces([]string{"testDb1.testColl1"}) - verifier.SetDstNamespaces([]string{"testDb2.testColl3"}) + verifier.SetSrcNamespaces([]string{dbname1 + ".testColl1"}) + verifier.SetDstNamespaces([]string{dbname2 + ".testColl3"}) verifier.SetNamespaceMap() ctx := suite.Context() - srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1") - dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3") + srcColl := suite.srcMongoClient.Database(dbname1).Collection("testColl1") + dstColl := suite.dstMongoClient.Database(dbname2).Collection("testColl3") _, err := srcColl.InsertOne(ctx, bson.M{"_id": 1, "x": 42}) suite.Require().NoError(err) _, err = srcColl.InsertOne(ctx, bson.M{"_id": 2, "x": 43}) @@ -1548,6 +1585,7 @@ func (suite *IntegrationTestSuite) TestBackgroundInIndexSpec() { func (suite *IntegrationTestSuite) TestPartitionWithFilter() { zerolog.SetGlobalLevel(zerolog.DebugLevel) + dbname := suite.DBNameForTest() ctx := suite.Context() @@ -1557,14 +1595,14 @@ func (suite *IntegrationTestSuite) TestPartitionWithFilter() { // Set up the verifier for testing. 
verifier := suite.BuildVerifier() - verifier.SetSrcNamespaces([]string{"testDb1.testColl1"}) + verifier.SetSrcNamespaces([]string{dbname + ".testColl1"}) verifier.SetNamespaceMap() verifier.globalFilter = filter // Use a small partition size so that we can test creating multiple partitions. verifier.partitionSizeInBytes = 30 // Insert documents into the source. - srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1") + srcColl := suite.srcMongoClient.Database(dbname).Collection("testColl1") // 30 documents with _ids [0, 30) are in the filter. for i := 0; i < 30; i++ { @@ -1579,7 +1617,7 @@ func (suite *IntegrationTestSuite) TestPartitionWithFilter() { } // Create partitions with the filter. - partitions, _, _, _, err := verifier.partitionAndInspectNamespace(ctx, "testDb1.testColl1") + partitions, _, _, _, err := verifier.partitionAndInspectNamespace(ctx, dbname+".testColl1") suite.Require().NoError(err) // Check that each partition have bounds in the filter. From e1c510653c471e03c84df6b4d1129d6d0d65b008 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 16:24:02 -0500 Subject: [PATCH 3/7] more replacements --- internal/retry/retry.go | 4 +-- internal/retry/retryer_test.go | 10 +++---- internal/verifier/change_stream_test.go | 11 +++----- internal/verifier/check.go | 28 +++++++++++--------- internal/verifier/clustertime_test.go | 4 +-- internal/verifier/migration_verifier.go | 4 +-- internal/verifier/migration_verifier_test.go | 6 +++-- internal/verifier/recheck.go | 1 + internal/verifier/recheck_test.go | 8 +++--- internal/verifier/reset_test.go | 9 ++++--- internal/verifier/verification_task.go | 25 ++++++++++++----- 11 files changed, 63 insertions(+), 47 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 4f9358fb..bb2a55aa 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -59,12 +59,12 @@ func (r *Retryer) RunForUUIDAndTransientErrors( // // RunForUUIDErrorOnly returns the 
collection's current name in all cases. func (r *Retryer) RunForUUIDErrorOnly( - logger *logger.Logger, expectedCollName string, f func(*Info, string) error, + ctx context.Context, logger *logger.Logger, expectedCollName string, f func(*Info, string) error, ) (string, error) { // Since we're not actually sleeping when checking for UUID/name mismatch // errors, we don't need to provide a real context to handle // cancellations. - return r.runRetryLoop(context.Background(), logger, expectedCollName, f, false, true) + return r.runRetryLoop(ctx, logger, expectedCollName, f, false, true) } // RunForTransientErrorsOnly retries f() for transient errors only, and diff --git a/internal/retry/retryer_test.go b/internal/retry/retryer_test.go index 23c315b4..23ebd242 100644 --- a/internal/retry/retryer_test.go +++ b/internal/retry/retryer_test.go @@ -26,7 +26,7 @@ func (suite *UnitTestSuite) TestRetryer() { suite.NoError(err) suite.Equal(0, attemptNumber) - _, err = retryer.RunForUUIDErrorOnly(logger, "foo", f) + _, err = retryer.RunForUUIDErrorOnly(suite.Context(), logger, "foo", f) suite.NoError(err) suite.Equal(0, attemptNumber) @@ -92,7 +92,7 @@ func (suite *UnitTestSuite) TestRetryer() { } return nil } - _, err := retryer.RunForUUIDErrorOnly(logger, "bar", f) + _, err := retryer.RunForUUIDErrorOnly(suite.Context(), logger, "bar", f) suite.NoError(err) suite.Equal(attemptLimit/2, attemptNumber) }) @@ -109,7 +109,7 @@ func (suite *UnitTestSuite) TestRetryer() { Raw: bson.Raw(raw), } } - _, err := retryer.RunForUUIDErrorOnly(logger, "bar", f) + _, err := retryer.RunForUUIDErrorOnly(suite.Context(), logger, "bar", f) suite.NoError(err) // We only did one retry because the actual collection name matched the // previous attempt. 
@@ -130,7 +130,7 @@ func (suite *UnitTestSuite) TestRetryerDurationLimitIsZero() { return cmdErr } - _, err := retryer.RunForUUIDErrorOnly(suite.Logger(), "bar", f) + _, err := retryer.RunForUUIDErrorOnly(suite.Context(), suite.Logger(), "bar", f) suite.Equal(cmdErr, err) suite.Equal(0, attemptNumber) } @@ -280,7 +280,7 @@ func (suite *UnitTestSuite) TestRetryerWithEmptyCollectionName() { return nil } - name, err := retryer.RunForUUIDErrorOnly(suite.Logger(), "", f) + name, err := retryer.RunForUUIDErrorOnly(suite.Context(), suite.Logger(), "", f) suite.NoError(err) suite.Equal("", name) } diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 14990cb8..f4a6f42d 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -55,7 +55,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { func() { verifier1 := suite.BuildVerifier() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(suite.Context()) defer cancel() err := verifier1.StartChangeStream(ctx) suite.Require().NoError(err) @@ -147,8 +147,7 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { verifier := suite.BuildVerifier() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := suite.Context() sess, err := suite.srcMongoClient.StartSession() suite.Require().NoError(err) sctx := mongo.NewSessionContext(ctx, sess) @@ -167,8 +166,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { verifier := suite.BuildVerifier() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := suite.Context() sess, err := suite.srcMongoClient.StartSession() suite.Require().NoError(err) sctx := mongo.NewSessionContext(ctx, sess) @@ -220,8 +218,7 @@ func (suite 
*IntegrationTestSuite) TestStartAtTimeWithChanges() { func (suite *IntegrationTestSuite) TestNoStartAtTime() { verifier := suite.BuildVerifier() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := suite.Context() sess, err := suite.srcMongoClient.StartSession() suite.Require().NoError(err) sctx := mongo.NewSessionContext(ctx, sess) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 1d862c94..41d08d35 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -93,7 +93,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { for i := 0; i < verifier.numWorkers; i++ { eg.Go(func() error { return errors.Wrapf( - verifier.Work(ctx, i), + verifier.work(ctx, i), "worker %d failed", i, ) @@ -103,7 +103,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { waitForTaskCreation := 0 - succeededErr := errors.Errorf("generation %d finished", generation) + succeeded := false eg.Go(func() error { for { @@ -130,7 +130,8 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { continue } else { verifier.PrintVerificationSummary(ctx, GenerationComplete) - canceler(succeededErr) + succeeded = true + canceler(errors.New("ok")) return nil } } @@ -138,7 +139,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { err := eg.Wait() - if errors.Is(err, succeededErr) { + if succeeded { err = nil } @@ -225,7 +226,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any verifier.logger.Debug().Msgf("Initial verification phase: %+v", verificationStatus) } - err = verifier.CreateInitialTasks() + err = verifier.CreateInitialTasks(ctx) if err != nil { return err } @@ -335,7 +336,7 @@ func (verifier *Verifier) setupAllNamespaceList(ctx context.Context) error { return nil } -func (verifier *Verifier) CreateInitialTasks() error { +func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error { // If we don't know the 
src namespaces, we're definitely not the primary task. if !verifier.verifyAll { if len(verifier.srcNamespaces) == 0 { @@ -359,13 +360,13 @@ func (verifier *Verifier) CreateInitialTasks() error { return nil } if verifier.verifyAll { - err := verifier.setupAllNamespaceList(context.Background()) + err := verifier.setupAllNamespaceList(ctx) if err != nil { return err } } for _, src := range verifier.srcNamespaces { - _, err := verifier.InsertCollectionVerificationTask(src) + _, err := verifier.InsertCollectionVerificationTask(ctx, src) if err != nil { verifier.logger.Error().Msgf("Failed to insert collection verification task: %s", err) return err @@ -407,8 +408,8 @@ func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection, return FailedTasks, IncompleteTasks, nil } -// Work is the logic for an individual worker thread. -func (verifier *Verifier) Work(ctx context.Context, workerNum int) error { +// work is the logic for an individual worker thread. +func (verifier *Verifier) work(ctx context.Context, workerNum int) error { verifier.logger.Debug(). Int("workerNum", workerNum). 
Msg("Worker started.") @@ -432,9 +433,12 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int) error { } continue - } else if errors.Is(err, context.Canceled) { - return nil + /* + } else if errors.Is(err, context.Canceled) { + return nil + */ } else if err != nil { + return errors.Wrap( err, "failed to seek next task", diff --git a/internal/verifier/clustertime_test.go b/internal/verifier/clustertime_test.go index 6a13c02d..14e4417a 100644 --- a/internal/verifier/clustertime_test.go +++ b/internal/verifier/clustertime_test.go @@ -1,15 +1,13 @@ package verifier import ( - "context" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" ) func (suite *IntegrationTestSuite) TestGetNewClusterTime() { - ctx := context.Background() + ctx := suite.Context() logger, _ := getLoggerAndWriter("stdout") sess, err := suite.srcMongoClient.StartSession() diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index ba7aea99..09a36021 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1142,7 +1142,7 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } insertFailedCollection := func() error { - _, err := verifier.InsertFailedCollectionVerificationTask(srcNs) + _, err := verifier.InsertFailedCollectionVerificationTask(ctx, srcNs) return errors.Wrapf( err, "failed to persist metadata mismatch for collection %#q", @@ -1236,7 +1236,7 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( task.SourceByteCount = bytesCount for _, partition := range partitions { - _, err := verifier.InsertPartitionVerificationTask(partition, shardKeys, dstNs) + _, err := verifier.InsertPartitionVerificationTask(ctx, partition, shardKeys, dstNs) if err != nil { return errors.Wrapf( err, diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 
100e6d76..833553e0 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -216,10 +216,10 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { // Now add 2 namespaces. Add them “out of order” to test // that we sort the returned array by Namespace. - task2, err := verifier.InsertCollectionVerificationTask("mydb.coll2") + task2, err := verifier.InsertCollectionVerificationTask(ctx, "mydb.coll2") suite.Require().NoError(err) - task1, err := verifier.InsertCollectionVerificationTask("mydb.coll1") + task1, err := verifier.InsertCollectionVerificationTask(ctx, "mydb.coll1") suite.Require().NoError(err) stats, err = verifier.GetNamespaceStatistics(ctx) @@ -276,6 +276,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { task2parts := [2]*VerificationTask{} for i := range task1parts { task1part, err := verifier.InsertPartitionVerificationTask( + ctx, &partitions.Partition{ Ns: &partitions.Namespace{DB: "mydb", Coll: "coll1"}, }, @@ -287,6 +288,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { task1parts[i] = task1part task2part, err := verifier.InsertPartitionVerificationTask( + ctx, &partitions.Partition{ Ns: &partitions.Namespace{DB: "mydb", Coll: "coll2"}, }, diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 9a02d931..1e10adfb 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -261,6 +261,7 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { namespace := prevDBName + "." 
+ prevCollName err := verifier.InsertDocumentRecheckTask( + ctx, idAccum, types.ByteCount(dataSizeAccum), namespace, diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index db821781..00d4c6aa 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -89,7 +89,7 @@ func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifie func (suite *IntegrationTestSuite) TestLargeIDInsertions() { verifier := suite.BuildVerifier() - ctx := context.Background() + ctx := suite.Context() overlyLarge := 7 * 1024 * 1024 // Three of these exceed our 16MB limit, but two do not id1 := strings.Repeat("a", overlyLarge) @@ -151,7 +151,7 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() { func (suite *IntegrationTestSuite) TestLargeDataInsertions() { verifier := suite.BuildVerifier() verifier.partitionSizeInBytes = 1024 * 1024 - ctx := context.Background() + ctx := suite.Context() id1 := "a" id2 := "b" @@ -212,7 +212,7 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() { func (suite *IntegrationTestSuite) TestMultipleNamespaces() { verifier := suite.BuildVerifier() - ctx := context.Background() + ctx := suite.Context() id1 := "a" id2 := "b" @@ -263,7 +263,7 @@ func (suite *IntegrationTestSuite) TestMultipleNamespaces() { func (suite *IntegrationTestSuite) TestGenerationalClear() { verifier := suite.BuildVerifier() - ctx := context.Background() + ctx := suite.Context() id1 := "a" id2 := "b" diff --git a/internal/verifier/reset_test.go b/internal/verifier/reset_test.go index 9acfe7ed..f1569ca8 100644 --- a/internal/verifier/reset_test.go +++ b/internal/verifier/reset_test.go @@ -11,17 +11,17 @@ import ( ) func (suite *IntegrationTestSuite) TestResetPrimaryTask() { + ctx := suite.Context() + verifier := suite.BuildVerifier() created, err := verifier.CheckIsPrimary() suite.Require().NoError(err) suite.Require().True(created) - _, err = 
verifier.InsertCollectionVerificationTask("foo.bar") + _, err = verifier.InsertCollectionVerificationTask(ctx, "foo.bar") suite.Require().NoError(err) - ctx := context.Background() - err = verifier.doInMetaTransaction( ctx, func(_ context.Context, ctx mongo.SessionContext) error { @@ -55,7 +55,7 @@ func (suite *IntegrationTestSuite) TestResetNonPrimaryTasks() { ns2 := "qux.quux" // Create a collection-verification task, and set it to processing. - collTask, err := verifier.InsertCollectionVerificationTask(ns1) + collTask, err := verifier.InsertCollectionVerificationTask(ctx, ns1) suite.Require().NoError(err) collTask.Status = verificationTaskProcessing @@ -79,6 +79,7 @@ func (suite *IntegrationTestSuite) TestResetNonPrimaryTasks() { {verificationTaskCompleted, ns2}, } { task, err := verifier.InsertPartitionVerificationTask( + ctx, &partitions.Partition{ Ns: &partitions.Namespace{ DB: strings.Split(taskParts.Namespace, ".")[0], diff --git a/internal/verifier/verification_task.go b/internal/verifier/verification_task.go index c35ba111..e7a8ddd9 100644 --- a/internal/verifier/verification_task.go +++ b/internal/verifier/verification_task.go @@ -125,17 +125,25 @@ func (verifier *Verifier) insertCollectionVerificationTask( } func (verifier *Verifier) InsertCollectionVerificationTask( - srcNamespace string) (*VerificationTask, error) { + ctx context.Context, + srcNamespace string, +) (*VerificationTask, error) { return verifier.insertCollectionVerificationTask(srcNamespace, verifier.generation) } func (verifier *Verifier) InsertFailedCollectionVerificationTask( - srcNamespace string) (*VerificationTask, error) { + ctx context.Context, + srcNamespace string, +) (*VerificationTask, error) { return verifier.insertCollectionVerificationTask(srcNamespace, verifier.generation+1) } -func (verifier *Verifier) InsertPartitionVerificationTask(partition *partitions.Partition, shardKeys []string, - dstNamespace string) (*VerificationTask, error) { +func (verifier *Verifier) 
InsertPartitionVerificationTask( + ctx context.Context, + partition *partitions.Partition, + shardKeys []string, + dstNamespace string, +) (*VerificationTask, error) { srcNamespace := strings.Join([]string{partition.Ns.DB, partition.Ns.Coll}, ".") verificationTask := VerificationTask{ PrimaryKey: primitive.NewObjectID(), @@ -153,7 +161,12 @@ func (verifier *Verifier) InsertPartitionVerificationTask(partition *partitions. return &verificationTask, err } -func (verifier *Verifier) InsertDocumentRecheckTask(ids []interface{}, dataSize types.ByteCount, srcNamespace string) error { +func (verifier *Verifier) InsertDocumentRecheckTask( + ctx context.Context, + ids []interface{}, + dataSize types.ByteCount, + srcNamespace string, +) error { dstNamespace := srcNamespace if len(verifier.nsMap) != 0 { var ok bool @@ -176,7 +189,7 @@ func (verifier *Verifier) InsertDocumentRecheckTask(ids []interface{}, dataSize SourceDocumentCount: types.DocumentCount(len(ids)), SourceByteCount: dataSize, } - var ctx = context.Background() + _, err := verifier.verificationTaskCollection().InsertOne(ctx, &verificationTask) return err } From eec456fb5dbaa12d8436e54d48803cb6ca1cf3b8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 19:55:40 -0500 Subject: [PATCH 4/7] more fixes for Error() logs --- internal/verifier/check.go | 15 ++- internal/verifier/migration_verifier.go | 167 ++++++++++++------------ 2 files changed, 96 insertions(+), 86 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 41d08d35..08475cac 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -346,9 +346,11 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error { verifier.dstNamespaces = verifier.srcNamespaces } if len(verifier.srcNamespaces) != len(verifier.dstNamespaces) { - err := errors.Errorf("Different number of source and destination namespaces") - verifier.logger.Error().Msgf("%s", err) - return err + return 
errors.Errorf( + "source has %d namespace(s), but destination has %d (they must match)", + len(verifier.srcNamespaces), + len(verifier.dstNamespaces), + ) } } isPrimary, err := verifier.CheckIsPrimary() @@ -368,8 +370,11 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error { for _, src := range verifier.srcNamespaces { _, err := verifier.InsertCollectionVerificationTask(ctx, src) if err != nil { - verifier.logger.Error().Msgf("Failed to insert collection verification task: %s", err) - return err + return errors.Wrapf( + err, + "failed to insert collection verification task for namespace %#q", + src, + ) } } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 09a36021..df86c9dd 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -607,82 +607,82 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, ) if err != nil { - task.Status = verificationTaskFailed - verifier.logger.Error(). - Err(err). - Int("workerNum", workerNum). - Interface("task", task.PrimaryKey). - Msg("Failed to fetch and compare documents for document comparison task.") - } else { - task.SourceDocumentCount = docsCount - task.SourceByteCount = bytesCount + return errors.Wrapf( + err, + "worker %d failed to process document comparison task %s (namespace: %#q)", + workerNum, + task.PrimaryKey, + task.QueryFilter.Namespace, + ) + } - if len(problems) == 0 { - task.Status = verificationTaskCompleted + task.SourceDocumentCount = docsCount + task.SourceByteCount = bytesCount + + if len(problems) == 0 { + task.Status = verificationTaskCompleted + } else { + task.Status = verificationTaskFailed + // We know we won't change lastGeneration while verification tasks are running, so no mutex needed here. + if verifier.lastGeneration { + verifier.logger.Error(). + Int("workerNum", workerNum). + Interface("task", task.PrimaryKey). 
+ Str("namespace", task.QueryFilter.Namespace). + Int("mismatchesCount", len(problems)). + Msg("Document(s) mismatched, and this is the final generation.") } else { - task.Status = verificationTaskFailed - // We know we won't change lastGeneration while verification tasks are running, so no mutex needed here. - if verifier.lastGeneration { - verifier.logger.Error(). - Int("workerNum", workerNum). - Interface("task", task.PrimaryKey). - Msg("Document comparison task failed critical section and is a true error.") - } else { - verifier.logger.Debug(). - Int("workerNum", workerNum). - Interface("task", task.PrimaryKey). - Msg("Document comparison task failed, but it may pass in the next generation.") - - var mismatches []VerificationResult - var missingIds []interface{} - var dataSizes []int - - // This stores all IDs for the next generation to check. - // Its length should equal len(mismatches) + len(missingIds). - var idsToRecheck []interface{} - - for _, mismatch := range problems { - idsToRecheck = append(idsToRecheck, mismatch.ID) - dataSizes = append(dataSizes, mismatch.dataSize) - - if mismatch.Details == Missing { - missingIds = append(missingIds, mismatch.ID) - } else { - mismatches = append(mismatches, mismatch) - } + verifier.logger.Debug(). + Int("workerNum", workerNum). + Interface("task", task.PrimaryKey). + Str("namespace", task.QueryFilter.Namespace). + Int("mismatchesCount", len(problems)). + Msg("Document comparison task failed, but it may pass in the next generation.") + + var mismatches []VerificationResult + var missingIds []interface{} + var dataSizes []int + + // This stores all IDs for the next generation to check. + // Its length should equal len(mismatches) + len(missingIds). 
+ var idsToRecheck []interface{} + + for _, mismatch := range problems { + idsToRecheck = append(idsToRecheck, mismatch.ID) + dataSizes = append(dataSizes, mismatch.dataSize) + + if mismatch.Details == Missing { + missingIds = append(missingIds, mismatch.ID) + } else { + mismatches = append(mismatches, mismatch) } + } - // Update ids of the failed task so that only mismatches and - // missing are reported. Matching documents are thus hidden - // from the progress report. - task.Ids = missingIds - task.FailedDocs = mismatches - - // Create a task for the next generation to recheck the - // mismatched & missing docs. - err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes) - if err != nil { - verifier.logger.Error(). - Err(err). - Int("workerNum", workerNum). - Interface("task", task.PrimaryKey). - Int("rechecksCount", len(idsToRecheck)). - Msg("Failed to enqueue rechecks after document mismatches.") - } + // Update ids of the failed task so that only mismatches and + // missing are reported. Matching documents are thus hidden + // from the progress report. + task.Ids = missingIds + task.FailedDocs = mismatches + + // Create a task for the next generation to recheck the + // mismatched & missing docs. + err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes) + if err != nil { + return errors.Wrapf( + err, + "failed to enqueue %d recheck(s) of mismatched documents", + len(idsToRecheck), + ) } } } - updateErr := verifier.UpdateVerificationTask(ctx, task) - if updateErr != nil { - verifier.logger.Error(). - Err(updateErr). - Int("workerNum", workerNum). - Interface("task", task.PrimaryKey). 
- Msg("Failed to update task status.") - } - - return err + return errors.Wrapf( + verifier.UpdateVerificationTask(ctx, task), + "failed to persist task %s's new status (%#q)", + task.PrimaryKey, + task.Status, + ) } func (verifier *Verifier) logChunkInfo(ctx context.Context, namespaceAndUUID *uuidutil.NamespaceAndUUID) { @@ -822,7 +822,7 @@ func (verifier *Verifier) partitionAndInspectNamespace(ctx context.Context, name func (verifier *Verifier) compareCollectionSpecifications( srcNs, dstNs string, srcSpecOpt, dstSpecOpt option.Option[util.CollectionSpec], -) ([]VerificationResult, bool) { +) ([]VerificationResult, bool, error) { srcSpec, hasSrcSpec := srcSpecOpt.Get() dstSpec, hasDstSpec := dstSpecOpt.Get() @@ -830,20 +830,20 @@ func (verifier *Verifier) compareCollectionSpecifications( return []VerificationResult{{ NameSpace: srcNs, Cluster: ClusterSource, - Details: Missing}}, false + Details: Missing}}, false, nil } if !hasDstSpec { return []VerificationResult{{ NameSpace: dstNs, Cluster: ClusterTarget, - Details: Missing}}, false + Details: Missing}}, false, nil } if srcSpec.Type != dstSpec.Type { return []VerificationResult{{ NameSpace: srcNs, Cluster: ClusterTarget, Field: "Type", - Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Type, dstSpec.Type)}}, false + Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Type, dstSpec.Type)}}, false, nil // If the types differ, the rest is not important. 
} var results []VerificationResult @@ -857,13 +857,11 @@ func (verifier *Verifier) compareCollectionSpecifications( if !bytes.Equal(srcSpec.Options, dstSpec.Options) { mismatchDetails, err := BsonUnorderedCompareRawDocumentWithDetails(srcSpec.Options, dstSpec.Options) if err != nil { - verifier.logger.Error().Msgf("Unable to parse collection options for %s: %+v", srcNs, err) - results = append(results, VerificationResult{ - NameSpace: dstNs, - Cluster: ClusterTarget, - Field: "Options", - Details: "ParseError " + fmt.Sprintf("%v", err)}) - return results, false + return nil, false, errors.Wrapf( + err, + "failed to compare namespace %#q's specifications", + srcNs, + ) } if mismatchDetails == nil { results = append(results, VerificationResult{ @@ -881,7 +879,7 @@ func (verifier *Verifier) compareCollectionSpecifications( // Do not compare data between capped and uncapped collections because the partitioning is different. canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped")) - return results, canCompareData + return results, canCompareData, nil } func (verifier *Verifier) doIndexSpecsMatch(ctx context.Context, srcSpec bson.Raw, dstSpec bson.Raw) (bool, error) { @@ -1170,7 +1168,14 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( // Fall through here; comparing the collection specifications will produce the correct // failure output. 
} - specificationProblems, verifyData := verifier.compareCollectionSpecifications(srcNs, dstNs, srcSpecOpt, dstSpecOpt) + specificationProblems, verifyData, err := verifier.compareCollectionSpecifications(srcNs, dstNs, srcSpecOpt, dstSpecOpt) + if err != nil { + return errors.Wrapf( + err, + "failed to compare collection %#q's specifications", + srcNs, + ) + } if specificationProblems != nil { err := insertFailedCollection() if err != nil { From 1bfee57e91c9679828c51116b99b7810848f0e72 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 20:01:42 -0500 Subject: [PATCH 5/7] more context --- internal/verifier/check.go | 4 ++-- internal/verifier/reset_test.go | 6 +++--- internal/verifier/verification_task.go | 16 ++++++++-------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 08475cac..d756a22c 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -353,7 +353,7 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error { ) } } - isPrimary, err := verifier.CheckIsPrimary() + isPrimary, err := verifier.CheckIsPrimary(ctx) if err != nil { return err } @@ -380,7 +380,7 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error { verifier.gen0PendingCollectionTasks.Store(int32(len(verifier.srcNamespaces))) - err = verifier.UpdatePrimaryTaskComplete() + err = verifier.UpdatePrimaryTaskComplete(ctx) if err != nil { return err } diff --git a/internal/verifier/reset_test.go b/internal/verifier/reset_test.go index f1569ca8..3a677ab9 100644 --- a/internal/verifier/reset_test.go +++ b/internal/verifier/reset_test.go @@ -15,7 +15,7 @@ func (suite *IntegrationTestSuite) TestResetPrimaryTask() { verifier := suite.BuildVerifier() - created, err := verifier.CheckIsPrimary() + created, err := verifier.CheckIsPrimary(ctx) suite.Require().NoError(err) suite.Require().True(created) @@ -45,11 +45,11 @@ func (suite *IntegrationTestSuite) 
TestResetNonPrimaryTasks() { verifier := suite.BuildVerifier() // Create a primary task, and set it to complete. - created, err := verifier.CheckIsPrimary() + created, err := verifier.CheckIsPrimary(ctx) suite.Require().NoError(err) suite.Require().True(created) - suite.Require().NoError(verifier.UpdatePrimaryTaskComplete()) + suite.Require().NoError(verifier.UpdatePrimaryTaskComplete(ctx)) ns1 := "foo.bar" ns2 := "qux.quux" diff --git a/internal/verifier/verification_task.go b/internal/verifier/verification_task.go index e7a8ddd9..edfaaed5 100644 --- a/internal/verifier/verification_task.go +++ b/internal/verifier/verification_task.go @@ -90,6 +90,7 @@ type VerificationRange struct { } func (verifier *Verifier) insertCollectionVerificationTask( + ctx context.Context, srcNamespace string, generation int) (*VerificationTask, error) { @@ -120,7 +121,7 @@ func (verifier *Verifier) insertCollectionVerificationTask( To: dstNamespace, }, } - _, err := verifier.verificationTaskCollection().InsertOne(context.Background(), verificationTask) + _, err := verifier.verificationTaskCollection().InsertOne(ctx, verificationTask) return &verificationTask, err } @@ -128,14 +129,14 @@ func (verifier *Verifier) InsertCollectionVerificationTask( ctx context.Context, srcNamespace string, ) (*VerificationTask, error) { - return verifier.insertCollectionVerificationTask(srcNamespace, verifier.generation) + return verifier.insertCollectionVerificationTask(ctx, srcNamespace, verifier.generation) } func (verifier *Verifier) InsertFailedCollectionVerificationTask( ctx context.Context, srcNamespace string, ) (*VerificationTask, error) { - return verifier.insertCollectionVerificationTask(srcNamespace, verifier.generation+1) + return verifier.insertCollectionVerificationTask(ctx, srcNamespace, verifier.generation+1) } func (verifier *Verifier) InsertPartitionVerificationTask( @@ -157,7 +158,7 @@ func (verifier *Verifier) InsertPartitionVerificationTask( To: dstNamespace, }, } - _, err := 
verifier.verificationTaskCollection().InsertOne(context.Background(), verificationTask) + _, err := verifier.verificationTaskCollection().InsertOne(ctx, verificationTask) return &verificationTask, err } @@ -247,7 +248,7 @@ func (verifier *Verifier) UpdateVerificationTask(ctx context.Context, task *Veri return err } -func (verifier *Verifier) CheckIsPrimary() (bool, error) { +func (verifier *Verifier) CheckIsPrimary(ctx context.Context) (bool, error) { ownerSetId := primitive.NewObjectID() filter := bson.M{"type": verificationTaskPrimary} opts := options.Update() @@ -259,7 +260,7 @@ func (verifier *Verifier) CheckIsPrimary() (bool, error) { "status": verificationTaskAdded, }, } - result, err := verifier.verificationTaskCollection().UpdateOne(context.Background(), filter, update, opts) + result, err := verifier.verificationTaskCollection().UpdateOne(ctx, filter, update, opts) if err != nil { return false, err } @@ -268,8 +269,7 @@ func (verifier *Verifier) CheckIsPrimary() (bool, error) { return isPrimary, nil } -func (verifier *Verifier) UpdatePrimaryTaskComplete() error { - var ctx = context.Background() +func (verifier *Verifier) UpdatePrimaryTaskComplete(ctx context.Context) error { updateFields := bson.M{ "$set": bson.M{ "status": verificationTaskCompleted, From 4c54c68cef1ceef87b9548ecac98804fbd8978bc Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 20:19:20 -0500 Subject: [PATCH 6/7] a few more tweaks --- internal/verifier/check.go | 10 +++------- internal/verifier/migration_verifier.go | 2 +- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index d756a22c..d34eb799 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -127,11 +127,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { if verificationStatus.AddedTasks > 0 || verificationStatus.ProcessingTasks > 0 { waitForTaskCreation++ 
time.Sleep(verifier.verificationStatusCheckInterval) - continue } else { verifier.PrintVerificationSummary(ctx, GenerationComplete) succeeded = true - canceler(errors.New("ok")) + canceler(errors.Errorf("generation %d succeeded", generation)) return nil } } @@ -438,12 +437,7 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } continue - /* - } else if errors.Is(err, context.Canceled) { - return nil - */ } else if err != nil { - return errors.Wrap( err, "failed to seek next task", @@ -467,6 +461,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { if err != nil { return err } + default: + panic("Unknown verification task type: " + task.Type) } } } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index df86c9dd..ff67b8c8 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1219,7 +1219,7 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( task.Status = verificationTaskMetadataMismatch } - // We’ve confirmed that the collection metadata (including indices and shard keys) + // We’ve confirmed that the collection metadata (including indices) // matches between source & destination. Now we can partition the collection. if task.Generation == 0 { From f434c5f63d2aecbd239e8cdc6be13e7d57877f0c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 27 Nov 2024 12:39:08 -0500 Subject: [PATCH 7/7] add assertion --- internal/verifier/check.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index d34eb799..836d098b 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -139,6 +139,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { err := eg.Wait() if succeeded { + if !errors.Is(err, context.Canceled) { + panic("success should mean that err is context.Canceled, not: " + err.Error()) + } + err = nil }