Skip to content

Commit 351c94c

Browse files
committed
wip
1 parent 53ded13 commit 351c94c

File tree

5 files changed

+187
-102
lines changed

5 files changed

+187
-102
lines changed

internal/verifier/change_stream.go

Lines changed: 141 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/10gen/migration-verifier/internal/keystring"
9+
"github.com/10gen/migration-verifier/internal/logger"
910
"github.com/pkg/errors"
1011
"github.com/rs/zerolog"
1112
"go.mongodb.org/mongo-driver/bson"
@@ -50,6 +51,59 @@ func (uee UnknownEventError) Error() string {
5051
return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
5152
}
5253

54+
type ChangeStreamReader struct {
55+
readerType clusterType
56+
57+
lastChangeEventTime *primitive.Timestamp
58+
logger *logger.Logger
59+
namespaces []string
60+
metaDBName string
61+
62+
metaClient *mongo.Client
63+
watcherClient *mongo.Client
64+
65+
changeStreamRunning bool
66+
ChangeEventBatchChan chan []ParsedEvent
67+
ChangeStreamEnderChan chan struct{}
68+
ChangeStreamErrChan chan error
69+
ChangeStreamDoneChan chan struct{}
70+
71+
startAtTs *primitive.Timestamp
72+
}
73+
74+
func (verifier *Verifier) initializeChangeStreamReaders() {
75+
verifier.srcChangeStreamReader = &ChangeStreamReader{
76+
readerType: srcReaderType,
77+
lastChangeEventTime: nil,
78+
logger: verifier.logger,
79+
namespaces: verifier.srcNamespaces,
80+
metaDBName: verifier.metaDBName,
81+
metaClient: verifier.metaClient,
82+
watcherClient: verifier.srcClient,
83+
changeStreamRunning: false,
84+
ChangeEventBatchChan: make(chan []ParsedEvent),
85+
ChangeStreamEnderChan: make(chan struct{}),
86+
ChangeStreamErrChan: make(chan error),
87+
ChangeStreamDoneChan: make(chan struct{}),
88+
startAtTs: nil,
89+
}
90+
verifier.dstChangeStreamReader = &ChangeStreamReader{
91+
readerType: dstReaderType,
92+
lastChangeEventTime: nil,
93+
logger: verifier.logger,
94+
namespaces: verifier.dstNamespaces,
95+
metaDBName: verifier.metaDBName,
96+
metaClient: verifier.metaClient,
97+
watcherClient: verifier.dstClient,
98+
changeStreamRunning: false,
99+
ChangeEventBatchChan: make(chan []ParsedEvent),
100+
ChangeStreamEnderChan: make(chan struct{}),
101+
ChangeStreamErrChan: make(chan error),
102+
ChangeStreamDoneChan: make(chan struct{}),
103+
startAtTs: nil,
104+
}
105+
}
106+
53107
// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
54108
func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent) error {
55109
if len(batch) == 0 {
@@ -62,11 +116,6 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
62116
dataSizes := make([]int, len(batch))
63117

64118
for i, changeEvent := range batch {
65-
if changeEvent.ClusterTime != nil &&
66-
(verifier.lastChangeEventTime == nil ||
67-
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
68-
verifier.lastChangeEventTime = changeEvent.ClusterTime
69-
}
70119
switch changeEvent.OpType {
71120
case "delete":
72121
fallthrough
@@ -109,20 +158,20 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
109158
// and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still
110159
// want to verify migrations from 4.2. fullDocument is unlikely to be a
111160
// bottleneck anyway.
112-
func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
113-
if len(verifier.srcNamespaces) == 0 {
114-
return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}}
161+
func (csr *ChangeStreamReader) GetChangeStreamFilter() []bson.D {
162+
if len(csr.namespaces) == 0 {
163+
return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", csr.metaDBName}}}}}}}
115164
}
116165
filter := bson.A{}
117-
for _, ns := range verifier.srcNamespaces {
166+
for _, ns := range csr.namespaces {
118167
db, coll := SplitNamespace(ns)
119168
filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}})
120169
}
121170
stage := bson.D{{"$match", bson.D{{"$or", filter}}}}
122171
return []bson.D{stage}
123172
}
124173

125-
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
174+
func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
126175
defer cs.Close(ctx)
127176

128177
var lastPersistedTime time.Time
@@ -132,7 +181,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
132181
return nil
133182
}
134183

135-
err := verifier.persistChangeStreamResumeToken(ctx, cs)
184+
err := csr.persistChangeStreamResumeToken(ctx, cs)
136185
if err == nil {
137186
lastPersistedTime = time.Now()
138187
}
@@ -159,11 +208,19 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
159208
return false, errors.Wrap(err, "failed to decode change event")
160209
}
161210

211+
if changeEventBatch[eventsRead].ClusterTime != nil &&
212+
(csr.lastChangeEventTime == nil ||
213+
csr.lastChangeEventTime.Compare(*changeEventBatch[eventsRead].ClusterTime) < 0) {
214+
csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime
215+
}
216+
162217
eventsRead++
163218
}
164219

