Skip to content

Commit 53404bc

Browse files
committed
refactor
1 parent 0f645f1 commit 53404bc

File tree

7 files changed

+152
-125
lines changed

7 files changed

+152
-125
lines changed

internal/verifier/change_stream.go

Lines changed: 83 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -61,50 +61,67 @@ type ChangeStreamReader struct {
6161
metaClient *mongo.Client
6262
watcherClient *mongo.Client
6363

64-
changeStreamRunning bool
65-
ChangeEventBatchChan chan []ParsedEvent
66-
ChangeStreamEnderChan chan struct{}
67-
ChangeStreamErrChan chan error
68-
ChangeStreamDoneChan chan struct{}
64+
changeStreamRunning bool
65+
ChangeEventBatchChan chan []ParsedEvent
66+
ChangeStreamWritesOffTsChan chan primitive.Timestamp
67+
ChangeStreamErrChan chan error
68+
ChangeStreamDoneChan chan struct{}
6969

7070
startAtTs *primitive.Timestamp
7171
}
7272

7373
func (verifier *Verifier) initializeChangeStreamReaders() {
7474
verifier.srcChangeStreamReader = &ChangeStreamReader{
75-
readerType: srcReaderType,
76-
lastChangeEventTime: nil,
77-
logger: verifier.logger,
78-
namespaces: verifier.srcNamespaces,
79-
metaDBName: verifier.metaDBName,
80-
metaClient: verifier.metaClient,
81-
watcherClient: verifier.srcClient,
82-
changeStreamRunning: false,
83-
ChangeEventBatchChan: make(chan []ParsedEvent),
84-
ChangeStreamEnderChan: make(chan struct{}),
85-
ChangeStreamErrChan: make(chan error),
86-
ChangeStreamDoneChan: make(chan struct{}),
87-
startAtTs: nil,
75+
readerType: srcReaderType,
76+
lastChangeEventTime: nil,
77+
logger: verifier.logger,
78+
namespaces: verifier.srcNamespaces,
79+
metaDBName: verifier.metaDBName,
80+
metaClient: verifier.metaClient,
81+
watcherClient: verifier.srcClient,
82+
changeStreamRunning: false,
83+
ChangeEventBatchChan: make(chan []ParsedEvent),
84+
ChangeStreamWritesOffTsChan: make(chan primitive.Timestamp),
85+
ChangeStreamErrChan: make(chan error),
86+
ChangeStreamDoneChan: make(chan struct{}),
87+
startAtTs: nil,
8888
}
8989
verifier.dstChangeStreamReader = &ChangeStreamReader{
90-
readerType: dstReaderType,
91-
lastChangeEventTime: nil,
92-
logger: verifier.logger,
93-
namespaces: verifier.dstNamespaces,
94-
metaDBName: verifier.metaDBName,
95-
metaClient: verifier.metaClient,
96-
watcherClient: verifier.dstClient,
97-
changeStreamRunning: false,
98-
ChangeEventBatchChan: make(chan []ParsedEvent),
99-
ChangeStreamEnderChan: make(chan struct{}),
100-
ChangeStreamErrChan: make(chan error),
101-
ChangeStreamDoneChan: make(chan struct{}),
102-
startAtTs: nil,
90+
readerType: dstReaderType,
91+
lastChangeEventTime: nil,
92+
logger: verifier.logger,
93+
namespaces: verifier.dstNamespaces,
94+
metaDBName: verifier.metaDBName,
95+
metaClient: verifier.metaClient,
96+
watcherClient: verifier.dstClient,
97+
changeStreamRunning: false,
98+
ChangeEventBatchChan: make(chan []ParsedEvent),
99+
ChangeStreamWritesOffTsChan: make(chan primitive.Timestamp),
100+
ChangeStreamErrChan: make(chan error),
101+
ChangeStreamDoneChan: make(chan struct{}),
102+
startAtTs: nil,
103103
}
104104
}
105105

