Skip to content

Commit 77eaf87

Browse files
committed
revert change_stream
1 parent 0f52df5 commit 77eaf87

File tree

1 file changed

+9
-25
lines changed

1 file changed

+9
-25
lines changed

internal/verifier/change_stream.go

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type ParsedEvent struct {
2525
}
2626

2727
func (pe *ParsedEvent) String() string {
28-
return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocKey.ID, pe.ClusterTime)
28+
return fmt.Sprintf("{ OpType: %s, namespace: %s, docID: %v}", pe.OpType, pe.Ns, pe.DocKey.ID)
2929
}
3030

3131
// DocKey is a deserialized form for the ChangeEvent documentKey field. We currently only care about
@@ -44,7 +44,7 @@ type UnknownEventError struct {
4444
}
4545

4646
func (uee UnknownEventError) Error() string {
47-
return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
47+
return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType)
4848
}
4949

5050
// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
@@ -63,12 +63,6 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
6363
case "replace":
6464
fallthrough
6565
case "update":
66-
/*
67-
if err := verifier.generationEventRecorder.AddEvent(changeEvent); err != nil {
68-
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
69-
}
70-
*/
71-
7266
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
7367
default:
7468
return UnknownEventError{Event: changeEvent}
@@ -135,19 +129,24 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
135129
// source writes are ended. This means we should exit rather than continue
136130
// reading the change stream since there should be no more events.
137131
case <-verifier.changeStreamEnderChan:
132+
var gotEvent bool
133+
138134
changeStreamEnded = true
139135

140136
// Read all change events until the source reports no events.
141137
// (i.e., the `getMore` call returns empty)
142138
for {
143-
var gotEvent bool
144139
gotEvent, err = readOneChangeEvent()
145140

146141
if !gotEvent || err != nil {
147142
break
148143
}
149144
}
150145

146+
if err != nil {
147+
break
148+
}
149+
151150
default:
152151
_, err = readOneChangeEvent()
153152
}
@@ -157,22 +156,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
157156
}
158157

159158
if err != nil && !errors.Is(err, context.Canceled) {
160-
timeout := time.Minute
161-
timer := time.NewTimer(timeout)
162-
defer timer.Stop()
163-
164-
select {
165-
case <-timer.C:
166-
verifier.logger.Fatal().
167-
Err(err).
168-
Stringer("timeout", timeout).
169-
Msg("Failed to send change stream err within timeout.")
170-
case verifier.changeStreamErrChan <- err:
171-
}
172-
173-
if !changeStreamEnded {
174-
return
175-
}
159+
verifier.changeStreamErrChan <- err
176160
}
177161

178162
if changeStreamEnded {

0 commit comments

Comments
 (0)