Skip to content

Commit bb72776

Browse files
committed
Revert "revert change_stream"
This reverts commit 77eaf87.
1 parent ca9d995 commit bb72776

File tree

1 file changed

+25
-9
lines changed

1 file changed

+25
-9
lines changed

internal/verifier/change_stream.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type ParsedEvent struct {
2525
}
2626

2727
func (pe *ParsedEvent) String() string {
28-
return fmt.Sprintf("{ OpType: %s, namespace: %s, docID: %v}", pe.OpType, pe.Ns, pe.DocKey.ID)
28+
return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocKey.ID, pe.ClusterTime)
2929
}
3030

3131
// DocKey is a deserialized form for the ChangeEvent documentKey field. We currently only care about
@@ -44,7 +44,7 @@ type UnknownEventError struct {
4444
}
4545

4646
func (uee UnknownEventError) Error() string {
47-
return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType)
47+
return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
4848
}
4949

5050
// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
@@ -63,6 +63,12 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
6363
case "replace":
6464
fallthrough
6565
case "update":
66+
/*
67+
if err := verifier.generationEventRecorder.AddEvent(changeEvent); err != nil {
68+
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
69+
}
70+
*/
71+
6672
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
6773
default:
6874
return UnknownEventError{Event: changeEvent}
@@ -129,24 +135,19 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
129135
// source writes are ended. This means we should exit rather than continue
130136
// reading the change stream since there should be no more events.
131137
case <-verifier.changeStreamEnderChan:
132-
var gotEvent bool
133-
134138
changeStreamEnded = true
135139

136140
// Read all change events until the source reports no events.
137141
// (i.e., the `getMore` call returns empty)
138142
for {
143+
var gotEvent bool
139144
gotEvent, err = readOneChangeEvent()
140145

141146
if !gotEvent || err != nil {
142147
break
143148
}
144149
}
145150

146-
if err != nil {
147-
break
148-
}
149-
150151
default:
151152
_, err = readOneChangeEvent()
152153
}
@@ -156,7 +157,22 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
156157
}
157158

158159
if err != nil && !errors.Is(err, context.Canceled) {
159-
verifier.changeStreamErrChan <- err
160+
timeout := time.Minute
161+
timer := time.NewTimer(timeout)
162+
defer timer.Stop()
163+
164+
select {
165+
case <-timer.C:
166+
verifier.logger.Fatal().
167+
Err(err).
168+
Stringer("timeout", timeout).
169+
Msg("Failed to send change stream err within timeout.")
170+
case verifier.changeStreamErrChan <- err:
171+
}
172+
173+
if !changeStreamEnded {
174+
return
175+
}
160176
}
161177

162178
if changeStreamEnded {

0 commit comments

Comments
 (0)