Skip to content

Commit a681dbf

Browse files
committed
fix tests
1 parent 9403332 commit a681dbf

File tree

3 files changed

+106
-80
lines changed

3 files changed

+106
-80
lines changed

internal/verifier/check.go

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
7676

7777
verifier.writeStringBuilder(genStartReport)
7878

79-
eg, ctx := errgroup.WithContext(ctxIn)
79+
cancelableCtx, canceler := context.WithCancelCause(ctxIn)
80+
eg, ctx := errgroup.WithContext(cancelableCtx)
8081

8182
// If the change stream fails, everything should stop.
8283
eg.Go(func() error {
@@ -102,6 +103,8 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
102103

103104
waitForTaskCreation := 0
104105

106+
succeededErr := errors.Errorf("generation %d finished", generation)
107+
105108
eg.Go(func() error {
106109
for {
107110
verificationStatus, err := verifier.GetVerificationStatus(ctx)
@@ -127,13 +130,18 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
127130
continue
128131
} else {
129132
verifier.PrintVerificationSummary(ctx, GenerationComplete)
133+
canceler(succeededErr)
130134
return nil
131135
}
132136
}
133137
})
134138

135139
err := eg.Wait()
136140

141+
if errors.Is(err, succeededErr) {
142+
err = nil
143+
}
144+
137145
if err != nil {
138146
verifier.logger.Debug().
139147
Int("generation", generation).
@@ -410,51 +418,46 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int) error {
410418
Msg("Worker finished.")
411419

412420
for {
413-
select {
414-
case <-ctx.Done():
415-
return ctx.Err()
416-
default:
417-
task, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
418-
if errors.Is(err, mongo.ErrNoDocuments) {
419-
duration := verifier.workerSleepDelayMillis * time.Millisecond
420-
421-
if duration > 0 {
422-
verifier.logger.Debug().
423-
Int("workerNum", workerNum).
424-
Stringer("duration", duration).
425-
Msg("No tasks found. Sleeping.")
426-
427-
time.Sleep(duration)
428-
}
421+
task, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
422+
if errors.Is(err, mongo.ErrNoDocuments) {
423+
duration := verifier.workerSleepDelayMillis * time.Millisecond
429424

430-
continue
431-
} else if err != nil {
432-
return errors.Wrap(
433-
err,
434-
"failed to seek next task",
435-
)
425+
if duration > 0 {
426+
verifier.logger.Debug().
427+
Int("workerNum", workerNum).
428+
Stringer("duration", duration).
429+
Msg("No tasks found. Sleeping.")
430+
431+
time.Sleep(duration)
436432
}
437433

438-
switch task.Type {
439-
case verificationTaskVerifyCollection:
440-
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
441-
if err != nil {
442-
return err
443-
}
444-
if task.Generation == 0 {
445-
newVal := verifier.gen0PendingCollectionTasks.Add(-1)
446-
if newVal == 0 {
447-
verifier.PrintVerificationSummary(ctx, Gen0MetadataAnalysisComplete)
448-
}
449-
}
450-
case verificationTaskVerifyDocuments:
451-
err := verifier.ProcessVerifyTask(ctx, workerNum, task)
452-
if err != nil {
453-
return err
434+
continue
435+
} else if errors.Is(err, context.Canceled) {
436+
return nil
437+
} else if err != nil {
438+
return errors.Wrap(
439+
err,
440+
"failed to seek next task",
441+
)
442+
}
443+
444+
switch task.Type {
445+
case verificationTaskVerifyCollection:
446+
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
447+
if err != nil {
448+
return err
449+
}
450+
if task.Generation == 0 {
451+
newVal := verifier.gen0PendingCollectionTasks.Add(-1)
452+
if newVal == 0 {
453+
verifier.PrintVerificationSummary(ctx, Gen0MetadataAnalysisComplete)
454454
}
455455
}
456+
case verificationTaskVerifyDocuments:
457+
err := verifier.ProcessVerifyTask(ctx, workerNum, task)
458+
if err != nil {
459+
return err
460+
}
456461
}
457462
}
458-
459-
return nil
460463
}

internal/verifier/migration_verifier.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -980,21 +980,6 @@ func (verifier *Verifier) ProcessCollectionVerificationTask(
980980
)
981981
}
982982

983-
func (verifier *Verifier) markCollectionFailed(workerNum int, task *VerificationTask, cluster string, namespace string, err error) {
984-
task.Status = verificationTaskFailed
985-
verifier.logger.Error().
986-
Int("workerNum", workerNum).
987-
Interface("task", task.PrimaryKey).
988-
Str("namespace", namespace).
989-
Err(err).
990-
Msg("Failed to read collection metadata.")
991-
992-
task.FailedDocs = append(task.FailedDocs, VerificationResult{
993-
NameSpace: namespace,
994-
Cluster: cluster,
995-
Details: Failed + fmt.Sprintf(" %v", err)})
996-
}
997-
998983
func getIndexesMap(
999984
ctx context.Context,
1000985
coll *mongo.Collection,

internal/verifier/migration_verifier_test.go

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() {
671671
QueryFilter: QueryFilter{
672672
Namespace: "testDb.sameView",
673673
To: "testDb.sameView"}}
674-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
674+
suite.Require().NoError(
675+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
676+
)
675677
suite.Equal(verificationTaskCompleted, task.Status)
676678
suite.Nil(task.FailedDocs)
677679

@@ -685,7 +687,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() {
685687
QueryFilter: QueryFilter{
686688
Namespace: "testDb.wrongColl",
687689
To: "testDb.wrongColl"}}
688-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
690+
suite.Require().NoError(
691+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
692+
)
689693
suite.Equal(verificationTaskFailed, task.Status)
690694
if suite.Equal(1, len(task.FailedDocs)) {
691695
suite.Equal(task.FailedDocs[0].Field, "Options.viewOn")
@@ -703,7 +707,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() {
703707
QueryFilter: QueryFilter{
704708
Namespace: "testDb.wrongPipeline",
705709
To: "testDb.wrongPipeline"}}
706-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
710+
suite.Require().NoError(
711+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
712+
)
707713
suite.Equal(verificationTaskFailed, task.Status)
708714
if suite.Equal(1, len(task.FailedDocs)) {
709715
suite.Equal(task.FailedDocs[0].Field, "Options.pipeline")
@@ -726,7 +732,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() {
726732
QueryFilter: QueryFilter{
727733
Namespace: "testDb.missingOptionsSrc",
728734
To: "testDb.missingOptionsSrc"}}
729-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
735+
suite.Require().NoError(
736+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
737+
)
730738
suite.Equal(verificationTaskFailed, task.Status)
731739
if suite.Equal(1, len(task.FailedDocs)) {
732740
suite.Equal(task.FailedDocs[0].Field, "Options.collation")
@@ -744,7 +752,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() {
744752
QueryFilter: QueryFilter{
745753
Namespace: "testDb.missingOptionsDst",
746754
To: "testDb.missingOptionsDst"}}
747-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
755+
suite.Require().NoError(
756+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
757+
)
748758
suite.Equal(verificationTaskFailed, task.Status)
749759
if suite.Equal(1, len(task.FailedDocs)) {
750760
suite.Equal(task.FailedDocs[0].Field, "Options.collation")
@@ -762,7 +772,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareViews() {
762772
QueryFilter: QueryFilter{
763773
Namespace: "testDb.differentOptions",
764774
To: "testDb.differentOptions"}}
765-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
775+
suite.Require().NoError(
776+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
777+
)
766778
suite.Equal(verificationTaskFailed, task.Status)
767779
if suite.Equal(1, len(task.FailedDocs)) {
768780
suite.Equal(task.FailedDocs[0].Field, "Options.collation")
@@ -783,7 +795,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() {
783795
QueryFilter: QueryFilter{
784796
Namespace: "testDb.testColl",
785797
To: "testDb.testColl"}}
786-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
798+
suite.Require().NoError(
799+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
800+
)
787801
suite.Equal(verificationTaskFailed, task.Status)
788802
suite.Equal(1, len(task.FailedDocs))
789803
suite.Equal(task.FailedDocs[0].Details, Missing)
@@ -798,7 +812,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() {
798812
QueryFilter: QueryFilter{
799813
Namespace: "testDb.testColl",
800814
To: "testDb.testCollTo"}}
801-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
815+
suite.Require().NoError(
816+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
817+
)
802818
suite.Equal(verificationTaskFailed, task.Status)
803819
suite.Equal(1, len(task.FailedDocs))
804820
suite.Equal(task.FailedDocs[0].Details, Missing)
@@ -813,7 +829,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() {
813829
QueryFilter: QueryFilter{
814830
Namespace: "testDb.destOnlyColl",
815831
To: "testDb.destOnlyColl"}}
816-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
832+
suite.Require().NoError(
833+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
834+
)
817835
suite.Equal(verificationTaskFailed, task.Status)
818836
suite.Equal(1, len(task.FailedDocs))
819837
suite.Equal(task.FailedDocs[0].Details, Missing)
@@ -830,7 +848,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() {
830848
QueryFilter: QueryFilter{
831849
Namespace: "testDb.viewOnSrc",
832850
To: "testDb.viewOnSrc"}}
833-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
851+
suite.Require().NoError(
852+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
853+
)
834854
suite.Equal(verificationTaskFailed, task.Status)
835855
suite.Equal(1, len(task.FailedDocs))
836856
suite.Equal(task.FailedDocs[0].Field, "Type")
@@ -847,7 +867,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() {
847867
QueryFilter: QueryFilter{
848868
Namespace: "testDb.cappedOnDst",
849869
To: "testDb.cappedOnDst"}}
850-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
870+
suite.Require().NoError(
871+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
872+
)
851873
suite.Equal(verificationTaskFailed, task.Status)
852874
// Capped and size should differ
853875
var wrongFields []string
@@ -864,7 +886,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() {
864886
QueryFilter: QueryFilter{
865887
Namespace: "testDb.testColl",
866888
To: "testDb.testColl"}}
867-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
889+
suite.Require().NoError(
890+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
891+
)
868892
suite.Equal(verificationTaskCompleted, task.Status)
869893

870894
// Neither collection exists success case
@@ -873,7 +897,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareMetadata() {
873897
QueryFilter: QueryFilter{
874898
Namespace: "testDb.testCollDNE",
875899
To: "testDb.testCollDNE"}}
876-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
900+
suite.Require().NoError(
901+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
902+
)
877903
suite.Equal(verificationTaskCompleted, task.Status)
878904
}
879905

@@ -900,7 +926,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() {
900926
To: "testDb.testColl1",
901927
},
902928
}
903-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
929+
suite.Require().NoError(
930+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
931+
)
904932
suite.Equal(verificationTaskMetadataMismatch, task.Status)
905933
if suite.Equal(1, len(task.FailedDocs)) {
906934
suite.Equal(srcIndexNames[1], task.FailedDocs[0].ID)
@@ -926,7 +954,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() {
926954
QueryFilter: QueryFilter{
927955
Namespace: "testDb.testColl2",
928956
To: "testDb.testColl2"}}
929-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
957+
suite.Require().NoError(
958+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
959+
)
930960
suite.Equal(verificationTaskMetadataMismatch, task.Status)
931961
if suite.Equal(1, len(task.FailedDocs)) {
932962
suite.Equal(dstIndexNames[1], task.FailedDocs[0].ID)
@@ -952,7 +982,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() {
952982
QueryFilter: QueryFilter{
953983
Namespace: "testDb.testColl3",
954984
To: "testDb.testColl3"}}
955-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
985+
suite.Require().NoError(
986+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
987+
)
956988
suite.Equal(verificationTaskMetadataMismatch, task.Status)
957989
if suite.Equal(2, len(task.FailedDocs)) {
958990
sort.Slice(task.FailedDocs, func(i, j int) bool {
@@ -987,7 +1019,9 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() {
9871019
QueryFilter: QueryFilter{
9881020
Namespace: "testDb.testColl4",
9891021
To: "testDb.testColl4"}}
990-
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task)
1022+
suite.Require().NoError(
1023+
verifier.verifyMetadataAndPartitionCollection(ctx, 1, task),
1024+
)
9911025
suite.Equal(verificationTaskMetadataMismatch, task.Status)
9921026
if suite.Equal(1, len(task.FailedDocs)) {
9931027
suite.Equal("wrong", task.FailedDocs[0].ID)
@@ -1281,16 +1315,19 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() {
12811315
}
12821316

12831317
func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
1318+
dbname1 := suite.DBNameForTest("1")
1319+
dbname2 := suite.DBNameForTest("2")
1320+
12841321
zerolog.SetGlobalLevel(zerolog.DebugLevel)
12851322
verifier := suite.BuildVerifier()
1286-
verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
1287-
verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
1323+
verifier.SetSrcNamespaces([]string{dbname1 + ".testColl1"})
1324+
verifier.SetDstNamespaces([]string{dbname2 + ".testColl3"})
12881325
verifier.SetNamespaceMap()
12891326

12901327
ctx := suite.Context()
12911328

1292-
srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1")
1293-
dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3")
1329+
srcColl := suite.srcMongoClient.Database(dbname1).Collection("testColl1")
1330+
dstColl := suite.dstMongoClient.Database(dbname2).Collection("testColl3")
12941331
_, err := srcColl.InsertOne(ctx, bson.M{"_id": 1, "x": 42})
12951332
suite.Require().NoError(err)
12961333
_, err = srcColl.InsertOne(ctx, bson.M{"_id": 2, "x": 43})
@@ -1548,6 +1585,7 @@ func (suite *IntegrationTestSuite) TestBackgroundInIndexSpec() {
15481585

15491586
func (suite *IntegrationTestSuite) TestPartitionWithFilter() {
15501587
zerolog.SetGlobalLevel(zerolog.DebugLevel)
1588+
dbname := suite.DBNameForTest()
15511589

15521590
ctx := suite.Context()
15531591

@@ -1557,14 +1595,14 @@ func (suite *IntegrationTestSuite) TestPartitionWithFilter() {
15571595

15581596
// Set up the verifier for testing.
15591597
verifier := suite.BuildVerifier()
1560-
verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
1598+
verifier.SetSrcNamespaces([]string{dbname + ".testColl1"})
15611599
verifier.SetNamespaceMap()
15621600
verifier.globalFilter = filter
15631601
// Use a small partition size so that we can test creating multiple partitions.
15641602
verifier.partitionSizeInBytes = 30
15651603

15661604
// Insert documents into the source.
1567-
srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1")
1605+
srcColl := suite.srcMongoClient.Database(dbname).Collection("testColl1")
15681606

15691607
// 30 documents with _ids [0, 30) are in the filter.
15701608
for i := 0; i < 30; i++ {
@@ -1579,7 +1617,7 @@ func (suite *IntegrationTestSuite) TestPartitionWithFilter() {
15791617
}
15801618

15811619
// Create partitions with the filter.
1582-
partitions, _, _, _, err := verifier.partitionAndInspectNamespace(ctx, "testDb1.testColl1")
1620+
partitions, _, _, _, err := verifier.partitionAndInspectNamespace(ctx, dbname+".testColl1")
15831621
suite.Require().NoError(err)
15841622

15851623
// Check that each partition have bounds in the filter.

0 commit comments

Comments
 (0)