Skip to content

Commit 1499c2e

Browse files
committed
fix test
1 parent 6742a59 commit 1499c2e

File tree

4 files changed

+72
-28
lines changed

4 files changed

+72
-28
lines changed

internal/verifier/change_stream.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
104104
// GetChangeStreamFilter returns an aggregation pipeline that filters
105105
// namespaces as per configuration.
106106
//
107+
// Note that this omits verifier.globalFilter because we still need to
108+
// recheck any out-filter documents that may have changed in order to
109+
// account for filter traversals (i.e., updates that change whether a
110+
// document matches the filter).
111+
//
107112
// NB: Ideally we could make the change stream give $bsonSize(fullDocument)
108113
// and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still
109114
// want to verify migrations from 4.2. fullDocument is unlikely to be a
@@ -144,14 +149,20 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
144149
var changeEventBatch []ParsedEvent
145150

146151
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
147-
verifier.logger.Info().Msg("reading event")
148152
gotEvent := cs.TryNext(ctx)
149-
verifier.logger.Info().Err(cs.Err()).Msgf("got event? %v", gotEvent)
150153

151-
if !gotEvent || cs.Err() != nil {
154+
if cs.Err() != nil {
155+
return errors.Wrap(cs.Err(), "change stream iteration failed")
156+
}
157+
158+
if !gotEvent {
152159
break
153160
}
154161

162+
verifier.logger.Debug().
163+
Stringer("resumeToken", cs.ResumeToken()).
164+
Msg("TryNext got an event!")
165+
155166
if changeEventBatch == nil {
156167
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
157168
}
@@ -160,20 +171,13 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
160171
return errors.Wrap(err, "failed to decode change event")
161172
}
162173

163-
verifier.logger.Debug().Interface("event", changeEventBatch[eventsRead]).Msg("Got event.")
164-
165174
eventsRead++
166175
}
167176

168-
if cs.Err() != nil {
169-
return errors.Wrap(cs.Err(), "change stream iteration failed")
170-
}
171-
172177
if eventsRead == 0 {
173178
return nil
174179
}
175180

176-
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
177181
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
178182
if err != nil {
179183
return errors.Wrap(err, "failed to handle change events")
@@ -302,7 +306,9 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
302306
Msg("Failed to extract timestamp from persisted resume token.")
303307
}
304308

305-
logEvent.Msg("Starting change stream from persisted resume token.")
309+
logEvent.
310+
Interface("pipeline", pipeline).
311+
Msg("Starting change stream from persisted resume token.")
306312

