Skip to content

Commit 1348a60

Browse files
committed
REP-5318 Make change stream restartable.
This entails a small refactor of the change stream code so that the change stream’s creation and iteration both happen under a retryer.
1 parent 4d04316 commit 1348a60

31 files changed

+3474
-39
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/pkg/errors v0.9.1
1313
github.com/rs/zerolog v1.28.0
1414
github.com/samber/lo v1.47.0
15+
github.com/samber/mo v1.13.0
1516
github.com/stretchr/testify v1.8.0
1617
github.com/urfave/cli v1.22.9
1718
go.mongodb.org/mongo-driver v1.17.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
8484
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
8585
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
8686
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
87+
github.com/samber/mo v1.13.0 h1:LB1OwfJMju3a6FjghH+AIvzMG0ZPOzgTWj1qaHs1IQ4=
88+
github.com/samber/mo v1.13.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs=
8789
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
8890
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
8991
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=

internal/util/error.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
// `ErrorCode` newtype, but that requires a more invasive change to everything
2121
// that uses error codes.
2222
const (
23-
LockFailed int = 107
24-
SampleTooManyDuplicates int = 28799
23+
LockFailed = 107
24+
SampleTooManyDuplicates = 28799
25+
CursorKilled = 237
2526
)
2627

2728
//

internal/verifier/change_stream.go

Lines changed: 84 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ import (
66
"time"
77

88
"github.com/10gen/migration-verifier/internal/keystring"
9+
"github.com/10gen/migration-verifier/internal/retry"
10+
"github.com/10gen/migration-verifier/internal/util"
911
"github.com/pkg/errors"
1012
"github.com/rs/zerolog"
13+
"github.com/samber/mo"
1114
"go.mongodb.org/mongo-driver/bson"
1215
"go.mongodb.org/mongo-driver/bson/primitive"
1316
"go.mongodb.org/mongo-driver/mongo"
@@ -63,7 +66,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
6366
for i, changeEvent := range batch {
6467
if changeEvent.ClusterTime != nil &&
6568
(verifier.lastChangeEventTime == nil ||
66-
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
69+
verifier.lastChangeEventTime.Before(*changeEvent.ClusterTime)) {
6770
verifier.lastChangeEventTime = changeEvent.ClusterTime
6871
}
6972
switch changeEvent.OpType {
@@ -175,9 +178,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
175178
return nil
176179
}
177180

178-
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
179-
defer cs.Close(ctx)
180-
181+
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) error {
181182
var lastPersistedTime time.Time
182183

183184
persistResumeTokenIfNeeded := func() error {
@@ -201,10 +202,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
201202

202203
// If the context is canceled, return immmediately.
203204
case <-ctx.Done():
204-
verifier.logger.Debug().
205-
Err(ctx.Err()).
206-
Msg("Change stream quitting.")
207-
return
205+
return ctx.Err()
208206

209207
// If the changeStreamEnderChan has a message, the user has indicated that
210208
// source writes are ended. This means we should exit rather than continue
@@ -222,10 +220,11 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
222220
var curTs primitive.Timestamp
223221
curTs, err = extractTimestampFromResumeToken(cs.ResumeToken())
224222
if err != nil {
225-
err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
226-
break
223+
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
227224
}
228225

226+
// writesOffTs never refers to a real event,
227+
// so we can stop once curTs >= writesOffTs.
229228
if !curTs.Before(writesOffTs) {
230229
verifier.logger.Debug().
231230
Interface("currentTimestamp", curTs).
@@ -238,7 +237,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
238237
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
239238

240239
if err != nil {
241-
break
240+
return err
242241
}
243242
}
244243

@@ -248,17 +247,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
248247
if err == nil {
249248
err = persistResumeTokenIfNeeded()
250249
}
251-
}
252-
253-
if err != nil && !errors.Is(err, context.Canceled) {
254-
verifier.logger.Debug().
255-
Err(err).
256-
Msg("Sending change stream error.")
257250

258-
verifier.changeStreamErrChan <- err
259-
260-
if !gotwritesOffTimestamp {
261-
break
251+
if err != nil {
252+
return err
262253
}
263254
}
264255

@@ -284,18 +275,21 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
284275
}
285276

286277
infoLog.Msg("Change stream is done.")
278+
279+
return nil
287280
}
288281

289-
// StartChangeStream starts the change stream.
290-
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
282+
func (verifier *Verifier) createChangeStream(
283+
ctx context.Context,
284+
) (*mongo.ChangeStream, primitive.Timestamp, error) {
291285
pipeline := verifier.GetChangeStreamFilter()
292286
opts := options.ChangeStream().
293287
SetMaxAwaitTime(1 * time.Second).
294288
SetFullDocument(options.UpdateLookup)
295289

296290
savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
297291
if err != nil {
298-
return errors.Wrap(err, "failed to load persisted change stream resume token")
292+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
299293
}
300294

301295
csStartLogEvent := verifier.logger.Info()
@@ -322,40 +316,95 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
322316

323317
sess, err := verifier.srcClient.StartSession()
324318
if err != nil {
325-
return errors.Wrap(err, "failed to start session")
319+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
326320
}
327321
sctx := mongo.NewSessionContext(ctx, sess)
328322
srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts)
329323
if err != nil {
330-
return errors.Wrap(err, "failed to open change stream")
324+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
331325
}
332326

333327
err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream)
334328
if err != nil {
335-
return err
329+
return nil, primitive.Timestamp{}, err
336330
}
337331

