Skip to content

Commit 56ab306

Browse files
committed
reshuffle a bit for clarity
1 parent a85badf commit 56ab306

File tree

1 file changed

+36
-32
lines changed

1 file changed

+36
-32
lines changed

internal/verifier/change_stream.go

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -366,56 +366,60 @@ func (verifier *Verifier) createChangeStream(
366366

367367
// StartChangeStream starts the change stream.
368368
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
369-
// Rather than Result we could have separate Timestamp and error channels,
370-
// but the single channel seems simpler/cleaner.
371-
resultChan := make(chan mo.Result[primitive.Timestamp])
369+
// This channel holds the first change stream creation's result, whether
370+
// success or failure. Rather than using a Result we could make separate
371+
// Timestamp and error channels, but the single channel is cleaner since
372+
// there's no chance of "nonsense" like both channels returning a payload.
373+
initialCreateResultChan := make(chan mo.Result[primitive.Timestamp])
372374

373375
go func() {
374376
retryer := retry.New(retry.DefaultDurationLimit)
375377
retryer = retryer.WithErrorCodes(util.CursorKilled)
376378

377379
parentThreadWaiting := true
378380

379-
err := retryer.
380-
RunForTransientErrorsOnly(
381-
ctx,
382-
verifier.logger,
383-
func(ri *retry.Info) error {
384-
sess, err := verifier.srcClient.StartSession()
385-
if err != nil {
386-
return errors.Wrap(err, "failed to start change stream session")
387-
}
381+
err := retryer.RunForTransientErrorsOnly(
382+
ctx,
383+
verifier.logger,
384+
func(ri *retry.Info) error {
385+
sess, err := verifier.srcClient.StartSession()
386+
if err != nil {
387+
return errors.Wrap(err, "failed to start change stream session")
388+
}
388389

389-
sctx := mongo.NewSessionContext(ctx, sess)
390+
sctx := mongo.NewSessionContext(ctx, sess)
390391

391-
srcChangeStream, startTs, err := verifier.createChangeStream(sctx)
392-
if err != nil {
393-
return err
392+
srcChangeStream, startTs, err := verifier.createChangeStream(sctx)
393+
if err != nil {
394+
if parentThreadWaiting {
395+
initialCreateResultChan <- mo.Err[primitive.Timestamp](err)
396+
return nil
394397
}
395398

396-
defer srcChangeStream.Close(ctx)
399+
return err
400+
}
397401

398-
if parentThreadWaiting {
399-
resultChan <- mo.Ok(startTs)
400-
close(resultChan)
401-
parentThreadWaiting = false
402-
}
402+
defer srcChangeStream.Close(ctx)
403+
404+
if parentThreadWaiting {
405+
initialCreateResultChan <- mo.Ok(startTs)
406+
close(initialCreateResultChan)
407+
parentThreadWaiting = false
408+
}
403409

404-
return verifier.iterateChangeStream(sctx, ri, srcChangeStream)
405-
},
406-
)
410+
return verifier.iterateChangeStream(sctx, ri, srcChangeStream)
411+
},
412+
)
407413

408414
if err != nil {
409-
if parentThreadWaiting {
410-
resultChan <- mo.Err[primitive.Timestamp](err)
411-
} else {
412-
verifier.changeStreamErrChan <- err
413-
close(verifier.changeStreamErrChan)
414-
}
415+
// NB: This failure always happens after the initial change stream
416+
// creation.
417+
verifier.changeStreamErrChan <- err
418+
close(verifier.changeStreamErrChan)
415419
}
416420
}()
417421

418-
result := <-resultChan
422+
result := <-initialCreateResultChan
419423

420424
startTs, err := result.Get()
421425
if err != nil {

0 commit comments

Comments
 (0)