106+
// StartChangeEventHandler starts a goroutine that handles change event batches from the reader.
107+
func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *ChangeStreamReader) {
108+
go func() {
109+
for {
110+
select {
111+
case <-ctx.Done():
112+
return
113+
case batch := <-reader.ChangeEventBatchChan:
114+
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
115+
if err != nil {
116+
reader.ChangeStreamErrChan <- err
117+
}
118+
}
119+
}
120+
}()
121+
}
122+
106123
// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
107-
func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent) error {
124+
func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent, eventOrigin clusterType) error {
108125
if len(batch) == 0 {
109126
return nil
110127
}
@@ -126,8 +143,33 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
126143
if err := verifier.eventRecorder.AddEvent(&changeEvent); err != nil {
127144
return errors.Wrapf(err, "failed to augment stats with change event (%+v)", changeEvent)
128145
}
129-
dbNames[i] = changeEvent.Ns.DB
130-
collNames[i] = changeEvent.Ns.Coll
146+
147+
var srcDBName, srcCollName string
148+
149+
// Recheck Docs are keyed by source namespaces.
150+
// We need to retrieve the source namespaces if change events are from the destination.
151+
switch eventOrigin {
152+
case dstReaderType:
153+
if len(verifier.dstSrcNsMap) == 0 {
154+
srcDBName = changeEvent.Ns.DB
155+
srcCollName = changeEvent.Ns.Coll
156+
} else {
157+
dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll)
158+
srcNs, exist := verifier.dstSrcNsMap[dstNs]
159+
if !exist {
160+
return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs)
161+
}
162+
srcDBName, srcCollName = SplitNamespace(srcNs)
163+
}
164+
case srcReaderType:
165+
srcDBName = changeEvent.Ns.DB
166+
srcCollName = changeEvent.Ns.Coll
167+
default:
168+
return errors.Errorf("unknown event origin: %s", eventOrigin)
169+
}
170+
171+
dbNames[i] = srcDBName
172+
collNames[i] = srcCollName
131173
docIDs[i] = changeEvent.DocKey.ID
132174

133175
if changeEvent.FullDocument == nil {
@@ -183,7 +225,7 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() []bson.D {
183225
// the verifier will enqueue rechecks from those post-writesOff events. This
184226
// is unideal but shouldn’t impede correctness since post-writesOff events
185227
// shouldn’t really happen anyway by definition.
186-
func (verifier *Verifier) readAndHandleOneChangeEventBatch(
228+
func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
187229
ctx context.Context,
188230
cs *mongo.ChangeStream,
189231
) error {
@@ -222,11 +264,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
222264
return nil
223265
}
224266

225-
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
226-
if err != nil {
227-
return errors.Wrap(err, "failed to handle change events")
228-
}
229-
267+
csr.ChangeEventBatchChan <- changeEventBatch
230268
return nil
231269
}
232270

@@ -264,8 +302,8 @@ func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mong
264302
// If the ChangeStreamEnderChan has a message, the user has indicated that
265303
// source writes are ended. This means we should exit rather than continue
266304
// reading the change stream since there should be no more events.
267-
case writesOffTs := <-verifier.changeStreamWritesOffTsChan:
268-
verifier.logger.Debug().
305+
case writesOffTs := <-csr.ChangeStreamWritesOffTsChan:
306+
csr.logger.Debug().
269307
Interface("writesOffTimestamp", writesOffTs).
270308
Msg("Change stream thread received writesOff timestamp. Finalizing change stream.")
271309

@@ -282,23 +320,23 @@ func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mong
282320
}
283321

284322
if curTs == writesOffTs || curTs.After(writesOffTs) {
285-
verifier.logger.Debug().
323+
csr.logger.Debug().
286324
Interface("currentTimestamp", curTs).
287325
Interface("writesOffTimestamp", writesOffTs).
288326
Msg("Change stream has reached the writesOff timestamp. Shutting down.")
289327

290328
break
291329
}
292330

293-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
331+
err = csr.readAndHandleOneChangeEventBatch(ctx, cs)
294332

295333
if err != nil {
296334
break
297335
}
298336
}
299337

300338
default:
301-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
339+
err = csr.readAndHandleOneChangeEventBatch(ctx, cs)
302340

303341
if err == nil {
304342
err = persistResumeTokenIfNeeded()
@@ -451,24 +489,6 @@ func (csr *ChangeStreamReader) resumeTokenDocID() string {
451489
}
452490
}
453491

454-
func (csr *ChangeStreamReader) Wait() error {
455-
if csr.changeStreamRunning {
456-
csr.logger.Debug().Msgf("%s still running, signalling that writes are done and waiting for change stream to exit", csr)
457-
csr.ChangeStreamEnderChan <- struct{}{}
458-
select {
459-
case err := <-csr.ChangeStreamErrChan:
460-
csr.logger.Warn().Err(err).
461-
Msg("Received error from change stream.")
462-
return err
463-
case <-csr.ChangeEventBatchChan:
464-
csr.logger.Debug().
465-
Msg("Received completion signal from change stream.")
466-
break
467-
}
468-
}
469-
return nil
470-
}
471-
472492
func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error {
473493
token := cs.ResumeToken()
474494

internal/verifier/check.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
4040
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
4141
}
4242