165220
if eventsRead > 0 {
166-
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
221+
csr.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
222+
csr.ChangeEventBatchChan <- changeEventBatch
223+
// TODO: run change stream event handler in a verifier goroutine.
167224
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
168225
if err != nil {
169226
return false, errors.Wrap(err, "failed to handle change events")
@@ -181,16 +238,16 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
181238

182239
// If the context is canceled, return immmediately.
183240
case <-ctx.Done():
184-
verifier.logger.Debug().
241+
csr.logger.Debug().
185242
Err(ctx.Err()).
186243
Msg("Change stream quitting.")
187244
return
188245

189-
// If the changeStreamEnderChan has a message, the user has indicated that
246+
// If the ChangeStreamEnderChan has a message, the user has indicated that
190247
// source writes are ended. This means we should exit rather than continue
191248
// reading the change stream since there should be no more events.
192-
case <-verifier.changeStreamEnderChan:
193-
verifier.logger.Debug().
249+
case <-csr.ChangeStreamEnderChan:
250+
csr.logger.Debug().
194251
Msg("Change stream thread received shutdown request.")
195252

196253
changeStreamEnded = true
@@ -215,64 +272,63 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
215272
}
216273

217274
if err != nil && !errors.Is(err, context.Canceled) {
218-
verifier.logger.Debug().
275+
csr.logger.Debug().
219276
Err(err).
220277
Msg("Sending change stream error.")
221278

222-
verifier.changeStreamErrChan <- err
279+
csr.ChangeStreamErrChan <- err
223280

224281
if !changeStreamEnded {
225282
break
226283
}
227284
}
228285

229286
if changeStreamEnded {
230-
verifier.mux.Lock()
231-
verifier.changeStreamRunning = false
232-
if verifier.lastChangeEventTime != nil {
233-
verifier.srcStartAtTs = verifier.lastChangeEventTime
287+
csr.changeStreamRunning = false
288+
if csr.lastChangeEventTime != nil {
289+
csr.startAtTs = csr.lastChangeEventTime
234290
}
235-
verifier.mux.Unlock()
236291
// since we have started Recheck, we must signal that we have
237292
// finished the change stream changes so that Recheck can continue.
238-
verifier.changeStreamDoneChan <- struct{}{}
293+
csr.ChangeStreamDoneChan <- struct{}{}
239294
break
240295
}
241296
}
242297

243-
infoLog := verifier.logger.Info()
244-
if verifier.lastChangeEventTime == nil {
298+
infoLog := csr.logger.Info()
299+
if csr.lastChangeEventTime == nil {
245300
infoLog = infoLog.Str("changeStreamStopTime", "none")
246301
} else {
247-
infoLog = infoLog.Interface("changeStreamStopTime", *verifier.lastChangeEventTime)
302+
infoLog = infoLog.Interface("changeStreamStopTime", *csr.lastChangeEventTime)
248303
}
249304

250305
infoLog.Msg("Change stream is done.")
251306
}
252307

253308
// StartChangeStream starts the change stream.
254-
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
255-
pipeline := verifier.GetChangeStreamFilter()
309+
func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
310+
pipeline := csr.GetChangeStreamFilter()
311+
256312
opts := options.ChangeStream().
257313
SetMaxAwaitTime(1 * time.Second).
258314
SetFullDocument(options.UpdateLookup)
259315

260-
savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
316+
savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx)
261317
if err != nil {
262318
return errors.Wrap(err, "failed to load persisted change stream resume token")
263319
}
264320

265-
csStartLogEvent := verifier.logger.Info()
321+
csStartLogEvent := csr.logger.Info()
266322

267323
if savedResumeToken != nil {
268324
logEvent := csStartLogEvent.
269-
Stringer("resumeToken", savedResumeToken)
325+
Stringer(csr.resumeTokenDocID(), savedResumeToken)
270326

271327
ts, err := extractTimestampFromResumeToken(savedResumeToken)
272328
if err == nil {
273329
logEvent = addUnixTimeToLogEvent(ts.T, logEvent)
274330
} else {
275-
verifier.logger.Warn().
331+
csr.logger.Warn().
276332
Err(err).
277333
Msg("Failed to extract timestamp from persisted resume token.")
278334
}
@@ -281,25 +337,25 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
281337

282338
opts = opts.SetStartAfter(savedResumeToken)
283339
} else {
284-
csStartLogEvent.Msg("Starting change stream from current source cluster time.")
340+
csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType)
285341
}
286342

287-
sess, err := verifier.srcClient.StartSession()
343+
sess, err := csr.watcherClient.StartSession()
288344
if err != nil {
289345
return errors.Wrap(err, "failed to start session")
290346
}
291347
sctx := mongo.NewSessionContext(ctx, sess)
292-
srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts)
348+
changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts)
293349
if err != nil {
294350
return errors.Wrap(err, "failed to open change stream")
295351
}
296352

297-
err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream)
353+
err = csr.persistChangeStreamResumeToken(ctx, changeStream)
298354
if err != nil {
299355
return err
300356
}
301357

302-
csTimestamp, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
358+
csTimestamp, err := extractTimestampFromResumeToken(changeStream.ResumeToken())
303359
if err != nil {
304360
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
305361
}
@@ -309,16 +365,14 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
309365
return errors.Wrap(err, "failed to read cluster time from session")
310366
}
311367

