Skip to content

Commit a9cb0c1

Browse files
committed
Update change_stream.go
1 parent 2d95686 commit a9cb0c1

File tree

1 file changed

+11
-11
lines changed

1 file changed

+11
-11
lines changed

internal/verifier/change_stream.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
328328
return ctx.Err()
329329

330330
// If the ChangeStreamEnderChan has a message, the user has indicated that
331-
// source writes are ended. This means we should exit rather than continue
331+
// source and destination writes are ended. This means we should exit rather than continue
332332
// reading the change stream since there should be no more events.
333333
case writesOffTs := <-csr.ChangeStreamWritesOffTsChan:
334334
csr.logger.Debug().
@@ -337,7 +337,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
337337

338338
gotwritesOffTimestamp = true
339339

340-
// Read all change events until the source reports no events.
340+
// Read all change events until the source / destination reports no events.
341341
// (i.e., the `getMore` call returns empty)
342342
for {
343343
var curTs primitive.Timestamp
@@ -454,7 +454,7 @@ func (csr *ChangeStreamReader) createChangeStream(
454454
return nil, primitive.Timestamp{}, err
455455
}
456456

457-
startTs, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
457+
startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken())
458458
if err != nil {
459459
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
460460
}
@@ -471,11 +471,11 @@ func (csr *ChangeStreamReader) createChangeStream(
471471
startTs = clusterTime
472472
}
473473

474-
return srcChangeStream, startTs, nil
474+
return changeStream, startTs, nil
475475
}
476476

477477
// StartChangeStream starts the change stream.
478-
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
478+
func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
479479
// This channel holds the first change stream creation's result, whether
480480
// success or failure. Rather than using a Result we could make separate
481481
// Timestamp and error channels, but the single channel is cleaner since
@@ -490,9 +490,9 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
490490

491491
err := retryer.RunForTransientErrorsOnly(
492492
ctx,
493-
verifier.logger,
493+
csr.logger,
494494
func(ri *retry.Info) error {
495-
srcChangeStream, startTs, err := verifier.createChangeStream(ctx)
495+
changeStream, startTs, err := csr.createChangeStream(ctx)
496496
if err != nil {
497497
if parentThreadWaiting {
498498
initialCreateResultChan <- mo.Err[primitive.Timestamp](err)
@@ -502,23 +502,23 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
502502
return err
503503
}
504504

505-
defer srcChangeStream.Close(ctx)
505+
defer changeStream.Close(ctx)
506506

507507
if parentThreadWaiting {
508508
initialCreateResultChan <- mo.Ok(startTs)
509509
close(initialCreateResultChan)
510510
parentThreadWaiting = false
511511
}
512512

513-
return verifier.iterateChangeStream(ctx, ri, srcChangeStream)
513+
return csr.iterateChangeStream(ctx, ri, changeStream)
514514
},
515515
)
516516

517517
if err != nil {
518518
// NB: This failure always happens after the initial change stream
519519
// creation.
520-
verifier.changeStreamErrChan <- err
521-
close(verifier.changeStreamErrChan)
520+
csr.ChangeStreamErrChan <- err
521+
close(csr.ChangeStreamErrChan)
522522
}
523523
}()
524524

0 commit comments

Comments
 (0)