Skip to content

Commit ebc391f

Browse files
committed
check runner
1 parent d87dc0f commit ebc391f

File tree

4 files changed

+78
-48
lines changed

4 files changed

+78
-48
lines changed

internal/verifier/change_stream_test.go

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -246,26 +246,17 @@ func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
246246

247247
verifier := suite.BuildVerifier()
248248

249-
checkDoneChan := make(chan struct{})
250-
checkContinueChan := make(chan struct{})
251-
252249
db := suite.srcMongoClient.Database(suite.DBNameForTest())
253250
coll := db.Collection("mycoll")
254251
suite.Require().NoError(
255252
db.CreateCollection(ctx, coll.Name()),
256253
)
257254

258255
// start verifier
259-
verifierDoneChan := make(chan struct{})
260-
go func() {
261-
err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan)
262-
suite.Require().NoError(err)
256+
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
263257

264-
close(verifierDoneChan)
265-
}()
266-
267-
// wait for generation 1
268-
<-checkDoneChan
258+
// wait for generation 0 to end
259+
verifierRunner.AwaitGenerationEnd()
269260

270261
docsCount := 10_000
271262
docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} })
@@ -277,15 +268,7 @@ func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
277268

278269
suite.Require().NoError(verifier.WritesOff(ctx))
279270

280-
verifierDone := false
281-
for !verifierDone {
282-
select {
283-
case <-verifierDoneChan:
284-
verifierDone = true
285-
case <-checkDoneChan:
286-
case checkContinueChan <- struct{}{}:
287-
}
288-
}
271+
suite.Require().NoError(verifierRunner.Await())
289272

290273
generation := verifier.generation
291274
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(

internal/verifier/check_runner.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package verifier
2+
3+
import (
4+
"context"
5+
"testing"
6+
)
7+
8+
type CheckRunner struct {
9+
checkDoneChan chan error
10+
generationDoneChan chan struct{}
11+
doNextGenerationChan chan struct{}
12+
}
13+
14+
func RunVerifierCheck(ctx context.Context, t *testing.T, verifier *Verifier) *CheckRunner {
15+
verifierDoneChan := make(chan error)
16+
17+
generationDoneChan := make(chan struct{})
18+
doNextGenerationChan := make(chan struct{})
19+
20+
go func() {
21+
err := verifier.CheckDriver(ctx, nil, generationDoneChan, doNextGenerationChan)
22+
verifierDoneChan <- err
23+
}()
24+
25+
return &CheckRunner{
26+
checkDoneChan: verifierDoneChan,
27+
generationDoneChan: generationDoneChan,
28+
doNextGenerationChan: doNextGenerationChan,
29+
}
30+
}
31+
32+
// AwaitGenerationEnd blocks until the check’s current generation ends.
33+
func (cr *CheckRunner) AwaitGenerationEnd() {
34+
<-cr.generationDoneChan
35+
}
36+
37+
// StartNextGeneration blocks until it can tell the check to start
38+
// the next generation.
39+
func (cr *CheckRunner) StartNextGeneration() {
40+
cr.doNextGenerationChan <- struct{}{}
41+
}
42+
43+
// Await will await generations and start new ones until the check
44+
// finishes. It returns the error that verifier.CheckDriver() returns.
45+
func (cr *CheckRunner) Await() error {
46+
for {
47+
select {
48+
case err := <-cr.checkDoneChan:
49+
return err
50+
51+
case <-cr.generationDoneChan:
52+
case cr.doNextGenerationChan <- struct{}{}:
53+
}
54+
}
55+
}

internal/verifier/migration_verifier.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
254254
verifier.writesOffTimestamp = &finalTs
255255

256256
verifier.mux.Unlock()
257+
258+
// This has to happen under the lock because the change stream
259+
// might be inserting docs into the recheck queue, which happens
260+
// under the lock.
257261
verifier.changeStreamWritesOffTsChan <- finalTs
258262
} else {
259263
verifier.mux.Unlock()

internal/verifier/migration_verifier_test.go

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/10gen/migration-verifier/internal/testutil"
2121
"github.com/cespare/permute/v2"
2222
"github.com/rs/zerolog"
23-
"github.com/rs/zerolog/log"
2423
"github.com/samber/lo"
2524
"github.com/stretchr/testify/assert"
2625
"github.com/stretchr/testify/require"
@@ -29,7 +28,6 @@ import (
2928
"go.mongodb.org/mongo-driver/bson/primitive"
3029
"go.mongodb.org/mongo-driver/mongo"
3130
"go.mongodb.org/mongo-driver/mongo/options"
32-
"golang.org/x/sync/errgroup"
3331
)
3432

3533
func TestIntegration(t *testing.T) {
@@ -1292,7 +1290,7 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
12921290
verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
12931291
verifier.SetNamespaceMap()
12941292

1295-
ctx := context.Background()
1293+
ctx := suite.Context()
12961294

12971295
srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1")
12981296
dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3")
@@ -1303,18 +1301,7 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
13031301
_, err = dstColl.InsertOne(ctx, bson.M{"_id": 1, "x": 42})
13041302
suite.Require().NoError(err)
13051303

1306-
checkDoneChan := make(chan struct{})
1307-
checkContinueChan := make(chan struct{})
1308-
1309-
errGroup, errGrpCtx := errgroup.WithContext(context.Background())
1310-
errGroup.Go(func() error {
1311-
checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan)
1312-
// Log this as fatal error so that the test doesn't hang.
1313-
if checkDriverErr != nil {
1314-
log.Fatal().Err(checkDriverErr).Msg("check driver error")
1315-
}
1316-
return checkDriverErr
1317-
})
1304+
runner := RunVerifierCheck(ctx, suite.T(), verifier)
13181305

13191306
waitForTasks := func() *VerificationStatus {
13201307
status, err := verifier.GetVerificationStatus()
@@ -1326,16 +1313,16 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
13261313
suite.T().Logf("TotalTasks is 0 (generation=%d); waiting %s then will run another generation …", verifier.generation, delay)
13271314

13281315
time.Sleep(delay)
1329-
checkContinueChan <- struct{}{}
1330-
<-checkDoneChan
1316+
runner.StartNextGeneration()
1317+
runner.AwaitGenerationEnd()
13311318
status, err = verifier.GetVerificationStatus()
13321319
suite.Require().NoError(err)
13331320
}
13341321
return status
13351322
}
13361323

