@@ -42,16 +42,15 @@ const (
4242)
4343
4444type UnknownEventError struct {
45- Event ParsedEvent
46- RawEvent bson.Raw
45+ Event ParsedEvent
4746}
4847
4948func (uee UnknownEventError ) Error () string {
50- return fmt .Sprintf ("received event with unknown optype: %+v" , uee .RawEvent )
49+ return fmt .Sprintf ("received event with unknown optype: %+v" , uee .Event )
5150}
5251
5352// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
54- func (verifier * Verifier ) HandleChangeStreamEvents (ctx context.Context , batch []bson. Raw ) error {
53+ func (verifier * Verifier ) HandleChangeStreamEvents (ctx context.Context , batch []ParsedEvent ) error {
5554 if len (batch ) == 0 {
5655 return nil
5756 }
@@ -61,13 +60,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
6160 docIDs := make ([]interface {}, len (batch ))
6261 dataSizes := make ([]int , len (batch ))
6362
64- for i , rawChangeEvent := range batch {
65- var changeEvent ParsedEvent
66- err := bson .Unmarshal (rawChangeEvent , & changeEvent )
67- if err != nil {
68- return errors .Wrapf (err , "failed to unmarshal change event %d of %d (%+v) to %T" , 1 + i , len (batch ), rawChangeEvent , changeEvent )
69- }
70-
63+ for i , changeEvent := range batch {
7164 if changeEvent .ClusterTime != nil &&
7265 (verifier .lastChangeEventTime == nil ||
7366 verifier .lastChangeEventTime .Compare (* changeEvent .ClusterTime ) < 0 ) {
@@ -82,7 +75,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
8275 fallthrough
8376 case "update" :
8477 if err := verifier .eventRecorder .AddEvent (& changeEvent ); err != nil {
85- return errors .Wrapf (err , "failed to augment stats with change event (%+v)" , rawChangeEvent )
78+ return errors .Wrapf (err , "failed to augment stats with change event (%+v)" , changeEvent )
8679 }
8780 dbNames [i ] = changeEvent .Ns .DB
8881 collNames [i ] = changeEvent .Ns .Coll
@@ -97,10 +90,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
9790 dataSizes [i ] = len (changeEvent .FullDocument )
9891 }
9992 default :
100- return UnknownEventError {
101- Event : changeEvent ,
102- RawEvent : rawChangeEvent ,
103- }
93+ return UnknownEventError {Event : changeEvent }
10494 }
10595 }
10696
@@ -139,11 +129,23 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
139129func (verifier * Verifier ) readAndHandleOneChangeEventBatch (
140130 ctx context.Context ,
141131 cs * mongo.ChangeStream ,
132+ writesOffTs * primitive.Timestamp ,
142133) error {
143134 eventsRead := 0
144- var changeEventBatch []bson. Raw
135+ var changeEventBatch []ParsedEvent
145136
146137 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
138+ // Once the change stream reaches the writesOff timestamp we should stop reading.
139+ if writesOffTs != nil {
140+ csTimestamp , err := extractTimestampFromResumeToken (cs .ResumeToken ())
141+ if err != nil {
142+ return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
143+ }
144+ if ! csTimestamp .Before (* writesOffTs ) {
145+ break
146+ }
147+ }
148+
147149 gotEvent := cs .TryNext (ctx )
148150
149151 if cs .Err () != nil {
@@ -155,12 +157,9 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
155157 }
156158
157159 if changeEventBatch == nil {
158- changeEventBatch = make ([]bson. Raw , cs .RemainingBatchLength ()+ 1 )
160+ changeEventBatch = make ([]ParsedEvent , cs .RemainingBatchLength ()+ 1 )
159161 }
160162
161- fmt .Printf ("\n ===== event: %+v\n " , cs .Current )
162-
163- // NB: Decode() achieves a deep-clone of cs.Current.
164163 if err := cs .Decode (& changeEventBatch [eventsRead ]); err != nil {
165164 return errors .Wrap (err , "failed to decode change event" )
166165 }
@@ -240,15 +239,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
240239 break
241240 }
242241
243- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
242+ err = verifier .readAndHandleOneChangeEventBatch (ctx , cs , & writesOffTs )
244243
245244 if err != nil {
246245 break
247246 }
248247 }
249248
250249 default :
251- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
250+ err = verifier .readAndHandleOneChangeEventBatch (ctx , cs , nil )
252251
253252 if err == nil {
254253 err = persistResumeTokenIfNeeded ()
0 commit comments