338-
csTimestamp, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
332+
startTs, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
339333
if err != nil {
340-
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
334+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
341335
}
342336

337+
// With sharded clusters the resume token might lead the cluster time
338+
// by 1 increment. In that case we need the actual cluster time;
339+
// otherwise we will get errors.
343340
clusterTime, err := getClusterTimeFromSession(sess)
344341
if err != nil {
345-
return errors.Wrap(err, "failed to read cluster time from session")
342+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
346343
}
347344

348-
verifier.srcStartAtTs = &csTimestamp
349-
if csTimestamp.After(clusterTime) {
350-
verifier.srcStartAtTs = &clusterTime
345+
if startTs.After(clusterTime) {
346+
startTs = clusterTime
351347
}
352348

349+
return srcChangeStream, startTs, nil
350+
}
351+
352+
// StartChangeStream starts the change stream.
353+
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
354+
// Result seems a bit simpler than messing with 2 separate channels.
355+
resultChan := make(chan mo.Result[primitive.Timestamp])
356+
357+
go func() {
358+
retryer := retry.New(retry.DefaultDurationLimit)
359+
retryer = retryer.WithErrorCodes(util.CursorKilled)
360+
361+
parentThreadWaiting := true
362+
363+
err := retryer.
364+
RunForTransientErrorsOnly(
365+
ctx,
366+
verifier.logger,
367+
func(i *retry.Info) error {
368+
srcChangeStream, startTs, err := verifier.createChangeStream(ctx)
369+
if err != nil {
370+
return err
371+
}
372+
373+
defer srcChangeStream.Close(ctx)
374+
375+
if parentThreadWaiting {
376+
resultChan <- mo.Ok(startTs)
377+
close(resultChan)
378+
parentThreadWaiting = false
379+
}
380+
381+
return verifier.iterateChangeStream(ctx, srcChangeStream)
382+
},
383+
)
384+
385+
if err != nil {
386+
if parentThreadWaiting {
387+
resultChan <- mo.Err[primitive.Timestamp](err)
388+
} else {
389+
verifier.changeStreamErrChan <- err
390+
close(verifier.changeStreamErrChan)
391+
}
392+
}
393+
}()
394+
395+
result := <-resultChan
396+
397+
startTs, err := result.Get()
398+
if err != nil {
399+
return err
400+
}
401+
402+
verifier.srcStartAtTs = &startTs
403+
353404
verifier.mux.Lock()
354405
verifier.changeStreamRunning = true
355406
verifier.mux.Unlock()
356407