312-
verifier.srcStartAtTs = &csTimestamp
368+
csr.startAtTs = &csTimestamp
313369
if csTimestamp.After(clusterTime) {
314-
verifier.srcStartAtTs = &clusterTime
370+
csr.startAtTs = &clusterTime
315371
}
316372

317-
verifier.mux.Lock()
318-
verifier.changeStreamRunning = true
319-
verifier.mux.Unlock()
373+
csr.changeStreamRunning = true
320374

321-
go verifier.iterateChangeStream(ctx, srcChangeStream)
375+
go csr.iterateChangeStream(ctx, changeStream)
322376

323377
return nil
324378
}
@@ -327,16 +381,16 @@ func addUnixTimeToLogEvent[T constraints.Integer](unixTime T, event *zerolog.Eve
327381
return event.Time("timestampTime", time.Unix(int64(unixTime), int64(0)))
328382
}
329383

330-
func (v *Verifier) getChangeStreamMetadataCollection() *mongo.Collection {
331-
return v.metaClient.Database(v.metaDBName).Collection(metadataChangeStreamCollectionName)
384+
func (csr *ChangeStreamReader) getChangeStreamMetadataCollection() *mongo.Collection {
385+
return csr.metaClient.Database(csr.metaDBName).Collection(metadataChangeStreamCollectionName)
332386
}
333387

334-
func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) {
335-
coll := verifier.getChangeStreamMetadataCollection()
388+
func (csr *ChangeStreamReader) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) {
389+
coll := csr.getChangeStreamMetadataCollection()
336390

337391
token, err := coll.FindOne(
338392
ctx,
339-
bson.D{{"_id", "resumeToken"}},
393+
bson.D{{"_id", csr.resumeTokenDocID()}},
340394
).Raw()
341395

342396
if errors.Is(err, mongo.ErrNoDocuments) {
@@ -346,26 +400,59 @@ func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson
346400
return token, err
347401
}
348402

349-
func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error {
403+
func (csr *ChangeStreamReader) String() string {
404+
return fmt.Sprintf("%s change stream reader", csr.readerType)
405+
}
406+
407+
func (csr *ChangeStreamReader) resumeTokenDocID() string {
408+
switch csr.readerType {
409+
case srcReaderType:
410+
return "srcResumeToken"
411+
case dstReaderType:
412+
return "dstResumeToken"
413+
default:
414+
panic("unknown readerType: " + csr.readerType)
415+
}
416+
}
417+
418+
func (csr *ChangeStreamReader) Wait() error {
419+
if csr.changeStreamRunning {
420+
csr.logger.Debug().Msgf("%s still running, signalling that writes are done and waiting for change stream to exit", csr)
421+
csr.ChangeStreamEnderChan <- struct{}{}
422+
select {
423+
case err := <-csr.ChangeStreamErrChan:
424+
csr.logger.Warn().Err(err).
425+
Msg("Received error from change stream.")
426+
return err
427+
case <-csr.ChangeEventBatchChan:
428+
csr.logger.Debug().
429+
Msg("Received completion signal from change stream.")
430+
break
431+
}
432+
}
433+
return nil
434+
}
435+
436+
func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error {
350437
token := cs.ResumeToken()
351438

352-
coll := verifier.getChangeStreamMetadataCollection()
439+
coll := csr.getChangeStreamMetadataCollection()
353440
_, err := coll.ReplaceOne(
354441
ctx,
355-
bson.D{{"_id", "resumeToken"}},
442+
bson.D{{"_id", csr.resumeTokenDocID()}},
356443
token,
357444
options.Replace().SetUpsert(true),
358445
)
359446

360447
if err == nil {
361448
ts, err := extractTimestampFromResumeToken(token)
362449

363-
logEvent := verifier.logger.Debug()
450+
logEvent := csr.logger.Debug()
364451

365452
if err == nil {
366453
logEvent = addUnixTimeToLogEvent(ts.T, logEvent)
367454
} else {
368-
verifier.logger.Warn().Err(err).
455+
csr.logger.Warn().Err(err).
369456
Msg("failed to extract resume token timestamp")
370457
}
371458

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestChangeStreamFilter(t *testing.T) {
1616
verifier := Verifier{}
1717
verifier.SetMetaDBName("metadb")
1818
require.Equal(t, []bson.D{{{"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}}}},
19-
verifier.GetChangeStreamFilter())
19+
verifier.GetChangeStreamFilter(verifier.srcNamespaces))
2020
verifier.srcNamespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
2121
require.Equal(t, []bson.D{
2222
{{"$match", bson.D{
@@ -27,7 +27,7 @@ func TestChangeStreamFilter(t *testing.T) {
2727
bson.D{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}},
2828
}},
2929
}}},
30-
}, verifier.GetChangeStreamFilter())
30+
}, verifier.GetChangeStreamFilter(verifier.srcNamespaces))
3131
}
3232

3333
// TestChangeStreamResumability creates a verifier, starts its change stream,

0 commit comments

Comments
 (0)