43-
func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
43+
func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeStreamReader) error {
4444
select {
4545
case <-ctx.Done():
4646
return ctx.Err()
47-
case err := <-verifier.changeStreamErrChan:
47+
case err := <-csr.ChangeStreamErrChan:
4848
verifier.logger.Warn().Err(err).
49-
Msg("Received error from change stream.")
49+
Msgf("Received error from %s.", csr)
5050
return err
51-
case <-verifier.changeStreamDoneChan:
51+
case <-csr.ChangeStreamDoneChan:
5252
verifier.logger.Debug().
53-
Msg("Received completion signal from change stream.")
53+
Msgf("Received completion signal from %s.", csr)
5454
break
5555
}
5656

@@ -87,10 +87,10 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {
8787
select {
8888
case err := <-verifier.srcChangeStreamReader.ChangeStreamErrChan:
8989
cancel()
90-
return err
90+
return errors.Wrapf(err, "got an error from %s", verifier.srcChangeStreamReader)
9191
case err := <-verifier.dstChangeStreamReader.ChangeStreamErrChan:
9292
cancel()
93-
return err
93+
return errors.Wrapf(err, "got an error from %s", verifier.dstChangeStreamReader)
9494
case <-ctx.Done():
9595
cancel()
9696
return nil
@@ -170,27 +170,30 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
170170
verifier.phase = Idle
171171
}()
172172

173-
verifier.mux.RLock()
174-
csRunning := verifier.changeStreamRunning
175-
verifier.mux.RUnlock()
176-
if csRunning {
177-
verifier.logger.Debug().Msg("Check: Change stream already running.")
173+
if verifier.srcChangeStreamReader.changeStreamRunning {
174+
verifier.logger.Debug().Msg("Check: Source change stream already running.")
178175
} else {
179-
verifier.logger.Debug().Msg("Change stream not running; starting change stream")
176+
verifier.logger.Debug().Msg("Source change stream not running; starting change stream")
180177

181178
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
182179
if err != nil {
183180
return errors.Wrap(err, "failed to start change stream on source")
184181
}
182+
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader)
185183
}
186-
if !verifier.dstChangeStreamReader.changeStreamRunning {
184+
185+
if verifier.dstChangeStreamReader.changeStreamRunning {
186+
verifier.logger.Debug().Msg("Check: Destination change stream already running.")
187+
} else {
187188
verifier.logger.Debug().Msg("Destination change stream not running; starting change stream")
188189

189190
err = verifier.dstChangeStreamReader.StartChangeStream(ctx)
190191
if err != nil {
191192
return errors.Wrap(err, "failed to start change stream on destination")
192193
}
194+
verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader)
193195
}
196+
194197
// Log out the verification status when initially booting up so it's easy to see the current state
195198
verificationStatus, err := verifier.GetVerificationStatus()
196199
if err != nil {
@@ -251,10 +254,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
251254
// It's necessary to wait for the change stream to finish before incrementing the
252255
// generation number, or the last changes will not be checked.
253256
verifier.mux.Unlock()
254-
if err = verifier.srcChangeStreamReader.Wait(); err != nil {
257+
if err = verifier.waitForChangeStream(ctx, verifier.srcChangeStreamReader); err != nil {
255258
return err
256259
}
257-
if err = verifier.dstChangeStreamReader.Wait(); err != nil {
260+
if err = verifier.waitForChangeStream(ctx, verifier.dstChangeStreamReader); err != nil {
258261
return err
259262
}
260263
verifier.mux.Lock()

internal/verifier/compare.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (verifier *Verifier) getFetcherChannels(
267267
ctx,
268268
verifier.srcClientCollection(task),
269269
verifier.srcBuildInfo,
270-
verifier.srcStartAtTs,
270+
verifier.srcChangeStreamReader.startAtTs,
271271
task,
272272
)
273273

@@ -291,7 +291,7 @@ func (verifier *Verifier) getFetcherChannels(
291291
ctx,
292292
verifier.dstClientCollection(task),
293293
verifier.dstBuildInfo,
294-
nil, //startAtTs
294+
verifier.dstChangeStreamReader.startAtTs,
295295
task,
296296
)
297297

0 commit comments

Comments
 (0)