13371324
// wait for one generation to finish
1338-
<-checkDoneChan
1325+
runner.AwaitGenerationEnd()
13391326
status := waitForTasks()
13401327
suite.Require().Equal(VerificationStatus{TotalTasks: 2, FailedTasks: 1, CompletedTasks: 1}, *status)
13411328

@@ -1344,10 +1331,10 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
13441331
suite.Require().NoError(err)
13451332

13461333
// tell check to start the next generation
1347-
checkContinueChan <- struct{}{}
1334+
runner.StartNextGeneration()
13481335

13491336
// wait for generation to finish
1350-
<-checkDoneChan
1337+
runner.AwaitGenerationEnd()
13511338
status = waitForTasks()
13521339
// there should be no failures now, since they are are equivalent at this point in time
13531340
suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
@@ -1357,10 +1344,10 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
13571344
suite.Require().NoError(err)
13581345

13591346
// tell check to start the next generation
1360-
checkContinueChan <- struct{}{}
1347+
runner.StartNextGeneration()
13611348

13621349
// wait for one generation to finish
1363-
<-checkDoneChan
1350+
runner.AwaitGenerationEnd()
13641351
status = waitForTasks()
13651352

13661353
// there should be a failure from the src insert
@@ -1371,19 +1358,20 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
13711358
suite.Require().NoError(err)
13721359

13731360
// continue
1374-
checkContinueChan <- struct{}{}
1361+
runner.StartNextGeneration()
13751362

13761363
// wait for it to finish again, this should be a clean run
1377-
<-checkDoneChan
1364+
runner.AwaitGenerationEnd()
13781365
status = waitForTasks()
13791366

13801367
// there should be no failures now, since they are are equivalent at this point in time
13811368
suite.Assert().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
13821369

1370+
// We could just abandon this verifier, but we might as well shut it down
1371+
// gracefully. That prevents a spurious error in the log from “drop”
1372+
// change events.
13831373
suite.Require().NoError(verifier.WritesOff(ctx))
1384-
1385-
checkContinueChan <- struct{}{}
1386-
require.NoError(suite.T(), errGroup.Wait())
1374+
suite.Require().NoError(runner.Await())
13871375
}
13881376

13891377
func (suite *IntegrationTestSuite) TestVerifierWithFilter() {

0 commit comments

Comments
 (0)