Skip to content

Commit 47f1aad

Browse files
committed
more diag
1 parent 9a2990c commit 47f1aad

File tree

6 files changed

+23
-12
lines changed

6 files changed

+23
-12
lines changed

internal/verifier/change_stream.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
6363
case "replace":
6464
fallthrough
6565
case "update":
66-
if err := verifier.generationEventRecorder.AddEvent(changeEvent); err != nil {
66+
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
6767
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
6868
}
6969

@@ -171,12 +171,12 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
171171
verifier.logger.Fatal().
172172
Err(err).
173173
Stringer("timeout", timeout).
174-
Msg("Failed to send change stream err within timeout.")
174+
Msg("Failed to send change stream error within timeout.")
175175
case verifier.changeStreamErrChan <- err:
176176
}
177177

178178
if !changeStreamEnded {
179-
return
179+
break
180180
}
181181
}
182182

@@ -191,8 +191,8 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
191191
// finished the change stream changes so that Recheck can continue.
192192
verifier.changeStreamDoneChan <- struct{}{}
193193
// since the changeStream is exhausted, we now return
194-
verifier.logger.Debug().Msg("Change stream is done")
195-
return
194+
verifier.logger.Debug().Msg("Change stream is done.")
195+
break
196196
}
197197
}
198198
}

internal/verifier/check.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ func (verifier *Verifier) waitForChangeStream() error {
5151
verifier.changeStreamEnderChan <- struct{}{}
5252
select {
5353
case err := <-verifier.changeStreamErrChan:
54+
verifier.logger.Warn().Err(err).Msg("Received error from change stream.")
5455
return err
5556
case <-verifier.changeStreamDoneChan:
57+
verifier.logger.Debug().Msg("Received completion signal from change stream.")
5658
break
5759
}
5860
}
@@ -182,16 +184,23 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
182184
// Now enter the multi-generational steady check state
183185
for {
184186
verifier.generationStartTime = time.Now()
187+
verifier.eventRecorder.Reset()
185188

186189
err := verifier.CheckWorker(ctx)
187190
if err != nil {
188191
return err
189192
}
190193
// we will only coordinate when the number of channels is exactly 2.
191-
// * Channel 0 signals a generation is done
194+
// * Channel 0 signals a generation boundary
192195
// * Channel 1 signals to check to continue the next generation
193196
if len(testChan) == 2 {
197+
198+
verifier.logger.Debug().
199+
Msg("Telling test about generation boundary.")
194200
testChan[0] <- struct{}{}
201+
202+
verifier.logger.Debug().
203+
Msg("Awaiting test's signal to continue.")
195204
<-testChan[1]
196205
}
197206
time.Sleep(verifier.generationPauseDelayMillis * time.Millisecond)

internal/verifier/migration_verifier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ type Verifier struct {
9595
numWorkers int
9696
failureDisplaySize int64
9797

98-
generationEventRecorder *EventRecorder
98+
eventRecorder *EventRecorder
9999

100100
// Used only with generation 0 to defer the first
101101
// progress report until after we’ve finished partitioning
@@ -202,7 +202,7 @@ func NewVerifier(settings VerifierSettings) *Verifier {
202202

203203
// This will get recreated once gen0 starts, but we want it
204204
// here in case the change streams gets an event before then.
205-
generationEventRecorder: NewEventRecorder(),
205+
eventRecorder: NewEventRecorder(),
206206
}
207207
}
208208

internal/verifier/migration_verifier_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1461,7 +1461,6 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
14611461
suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)
14621462
}
14631463

1464-
/*
14651464
func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {
14661465
zerolog.SetGlobalLevel(zerolog.DebugLevel)
14671466

@@ -1571,8 +1570,11 @@ func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {
15711570

15721571
// Turn writes off.
15731572
verifier.WritesOff(ctx)
1573+
1574+
// Tell CheckDriver to do one more pass. This should terminate the change stream.
1575+
checkContinueChan <- struct{}{}
1576+
<-checkDoneChan
15741577
}
1575-
*/
15761578

15771579
func (suite *MultiDataVersionTestSuite) TestPartitionWithFilter() {
15781580
zerolog.SetGlobalLevel(zerolog.DebugLevel)

internal/verifier/recheck.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, chang
5656
verifier.mux.Lock()
5757
defer verifier.mux.Unlock()
5858

59-
if err := verifier.generationEventRecorder.AddEvent(changeEvent); err != nil {
59+
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
6060
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
6161
}
6262

internal/verifier/summary.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (verifier *Verifier) printMismatchInvestigationNotes(strBuilder *strings.Bu
369369
}
370370

371371
func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
372-
nsStats := verifier.generationEventRecorder.Read()
372+
nsStats := verifier.eventRecorder.Read()
373373

374374
activeNamespacesCount := len(nsStats)
375375

0 commit comments

Comments
 (0)