307313
opts = opts.SetStartAfter(savedResumeToken)
308314
} else {

internal/verifier/check.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
7777
func (verifier *Verifier) CheckWorker(ctx context.Context) error {
7878
verifier.logger.Debug().Msgf("Starting %d verification workers", verifier.numWorkers)
7979
ctx, cancel := context.WithCancel(ctx)
80+
81+
if verifier.workerSleepDelayMillis == 0 {
82+
verifier.logger.Info().
83+
Msg("Worker sleep delay is zero. Only tests should do this.")
84+
}
85+
8086
wg := sync.WaitGroup{}
8187
for i := 0; i < verifier.numWorkers; i++ {
8288
wg.Add(1)
@@ -185,7 +191,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
185191
verifier.mux.RLock()
186192
csRunning := verifier.changeStreamRunning
187193
verifier.mux.RUnlock()
188-
if !csRunning {
194+
if csRunning {
195+
verifier.logger.Debug().Msg("Check: Change stream already running.")
196+
} else {
189197
verifier.logger.Debug().Msg("Change stream not running; starting change stream")
190198

191199
err = verifier.StartChangeStream(ctx)
@@ -399,12 +407,15 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait
399407
if errors.Is(err, mongo.ErrNoDocuments) {
400408
duration := verifier.workerSleepDelayMillis * time.Millisecond
401409

402-
verifier.logger.Debug().
403-
Int("workerNum", workerNum).
404-
Stringer("duration", duration).
405-
Msg("No tasks found. Sleeping.")
410+
if duration > 0 {
411+
verifier.logger.Debug().
412+
Int("workerNum", workerNum).
413+
Stringer("duration", duration).
414+
Msg("No tasks found. Sleeping.")
415+
416+
time.Sleep(duration)
417+
}
406418

407-
time.Sleep(duration)
408419
continue
409420
} else if err != nil {
410421
panic(err)

internal/verifier/integration_test_suite.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,19 @@ func (suite *IntegrationTestSuite) SetupTest() {
9494
dbname,
9595
)
9696

97+
for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient} {
98+
dbNames, err := client.ListDatabaseNames(ctx, bson.D{})
99+
suite.Require().NoError(err, "should list database names")
100+
for _, dbName := range dbNames {
101+
if strings.Index(dbName, suite.DBNameForTest()) == 0 {
102+
suite.T().Logf("Dropping database %#q because it seems to be left over from an earlier run of this test.", dbName)
103+
suite.Require().NoError(client.Database(dbName).Drop(ctx))
104+
}
105+
106+
suite.initialDbNames.Add(dbName)
107+
}
108+
}
109+
97110
suite.testContext, suite.contextCanceller = ctx, canceller
98111
}
99112

internal/verifier/migration_verifier_test.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"regexp"
1515
"sort"
1616
"testing"
17+
"time"
1718

1819
"github.com/10gen/migration-verifier/internal/partitions"
1920
"github.com/10gen/migration-verifier/internal/testutil"
@@ -1236,10 +1237,13 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() {
12361237
suite.ElementsMatch([]string{"testDb1.testColl1", "testDb1.testColl2", "testDb1.testView1"}, verifier.dstNamespaces)
12371238

12381239
// Collections in admin, config, and local should not be found
1239-
err = suite.srcMongoClient.Database("local").CreateCollection(ctx, "islocalSrc")
1240-
suite.Require().NoError(err)
1241-
err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "islocalDest")
1242-
suite.Require().NoError(err)
1240+
if suite.GetSrcTopology() != TopologySharded {
1241+
err = suite.srcMongoClient.Database("local").CreateCollection(ctx, "islocalSrc")
1242+
suite.Require().NoError(err)
1243+
err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "islocalDest")
1244+
suite.Require().NoError(err)
1245+
}
1246+
12431247
err = suite.srcMongoClient.Database("admin").CreateCollection(ctx, "isAdminSrc")
12441248
suite.Require().NoError(err)
12451249
err = suite.dstMongoClient.Database("admin").CreateCollection(ctx, "isAdminDest")
@@ -1388,19 +1392,22 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
13881392
func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
13891393
zerolog.SetGlobalLevel(zerolog.DebugLevel)
13901394

1391-
filter := map[string]any{"inFilter": map[string]any{"$ne": false}}
1395+
dbname1 := suite.DBNameForTest("1")
1396+
dbname2 := suite.DBNameForTest("2")
1397+
1398+
filter := bson.M{"inFilter": bson.M{"$ne": false}}
13921399
verifier := suite.BuildVerifier()
1393-
verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
1394-
verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
1400+
verifier.SetSrcNamespaces([]string{dbname1 + ".testColl1"})
1401+
verifier.SetDstNamespaces([]string{dbname2 + ".testColl3"})
13951402
verifier.SetNamespaceMap()
13961403
verifier.SetIgnoreBSONFieldOrder(true)
13971404
// Set this value low to test the verifier with multiple partitions.
13981405
verifier.partitionSizeInBytes = 50
13991406

14001407
ctx := suite.Context()
14011408

1402-
srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1")
1403-
dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3")
1409+
srcColl := suite.srcMongoClient.Database(dbname1).Collection("testColl1")
1410+
dstColl := suite.dstMongoClient.Database(dbname2).Collection("testColl3")
14041411

14051412
// Documents with _id in [0, 100) should match.
14061413
var docs []interface{}
@@ -1432,7 +1439,11 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
14321439
suite.Require().NoError(err)
14331440

14341441
for status.TotalTasks == 0 && verifier.generation < 10 {
1435-
suite.T().Logf("TotalTasks is 0 (generation=%d); waiting another generation …", verifier.generation)
1442+
delay := time.Second
1443+
1444+
suite.T().Logf("TotalTasks is 0 (generation=%d); waiting %s then will run another generation …", verifier.generation, delay)
1445+
1446+
time.Sleep(delay)
14361447
checkContinueChan <- struct{}{}
14371448
<-checkDoneChan
14381449
status, err = verifier.GetVerificationStatus()
@@ -1449,6 +1460,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
14491460
suite.Require().Equal(status.FailedTasks, 0)
14501461

14511462
// Insert another document that is not in the filter.
1463+
// This should trigger a recheck despite being outside the filter.
14521464
_, err = srcColl.InsertOne(ctx, bson.M{"_id": 200, "x": 200, "inFilter": false})
14531465
suite.Require().NoError(err)
14541466

@@ -1457,11 +1469,13 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
14571469

14581470
// Wait for generation to finish.
14591471
<-checkDoneChan
1472+
14601473
status = waitForTasks()
1474+
14611475
// There should be no failures, since the inserted document is not in the filter.
14621476
suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
14631477

1464-
// Now insert in the source, this should come up next generation.
1478+
// Now insert in the source. This should come up next generation.
14651479
_, err = srcColl.InsertOne(ctx, bson.M{"_id": 201, "x": 201, "inFilter": true})
14661480
suite.Require().NoError(err)
14671481

0 commit comments

Comments
 (0)