@@ -3,7 +3,6 @@ package verifier
33import (
44 "context"
55 "fmt"
6- "golang.org/x/sync/errgroup"
76 "time"
87
98 "github.com/10gen/migration-verifier/internal/keystring"
@@ -17,6 +16,7 @@ import (
1716 "go.mongodb.org/mongo-driver/bson/primitive"
1817 "go.mongodb.org/mongo-driver/mongo"
1918 "go.mongodb.org/mongo-driver/mongo/options"
19+ "golang.org/x/sync/errgroup"
2020)
2121
2222const fauxDocSizeForDeleteEvents = 1024
@@ -83,6 +83,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
8383 watcherClient : verifier .srcClient ,
8484 buildInfo : * verifier .srcBuildInfo ,
8585 changeStreamRunning : false ,
86+ ChangeEventBatchChan : make (chan []ParsedEvent ),
8687 ChangeStreamWritesOffTsChan : make (chan primitive.Timestamp ),
8788 ChangeStreamErrChan : make (chan error ),
8889 ChangeStreamDoneChan : make (chan struct {}),
@@ -95,6 +96,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
9596 watcherClient : verifier .dstClient ,
9697 buildInfo : * verifier .dstBuildInfo ,
9798 changeStreamRunning : false ,
99+ ChangeEventBatchChan : make (chan []ParsedEvent ),
98100 ChangeStreamWritesOffTsChan : make (chan primitive.Timestamp ),
99101 ChangeStreamErrChan : make (chan error ),
100102 ChangeStreamDoneChan : make (chan struct {}),
@@ -111,7 +113,7 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C
111113 return ctx .Err ()
112114 case batch , more := <- reader .ChangeEventBatchChan :
113115 if ! more {
114- // Change stream reader has closed the event batch channel because it has finished.
116+ verifier . logger . Debug (). Msgf ( " Change Event Batch Channel has been closed by %s, returning..." , reader )
115117 return nil
116118 }
117119 verifier .logger .Trace ().Msgf ("Verifier is handling a change event batch from %s: %v" , reader , batch )
@@ -279,6 +281,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
279281 }
280282
281283 csr .logger .Trace ().Msgf ("%s received a change event: %v" , csr , changeEventBatch [eventsRead ])
284+ fmt .Printf ("%d %d\n " , changeEventBatch [eventsRead ].ClusterTime .T , changeEventBatch [eventsRead ].ClusterTime .I )
282285
283286 if changeEventBatch [eventsRead ].ClusterTime != nil &&
284287 (csr .lastChangeEventTime == nil ||
@@ -306,8 +309,6 @@ func (csr *ChangeStreamReader) iterateChangeStream(
306309) error {
307310 var lastPersistedTime time.Time
308311
309- defer close (csr .ChangeEventBatchChan )
310-
311312 persistResumeTokenIfNeeded := func () error {
312313 if time .Since (lastPersistedTime ) <= minChangeStreamPersistInterval {
313314 return nil
@@ -357,7 +358,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
357358 csr .logger .Debug ().
358359 Interface ("currentTimestamp" , curTs ).
359360 Interface ("writesOffTimestamp" , writesOffTs ).
360- Msg ( "Change stream has reached the writesOff timestamp. Shutting down." )
361+ Msgf ( "%s has reached the writesOff timestamp. Shutting down.", csr )
361362
362363 break
363364 }
@@ -487,12 +488,11 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
487488 // there's no chance of "nonsense" like both channels returning a payload.
488489 initialCreateResultChan := make (chan mo.Result [primitive.Timestamp ])
489490
490- // Change stream reader closes ChangeEventBatchChan each time after
491- // finish reading change events.
492- // It needs to be re-initialized when a change stream starts.
493- csr .ChangeEventBatchChan = make (chan []ParsedEvent )
494-
495491 go func () {
492+ // Closing ChangeEventBatchChan at the end of change stream goroutine
493+ // notifies the verifier's change event handler to exit.
494+ defer close (csr .ChangeEventBatchChan )
495+
496496 retryer := retry .New (retry .DefaultDurationLimit )
497497 retryer = retryer .WithErrorCodes (util .CursorKilled )
498498
0 commit comments