Skip to content

Commit fc00f0f

Browse files
committed
remove test
1 parent 725bc7c commit fc00f0f

File tree

6 files changed

+29
-43
lines changed

6 files changed

+29
-43
lines changed

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
213213
// If the changeStreamEnderChan has a message, the user has indicated that
214214
// source writes are ended. This means we should exit rather than continue
215215
// reading the change stream since there should be no more events.
216-
case finalTs := <-verifier.changeStreamFinalTsChan:
216+
case finalTs := <-verifier.changeStreamWritesOffTsChan:
217217
verifier.logger.Debug().
218218
Interface("finalTimestamp", finalTs).
219219
Msg("Change stream thread received final timestamp. Finalizing change stream.")

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
142142
err = verifier.StartChangeStream(ctx)
143143
suite.Require().NoError(err)
144144
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
145-
verifier.changeStreamFinalTsChan <- *origStartTs
145+
verifier.changeStreamWritesOffTsChan <- *origStartTs
146146
<-verifier.changeStreamDoneChan
147147
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
148148
}
@@ -184,7 +184,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
184184
"session time after events should exceed the original",
185185
)
186186

187-
verifier.changeStreamFinalTsChan <- *postEventsSessionTime
187+
verifier.changeStreamWritesOffTsChan <- *postEventsSessionTime
188188
<-verifier.changeStreamDoneChan
189189

190190
suite.Assert().Equal(

internal/verifier/check.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ var failedStatus = mapset.NewSet(
2525
verificationTaskMetadataMismatch,
2626
)
2727

28-
var verificationStatusCheckInterval time.Duration = 15 * time.Second
29-
3028
// Check is the asynchronous entry point to Check, should only be called by the web server. Use
3129
// CheckDriver directly for synchronous run.
3230
// testChan is a pair of channels for coordinating generations in tests.
@@ -110,7 +108,7 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {
110108
//wait for task to be created, if none of the tasks found.
111109
if verificationStatus.AddedTasks > 0 || verificationStatus.ProcessingTasks > 0 || verificationStatus.RecheckTasks > 0 {
112110
waitForTaskCreation++
113-
time.Sleep(verificationStatusCheckInterval)
111+
time.Sleep(verifier.verificationStatusCheckInterval)
114112
} else {
115113
verifier.PrintVerificationSummary(ctx, GenerationComplete)
116114
verifier.logger.Debug().Msg("Verification tasks complete")

internal/verifier/integration_test_suite.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package verifier
33
import (
44
"context"
55
"strings"
6+
"time"
67

78
mapset "github.com/deckarep/golang-set/v2"
89
"github.com/pkg/errors"
@@ -166,6 +167,8 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
166167
verifier.SetGenerationPauseDelayMillis(0)
167168
verifier.SetWorkerSleepDelayMillis(0)
168169

170+
verifier.verificationStatusCheckInterval = 10 * time.Millisecond
171+
169172
ctx := suite.Context()
170173

171174
suite.Require().NoError(

internal/verifier/migration_verifier.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,13 @@ type Verifier struct {
121121
metaDBName string
122122
srcStartAtTs *primitive.Timestamp
123123

124-
mux sync.RWMutex
125-
changeStreamRunning bool
126-
changeStreamFinalTsChan chan primitive.Timestamp
127-
changeStreamErrChan chan error
128-
changeStreamDoneChan chan struct{}
129-
lastChangeEventTime *primitive.Timestamp
130-
writesOffTimestamp *primitive.Timestamp
124+
mux sync.RWMutex
125+
changeStreamRunning bool
126+
changeStreamWritesOffTsChan chan primitive.Timestamp
127+
changeStreamErrChan chan error
128+
changeStreamDoneChan chan struct{}
129+
lastChangeEventTime *primitive.Timestamp
130+
writesOffTimestamp *primitive.Timestamp
131131

132132
readConcernSetting ReadConcernSetting
133133

@@ -137,6 +137,8 @@ type Verifier struct {
137137
globalFilter map[string]any
138138

139139
pprofInterval time.Duration
140+
141+
verificationStatusCheckInterval time.Duration
140142
}
141143

142144
// VerificationStatus holds the Verification Status
@@ -187,19 +189,21 @@ func NewVerifier(settings VerifierSettings) *Verifier {
187189
}
188190

189191
return &Verifier{
190-
phase: Idle,
191-
numWorkers: NumWorkers,
192-
readPreference: readpref.Primary(),
193-
partitionSizeInBytes: 400 * 1024 * 1024,
194-
failureDisplaySize: DefaultFailureDisplaySize,
195-
changeStreamFinalTsChan: make(chan primitive.Timestamp),
196-
changeStreamErrChan: make(chan error),
197-
changeStreamDoneChan: make(chan struct{}),
198-
readConcernSetting: readConcern,
192+
phase: Idle,
193+
numWorkers: NumWorkers,
194+
readPreference: readpref.Primary(),
195+
partitionSizeInBytes: 400 * 1024 * 1024,
196+
failureDisplaySize: DefaultFailureDisplaySize,
197+
changeStreamWritesOffTsChan: make(chan primitive.Timestamp),
198+
changeStreamErrChan: make(chan error),
199+
changeStreamDoneChan: make(chan struct{}),
200+
readConcernSetting: readConcern,
199201

200202
// This will get recreated once gen0 starts, but we want it
201203
// here in case the change streams gets an event before then.
202204
eventRecorder: NewEventRecorder(),
205+
206+
verificationStatusCheckInterval: 15 * time.Second,
203207
}
204208
}
205209

@@ -250,7 +254,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
250254

251255
verifier.writesOffTimestamp = &finalTs
252256

253-
verifier.changeStreamFinalTsChan <- finalTs
257+
verifier.changeStreamWritesOffTsChan <- finalTs
254258
}
255259

256260
return nil

internal/verifier/migration_verifier_test.go

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,26 +1378,7 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
13781378
status = waitForTasks()
13791379

13801380
// there should be no failures now, since they are are equivalent at this point in time
1381-
suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
1382-
1383-
// turn writes off
1384-
suite.Require().NoError(verifier.WritesOff(ctx))
1385-
1386-
// now write to the source, this should not be seen by the change stream which should have ended
1387-
// because of the calls to WritesOff
1388-
_, err = srcColl.InsertOne(ctx, bson.M{"_id": 1019, "x": 1019})
1389-
suite.Require().NoError(err)
1390-
checkContinueChan <- struct{}{}
1391-
<-checkDoneChan
1392-
1393-
status, err = verifier.GetVerificationStatus()
1394-
suite.Require().NoError(err)
1395-
1396-
// there should be a no more tasks
1397-
suite.Assert().Equal(VerificationStatus{}, *status)
1398-
1399-
checkContinueChan <- struct{}{}
1400-
require.NoError(suite.T(), errGroup.Wait())
1381+
suite.Assert().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
14011382
}
14021383

14031384
func (suite *IntegrationTestSuite) TestVerifierWithFilter() {

0 commit comments

Comments
 (0)