@@ -44,11 +44,10 @@ type DocKey struct {
4444
4545const (
4646 minChangeStreamPersistInterval = time .Second * 10
47+ maxChangeStreamAwaitTime = time .Second
4748 metadataChangeStreamCollectionName = "changeStream"
4849)
4950
50- var maxChangeStreamAwaitTime = time .Second
51-
5251type UnknownEventError struct {
5352 Event * ParsedEvent
5453}
@@ -269,6 +268,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
269268 eventsRead := 0
270269 var changeEventBatch []ParsedEvent
271270
271+ latestEvent := option .None [ParsedEvent ]()
272+
272273 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
273274 gotEvent := cs .TryNext (ctx )
274275
@@ -294,10 +295,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
294295 if changeEventBatch [eventsRead ].ClusterTime != nil &&
295296 (csr .lastChangeEventTime == nil ||
296297 csr .lastChangeEventTime .Before (* changeEventBatch [eventsRead ].ClusterTime )) {
298+
297299 csr .lastChangeEventTime = changeEventBatch [eventsRead ].ClusterTime
298- csr .logger .Trace ().
299- Interface ("event" , changeEventBatch [eventsRead ]).
300- Msg ("Updated lastChangeEventTime." )
300+ latestEvent = option .Some (changeEventBatch [eventsRead ])
301301 }
302302
303303 eventsRead ++
@@ -309,6 +309,12 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
309309 return nil
310310 }
311311
312+ if event , has := latestEvent .Get (); has {
313+ csr .logger .Trace ().
314+ Interface ("event" , event ).
315+ Msg ("Updated lastChangeEventTime." )
316+ }
317+
312318 var curTs primitive.Timestamp
313319 curTs , err := extractTimestampFromResumeToken (cs .ResumeToken ())
314320 if err == nil {
0 commit comments