357-
go verifier.iterateChangeStream(ctx, srcChangeStream)
358-
359408
return nil
360409
}
361410

internal/verifier/change_stream_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"go.mongodb.org/mongo-driver/bson"
1212
"go.mongodb.org/mongo-driver/bson/primitive"
1313
"go.mongodb.org/mongo-driver/mongo"
14+
"go.mongodb.org/mongo-driver/mongo/options"
15+
"go.mongodb.org/mongo-driver/mongo/readconcern"
1416
)
1517

1618
func TestChangeStreamFilter(t *testing.T) {
@@ -247,6 +249,92 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
247249
)
248250
}
249251

252+
func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
253+
ctx := suite.Context()
254+
255+
verifier := suite.BuildVerifier()
256+
257+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
258+
coll := db.Collection("mycoll")
259+
suite.Require().NoError(
260+
db.CreateCollection(ctx, coll.Name()),
261+
)
262+
263+
// start verifier
264+
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
265+
266+
// wait for generation 0 to end
267+
verifierRunner.AwaitGenerationEnd()
268+
269+
const mvName = "Migration Verifier"
270+
271+
// Kill verifier’s change stream.
272+
cursor, err := suite.srcMongoClient.Database(
273+
"admin",
274+
options.Database().SetReadConcern(readconcern.Local()),
275+
).Aggregate(
276+
ctx,
277+
mongo.Pipeline{
278+
{
279+
{"$currentOp", bson.D{
280+
{"idleCursors", true},
281+
}},
282+
},
283+
{
284+
{"$match", bson.D{
285+
{"clientMetadata.application.name", mvName},
286+
{"command.collection", "$cmd.aggregate"},
287+
{"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
288+
bson.D{{"$type", "object"}},
289+
},
290+
}},
291+
},
292+
},
293+
)
294+
suite.Require().NoError(err)
295+
296+
var ops []bson.Raw
297+
suite.Require().NoError(cursor.All(ctx, &ops))
298+
299+
for _, cursorRaw := range ops {
300+
opId, err := cursorRaw.LookupErr("opid")
301+
suite.Require().NoError(err, "should get opid from op")
302+
303+
suite.T().Logf("Killing change stream op %+v", opId)
304+
305+
suite.Require().NoError(
306+
suite.srcMongoClient.Database("admin").RunCommand(
307+
ctx,
308+
bson.D{
309+
{"killOp", 1},
310+
{"op", opId},
311+
},
312+
).Err(),
313+
)
314+
}
315+
316+
_, err = coll.InsertOne(
317+
ctx,
318+
bson.D{{"_id", "after kill"}},
319+
)
320+
suite.Require().NoError(err)
321+
322+
suite.Require().NoError(verifier.WritesOff(ctx))
323+
324+
suite.Require().NoError(verifierRunner.Await())
325+
326+
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
327+
ctx,
328+
verifier.verificationTaskCollection(),
329+
verificationTaskVerifyDocuments,
330+
verifier.generation,
331+
)
332+
suite.Require().NoError(err)
333+
334+
suite.Assert().Zero(incompleteTasks, "no incomplete tasks")
335+
suite.Require().Len(failedTasks, 1, "expect one failed task")
336+
}
337+
250338
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
251339
suite.testInsertsBeforeWritesOff(10_000)
252340
}

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {
8787
select {
8888
case err := <-verifier.changeStreamErrChan:
8989
cancel()
90-
return err
90+
return errors.Wrap(err, "change stream failed")
9191
case <-ctx.Done():
9292
cancel()
9393
return nil

internal/verifier/migration_verifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
255255

256256
verifier.mux.Unlock()
257257

258-
// This has to happen under the lock because the change stream
258+
// This has to happen outside the lock because the change stream
259259
// might be inserting docs into the recheck queue, which happens
260260
// under the lock.
261261
verifier.changeStreamWritesOffTsChan <- finalTs

0 commit comments

Comments
 (0)