Skip to content

Commit c61f5c1

Browse files
committed
final timestsamp in change stream
1 parent a9c09a5 commit c61f5c1

File tree

4 files changed

+62
-29
lines changed

4 files changed

+62
-29
lines changed

internal/verifier/change_stream.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -160,15 +160,21 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
160160
eventsRead++
161161
}
162162

163-
if eventsRead > 0 {
164-
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
165-
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
166-
if err != nil {
167-
return false, errors.Wrap(err, "failed to handle change events")
168-
}
163+
if cs.Err() != nil {
164+
return false, errors.Wrap(cs.Err(), "change stream iteration failed")
165+
}
166+
167+
if eventsRead == 0 {
168+
return false, nil
169169
}
170170

171-
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
171+
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
172+
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
173+
if err != nil {
174+
return false, errors.Wrap(err, "failed to handle change events")
175+
}
176+
177+
return true, nil
172178
}
173179

174180
for {
@@ -184,15 +190,31 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
184190
// If the changeStreamEnderChan has a message, the user has indicated that
185191
// source writes are ended. This means we should exit rather than continue
186192
// reading the change stream since there should be no more events.
187-
case <-verifier.changeStreamEnderChan:
193+
case finalTs := <-verifier.changeStreamFinalTsChan:
188194
verifier.logger.Debug().
189-
Msg("Change stream thread received shutdown request.")
195+
Interface("finalTimestamp", finalTs).
196+
Msg("Change stream thread received final timestamp.")
190197

191198
changeStreamEnded = true
192199

193200
// Read all change events until the source reports no events.
194201
// (i.e., the `getMore` call returns empty)
195202
for {
203+
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
204+
if err != nil {
205+
err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
206+
break
207+
}
208+
209+
if curTs.Compare(finalTs) >= 0 {
210+
verifier.logger.Debug().
211+
Interface("currentTimestamp", curTs).
212+
Interface("finalTimestamp", finalTs).
213+
Msg("Change stream has reached the final timestamp. Shutting down.")
214+
215+
break
216+
}
217+
196218
var gotEvent bool
197219
gotEvent, err = readAndHandleOneChangeEventBatch()
198220

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() {
141141
err = verifier.StartChangeStream(ctx)
142142
suite.Require().NoError(err)
143143
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
144-
verifier.changeStreamEnderChan <- struct{}{}
144+
verifier.changeStreamFinalTsChan <- *origStartTs
145145
<-verifier.changeStreamDoneChan
146146
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
147147
}
@@ -176,7 +176,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() {
176176
newStartTs := sess.OperationTime()
177177
suite.Require().NotNil(newStartTs)
178178
suite.Require().Negative(origStartTs.Compare(*newStartTs))
179-
verifier.changeStreamEnderChan <- struct{}{}
179+
verifier.changeStreamFinalTsChan <- *newStartTs
180180
<-verifier.changeStreamDoneChan
181181
suite.Require().Equal(verifier.srcStartAtTs, newStartTs)
182182
}

internal/verifier/check.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,24 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
4242
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
4343
}
4444

45-
func (verifier *Verifier) waitForChangeStream() error {
45+
func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
4646
verifier.mux.RLock()
4747
csRunning := verifier.changeStreamRunning
4848
verifier.mux.RUnlock()
4949
if csRunning {
5050
verifier.logger.Debug().Msg("Changestream still running, signalling that writes are done and waiting for change stream to exit")
51-
verifier.changeStreamEnderChan <- struct{}{}
51+
52+
finalTs, err := GetClusterTime(
53+
ctx,
54+
verifier.logger,
55+
verifier.srcClient,
56+
)
57+
58+
if err != nil {
59+
return errors.Wrapf(err, "failed to fetch source's cluster time")
60+
}
61+
62+
verifier.changeStreamFinalTsChan <- finalTs
5263
select {
5364
case err := <-verifier.changeStreamErrChan:
5465
verifier.logger.Warn().Err(err).
@@ -242,7 +253,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
242253
// It's necessary to wait for the change stream to finish before incrementing the
243254
// generation number, or the last changes will not be checked.
244255
verifier.mux.Unlock()
245-
err := verifier.waitForChangeStream()
256+
err := verifier.waitForChangeStream(ctx)
246257
if err != nil {
247258
return err
248259
}

internal/verifier/migration_verifier.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ type Verifier struct {
123123
metaDBName string
124124
srcStartAtTs *primitive.Timestamp
125125

126-
mux sync.RWMutex
127-
changeStreamRunning bool
128-
changeStreamEnderChan chan struct{}
129-
changeStreamErrChan chan error
130-
changeStreamDoneChan chan struct{}
131-
lastChangeEventTime *primitive.Timestamp
126+
mux sync.RWMutex
127+
changeStreamRunning bool
128+
changeStreamFinalTsChan chan primitive.Timestamp
129+
changeStreamErrChan chan error
130+
changeStreamDoneChan chan struct{}
131+
lastChangeEventTime *primitive.Timestamp
132132

133133
readConcernSetting ReadConcernSetting
134134

@@ -188,15 +188,15 @@ func NewVerifier(settings VerifierSettings) *Verifier {
188188
}
189189

190190
return &Verifier{
191-
phase: Idle,
192-
numWorkers: NumWorkers,
193-
readPreference: readpref.Primary(),
194-
partitionSizeInBytes: 400 * 1024 * 1024,
195-
failureDisplaySize: DefaultFailureDisplaySize,
196-
changeStreamEnderChan: make(chan struct{}),
197-
changeStreamErrChan: make(chan error),
198-
changeStreamDoneChan: make(chan struct{}),
199-
readConcernSetting: readConcern,
191+
phase: Idle,
192+
numWorkers: NumWorkers,
193+
readPreference: readpref.Primary(),
194+
partitionSizeInBytes: 400 * 1024 * 1024,
195+
failureDisplaySize: DefaultFailureDisplaySize,
196+
changeStreamFinalTsChan: make(chan primitive.Timestamp),
197+
changeStreamErrChan: make(chan error),
198+
changeStreamDoneChan: make(chan struct{}),
199+
readConcernSetting: readConcern,
200200

201201
// This will get recreated once gen0 starts, but we want it
202202
// here in case the change streams gets an event before then.

0 commit comments

Comments
 (0)