44 "bytes"
55 "context"
66 "fmt"
7+ "iter"
78 "time"
89
910 "github.com/10gen/migration-verifier/chanutil"
@@ -12,6 +13,7 @@ import (
1213 "github.com/10gen/migration-verifier/internal/retry"
1314 "github.com/10gen/migration-verifier/internal/types"
1415 "github.com/10gen/migration-verifier/internal/util"
16+ "github.com/10gen/migration-verifier/mmongo/cursor"
1517 "github.com/10gen/migration-verifier/option"
1618 "github.com/pkg/errors"
1719 "go.mongodb.org/mongo-driver/v2/bson"
@@ -30,6 +32,11 @@ const (
3032 docKeyInHashedCompare = "k"
3133)
3234
35+ type seqWithTs struct {
36+ seq iter.Seq2 [bson.Raw , error ]
37+ ts bson.Timestamp
38+ }
39+
3340type docWithTs struct {
3441 doc bson.Raw
3542 ts bson.Timestamp
@@ -45,7 +52,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
4552 types.ByteCount ,
4653 error ,
4754) {
48- var srcChannel , dstChannel <- chan docWithTs
55+ var srcChannel , dstChannel <- chan seqWithTs
4956 var readSrcCallback , readDstCallback func (context.Context , * retry.FuncInfo ) error
5057
5158 results := []VerificationResult {}
@@ -100,7 +107,7 @@ func (verifier *Verifier) compareDocsFromChannels(
100107 workerNum int ,
101108 fi * retry.FuncInfo ,
102109 task * VerificationTask ,
103- srcChannel , dstChannel <- chan docWithTs ,
110+ srcChannel , dstChannel <- chan seqWithTs ,
104111) (
105112 []VerificationResult ,
106113 types.DocumentCount ,
@@ -204,7 +211,7 @@ func (verifier *Verifier) compareDocsFromChannels(
204211 for ! srcClosed || ! dstClosed {
205212 simpleTimerReset (readTimer , readTimeout )
206213
207- var srcDocWithTs , dstDocWithTs docWithTs
214+ var srcDocsWithTs , dstDocsWithTs seqWithTs
208215
209216 eg , egCtx := contextplus .ErrGroup (ctx )
210217
@@ -219,21 +226,13 @@ func (verifier *Verifier) compareDocsFromChannels(
219226 "failed to read from source after %s" ,
220227 readTimeout ,
221228 )
222- case srcDocWithTs , alive = <- srcChannel :
229+ case srcDocsWithTs , alive = <- srcChannel :
223230 if ! alive {
224231 srcClosed = true
225232 break
226233 }
227234
228235 fi .NoteSuccess ("received document from source" )
229-
230- srcDocCount ++
231- srcByteCount += types .ByteCount (len (srcDocWithTs .doc ))
232- verifier .workerTracker .SetSrcCounts (
233- workerNum ,
234- srcDocCount ,
235- srcByteCount ,
236- )
237236 }
238237
239238 return nil
@@ -251,7 +250,7 @@ func (verifier *Verifier) compareDocsFromChannels(
251250 "failed to read from destination after %s" ,
252251 readTimeout ,
253252 )
254- case dstDocWithTs , alive = <- dstChannel :
253+ case dstDocsWithTs , alive = <- dstChannel :
255254 if ! alive {
256255 dstClosed = true
257256 break
@@ -271,32 +270,72 @@ func (verifier *Verifier) compareDocsFromChannels(
271270 )
272271 }
273272
274- if srcDocWithTs .doc != nil {
275- err := handleNewDoc (srcDocWithTs , true )
273+ if srcDocsWithTs .seq != nil {
274+ for doc , err := range srcDocsWithTs .seq {
275+ if err != nil {
276+ return nil , 0 , 0 , errors .Wrapf (
277+ err ,
278+ "reading batch of docs from source (task: %s)" ,
279+ task .PrimaryKey ,
280+ )
281+ }
276282
277- if err != nil {
283+ srcDocCount ++
284+ srcByteCount += types .ByteCount (len (doc ))
285+ verifier .workerTracker .SetSrcCounts (
286+ workerNum ,
287+ srcDocCount ,
288+ srcByteCount ,
289+ )
278290
279- return nil , 0 , 0 , errors . Wrapf (
280- err ,
281- "comparer thread failed to handle %#q's source doc (task: %s) with ID %v" ,
282- namespace ,
283- task . PrimaryKey ,
284- srcDocWithTs . doc . Lookup ( "_id" ) ,
291+ err := handleNewDoc (
292+ docWithTs {
293+ doc : doc ,
294+ ts : srcDocsWithTs . ts ,
295+ } ,
296+ true ,
285297 )
298+
299+ if err != nil {
300+ return nil , 0 , 0 , errors .Wrapf (
301+ err ,
302+ "comparer thread failed to handle %#q's source doc (task: %s) with ID %v" ,
303+ namespace ,
304+ task .PrimaryKey ,
305+ doc .Lookup ("_id" ),
306+ )
307+ }
286308 }
309+
287310 }
288311
289- if dstDocWithTs .doc != nil {
290- err := handleNewDoc (dstDocWithTs , false )
312+ if dstDocsWithTs .seq != nil {
313+ for doc , err := range dstDocsWithTs .seq {
314+ if err != nil {
315+ return nil , 0 , 0 , errors .Wrapf (
316+ err ,
317+ "reading batch of docs from destination (task: %s)" ,
318+ task .PrimaryKey ,
319+ )
320+ }
291321
292- if err != nil {
293- return nil , 0 , 0 , errors .Wrapf (
294- err ,
295- "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v" ,
296- namespace ,
297- task .PrimaryKey ,
298- dstDocWithTs .doc .Lookup ("_id" ),
322+ err := handleNewDoc (
323+ docWithTs {
324+ doc : doc ,
325+ ts : dstDocsWithTs .ts ,
326+ },
327+ false ,
299328 )
329+
330+ if err != nil {
331+ return nil , 0 , 0 , errors .Wrapf (
332+ err ,
333+ "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v" ,
334+ namespace ,
335+ task .PrimaryKey ,
336+ doc .Lookup ("_id" ),
337+ )
338+ }
300339 }
301340 }
302341 }
@@ -427,13 +466,13 @@ func simpleTimerReset(t *time.Timer, dur time.Duration) {
427466func (verifier * Verifier ) getFetcherChannelsAndCallbacks (
428467 task * VerificationTask ,
429468) (
430- <- chan docWithTs ,
431- <- chan docWithTs ,
469+ <- chan seqWithTs ,
470+ <- chan seqWithTs ,
432471 func (context.Context , * retry.FuncInfo ) error ,
433472 func (context.Context , * retry.FuncInfo ) error ,
434473) {
435- srcChannel := make (chan docWithTs )
436- dstChannel := make (chan docWithTs )
474+ srcChannel := make (chan seqWithTs )
475+ dstChannel := make (chan seqWithTs )
437476
438477 readSrcCallback := func (ctx context.Context , state * retry.FuncInfo ) error {
439478 // We open a session here so that we can read the session’s cluster
@@ -510,38 +549,44 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
510549}
511550
512551func iterateCursorToChannel (
513- sctx context.Context ,
552+ ctx context.Context ,
514553 state * retry.FuncInfo ,
515- cursor * mongo. Cursor ,
516- writer chan <- docWithTs ,
554+ myCursor * cursor. BatchCursor ,
555+ writer chan <- seqWithTs ,
517556) error {
518557 defer close (writer )
519558
520- sess := mongo .SessionFromContext (sctx )
559+ for {
560+ seq := myCursor .GetCurrentBatchIterator ()
521561
522- for cursor .Next (sctx ) {
523562 state .NoteSuccess ("received a document" )
524563
525- clusterTime , err := util . GetClusterTimeFromSession ( sess )
564+ ct , err := myCursor . GetClusterTime ( )
526565 if err != nil {
527- return errors .Wrap (err , "reading cluster time from session " )
566+ return errors .Wrap (err , "reading cluster time from batch " )
528567 }
529568
530569 err = chanutil .WriteWithDoneCheck (
531- sctx ,
570+ ctx ,
532571 writer ,
533- docWithTs {
534- doc : slices . Clone ( cursor . Current ) ,
535- ts : clusterTime ,
572+ seqWithTs {
573+ seq : seq ,
574+ ts : ct ,
536575 },
537576 )
538577
539578 if err != nil {
540- return errors .Wrapf (err , "sending document to compare thread" )
579+ return errors .Wrapf (err , "sending iterator to compare thread" )
580+ }
581+
582+ if myCursor .IsFinished () {
583+ return nil
541584 }
542- }
543585
544- return errors .Wrap (cursor .Err (), "failed to iterate cursor" )
586+ if err := myCursor .GetNext (ctx ); err != nil {
587+ return errors .Wrap (err , "failed to iterate cursor" )
588+ }
589+ }
545590}
546591
547592func getMapKey (docKeyValues []bson.RawValue ) string {
@@ -555,8 +600,13 @@ func getMapKey(docKeyValues []bson.RawValue) string {
555600 return keyBuffer .String ()
556601}
557602
558- func (verifier * Verifier ) getDocumentsCursor (ctx context.Context , collection * mongo.Collection , clusterInfo * util.ClusterInfo ,
559- startAtTs * bson.Timestamp , task * VerificationTask ) (* mongo.Cursor , error ) {
603+ func (verifier * Verifier ) getDocumentsCursor (
604+ ctx context.Context ,
605+ collection * mongo.Collection ,
606+ clusterInfo * util.ClusterInfo ,
607+ startAtTs * bson.Timestamp ,
608+ task * VerificationTask ,
609+ ) (* cursor.BatchCursor , error ) {
560610 var findOptions bson.D
561611 runCommandOptions := options .RunCmd ()
562612 var andPredicates bson.A
@@ -673,7 +723,16 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
673723 }
674724 }
675725
676- return collection .Database ().RunCommandCursor (ctx , cmd , runCommandOptions )
726+ c , err := cursor .New (
727+ collection .Database (),
728+ collection .Database ().RunCommand (ctx , cmd , runCommandOptions ),
729+ )
730+
731+ if err == nil {
732+ c .SetSession (mongo .SessionFromContext (ctx ))
733+ }
734+
735+ return c , err
677736}
678737
679738func transformPipelineForToHashedIndexKey (
0 commit comments