Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)

// wait for generation 0 to end
verifierRunner.AwaitGenerationEnd()
suite.Require().NoError(verifierRunner.AwaitGenerationEnd())

const mvName = "Migration Verifier"

Expand Down Expand Up @@ -374,7 +374,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)

// wait for generation 0 to end
verifierRunner.AwaitGenerationEnd()
suite.Require().NoError(verifierRunner.AwaitGenerationEnd())

docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} })
_, err := coll.InsertMany(
Expand Down Expand Up @@ -424,7 +424,7 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)

// wait for generation 0 to end
verifierRunner.AwaitGenerationEnd()
suite.Require().NoError(verifierRunner.AwaitGenerationEnd())

db := suite.srcMongoClient.Database(suite.DBNameForTest())
coll := db.Collection("mycoll")
Expand Down
21 changes: 17 additions & 4 deletions internal/verifier/check_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package verifier
import (
"context"
"testing"

"github.com/pkg/errors"
)

type CheckRunner struct {
Expand Down Expand Up @@ -30,14 +32,25 @@ func RunVerifierCheck(ctx context.Context, t *testing.T, verifier *Verifier) *Ch
}

// AwaitGenerationEnd blocks until the check’s current generation ends.
func (cr *CheckRunner) AwaitGenerationEnd() {
<-cr.generationDoneChan
func (cr *CheckRunner) AwaitGenerationEnd() error {
select {
case <-cr.generationDoneChan:
return nil
case err := <-cr.checkDoneChan:
return errors.Wrap(err, "verifier failed while test awaited generation completion")
}
}

// StartNextGeneration blocks until it can tell the check to start
// the next generation.
func (cr *CheckRunner) StartNextGeneration() {
cr.doNextGenerationChan <- struct{}{}
func (cr *CheckRunner) StartNextGeneration() error {
select {
case cr.doNextGenerationChan <- struct{}{}:
return nil
case err := <-cr.checkDoneChan:
return errors.Wrap(err, "verifier failed while test waited to start next generation")
}

}

// Await will await generations and start new ones until the check
Expand Down
24 changes: 12 additions & 12 deletions internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,7 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() {
suite.Require().NoError(err)

runner := RunVerifierCheck(ctx, suite.T(), verifier)
runner.AwaitGenerationEnd()
suite.Require().NoError(runner.AwaitGenerationEnd())

cursor, err := verifier.verificationTaskCollection().Find(
ctx,
Expand All @@ -1263,8 +1263,8 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() {
suite.Require().Equal(verificationTaskVerifyCollection, tasks[1].Type)
suite.Require().Equal(verificationTaskMetadataMismatch, tasks[1].Status)

runner.StartNextGeneration()
runner.AwaitGenerationEnd()
suite.Require().NoError(runner.StartNextGeneration())
suite.Require().NoError(runner.AwaitGenerationEnd())

cursor, err = verifier.verificationTaskCollection().Find(
ctx,
Expand Down Expand Up @@ -1310,16 +1310,16 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
suite.T().Logf("TotalTasks is 0 (generation=%d); waiting %s then will run another generation …", verifier.generation, delay)

time.Sleep(delay)
runner.StartNextGeneration()
runner.AwaitGenerationEnd()
suite.Require().NoError(runner.StartNextGeneration())
suite.Require().NoError(runner.AwaitGenerationEnd())
status, err = verifier.GetVerificationStatus()
suite.Require().NoError(err)
}
return status
}

// wait for one generation to finish
runner.AwaitGenerationEnd()
suite.Require().NoError(runner.AwaitGenerationEnd())
status := waitForTasks()
suite.Require().Equal(VerificationStatus{TotalTasks: 2, FailedTasks: 1, CompletedTasks: 1}, *status)

Expand All @@ -1328,10 +1328,10 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
suite.Require().NoError(err)

// tell check to start the next generation
runner.StartNextGeneration()
suite.Require().NoError(runner.StartNextGeneration())

// wait for generation to finish
runner.AwaitGenerationEnd()
suite.Require().NoError(runner.AwaitGenerationEnd())
status = waitForTasks()
// there should be no failures now, since they are are equivalent at this point in time
suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
Expand All @@ -1341,10 +1341,10 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
suite.Require().NoError(err)

// tell check to start the next generation
runner.StartNextGeneration()
suite.Require().NoError(runner.StartNextGeneration())

// wait for one generation to finish
runner.AwaitGenerationEnd()
suite.Require().NoError(runner.AwaitGenerationEnd())
status = waitForTasks()

// there should be a failure from the src insert
Expand All @@ -1355,10 +1355,10 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
suite.Require().NoError(err)

// continue
runner.StartNextGeneration()
suite.Require().NoError(runner.StartNextGeneration())

// wait for it to finish again, this should be a clean run
runner.AwaitGenerationEnd()
suite.Require().NoError(runner.AwaitGenerationEnd())
status = waitForTasks()

// there should be no failures now, since they are are equivalent at this point in time
Expand Down