@@ -42,15 +42,15 @@ const (
4242)
4343
4444type UnknownEventError struct {
45- Event * ParsedEvent
45+ RawEvent bson. Raw
4646}
4747
4848func (uee UnknownEventError ) Error () string {
49- return fmt .Sprintf ("Received event with unknown optype: %+v" , uee .Event )
49+ return fmt .Sprintf ("received event with unknown optype: %+v" , uee .RawEvent )
5050}
5151
5252// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
53- func (verifier * Verifier ) HandleChangeStreamEvents (ctx context.Context , batch []ParsedEvent ) error {
53+ func (verifier * Verifier ) HandleChangeStreamEvents (ctx context.Context , batch []bson. Raw ) error {
5454 if len (batch ) == 0 {
5555 return nil
5656 }
@@ -60,7 +60,13 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
6060 docIDs := make ([]interface {}, len (batch ))
6161 dataSizes := make ([]int , len (batch ))
6262
63- for i , changeEvent := range batch {
63+ for i , rawChangeEvent := range batch {
64+ var changeEvent ParsedEvent
65+ err := bson .Unmarshal (rawChangeEvent , & changeEvent )
66+ if err != nil {
67+ return errors .Wrapf (err , "failed to unmarshal change event (%+v) to %T" , rawChangeEvent , changeEvent )
68+ }
69+
6470 if changeEvent .ClusterTime != nil &&
6571 (verifier .lastChangeEventTime == nil ||
6672 verifier .lastChangeEventTime .Compare (* changeEvent .ClusterTime ) < 0 ) {
@@ -75,7 +81,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
7581 fallthrough
7682 case "update" :
7783 if err := verifier .eventRecorder .AddEvent (& changeEvent ); err != nil {
78- return errors .Wrapf (err , "failed to augment stats with change event: %+v" , changeEvent )
84+ return errors .Wrapf (err , "failed to augment stats with change event ( %+v) " , rawChangeEvent )
7985 }
8086 dbNames [i ] = changeEvent .Ns .DB
8187 collNames [i ] = changeEvent .Ns .Coll
@@ -90,7 +96,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
9096 dataSizes [i ] = len (changeEvent .FullDocument )
9197 }
9298 default :
93- return UnknownEventError {Event : & changeEvent }
99+ return UnknownEventError {RawEvent : rawChangeEvent }
94100 }
95101 }
96102
@@ -129,19 +135,19 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
129135func (verifier * Verifier ) readAndHandleOneChangeEventBatch (
130136 ctx context.Context ,
131137 cs * mongo.ChangeStream ,
132- finalTs * primitive.Timestamp ,
138+ writesOffTs * primitive.Timestamp ,
133139) error {
134140 eventsRead := 0
135141 var changeEventBatch []ParsedEvent
136142
137143 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
138- // Once the change stream reaches the final timestamp we should stop reading.
139- if finalTs != nil {
144+ // Once the change stream reaches the writesOff timestamp we should stop reading.
145+ if writesOffTs != nil {
140146 csTimestamp , err := extractTimestampFromResumeToken (cs .ResumeToken ())
141147 if err != nil {
142148 return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
143149 }
144- if ! csTimestamp .Before (* finalTs ) {
150+ if ! csTimestamp .Before (* writesOffTs ) {
145151 break
146152 }
147153 }
@@ -199,7 +205,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
199205
200206 for {
201207 var err error
202- var gotFinalTimestamp bool
208+ var gotwritesOffTimestamp bool
203209
204210 select {
205211
@@ -213,12 +219,12 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
213219 // If the changeStreamEnderChan has a message, the user has indicated that
214220 // source writes are ended. This means we should exit rather than continue
215221 // reading the change stream since there should be no more events.
216- case finalTs := <- verifier .changeStreamWritesOffTsChan :
222+ case writesOffTs := <- verifier .changeStreamWritesOffTsChan :
217223 verifier .logger .Debug ().
218- Interface ("finalTimestamp " , finalTs ).
219- Msg ("Change stream thread received final timestamp. Finalizing change stream." )
224+ Interface ("writesOffTimestamp " , writesOffTs ).
225+ Msg ("Change stream thread received writesOff timestamp. Finalizing change stream." )
220226
221- gotFinalTimestamp = true
227+ gotwritesOffTimestamp = true
222228
223229 // Read all change events until the source reports no events.
224230 // (i.e., the `getMore` call returns empty)
@@ -230,16 +236,16 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
230236 break
231237 }
232238
233- if curTs == finalTs || curTs .After (finalTs ) {
239+ if curTs == writesOffTs || curTs .After (writesOffTs ) {
234240 verifier .logger .Debug ().
235241 Interface ("currentTimestamp" , curTs ).
236- Interface ("finalTimestamp " , finalTs ).
237- Msg ("Change stream has reached the final timestamp. Shutting down." )
242+ Interface ("writesOffTimestamp " , writesOffTs ).
243+ Msg ("Change stream has reached the writesOff timestamp. Shutting down." )
238244
239245 break
240246 }
241247
242- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs , & finalTs )
248+ err = verifier .readAndHandleOneChangeEventBatch (ctx , cs , & writesOffTs )
243249
244250 if err != nil {
245251 break
@@ -261,12 +267,12 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
261267
262268 verifier .changeStreamErrChan <- err
263269
264- if ! gotFinalTimestamp {
270+ if ! gotwritesOffTimestamp {
265271 break
266272 }
267273 }
268274
269- if gotFinalTimestamp {
275+ if gotwritesOffTimestamp {
270276 verifier .mux .Lock ()
271277 verifier .changeStreamRunning = false
272278 if verifier .lastChangeEventTime != nil {
0 commit comments