44 "bytes"
55 "context"
66 "fmt"
7- "iter"
87 "time"
98
109 "github.com/10gen/migration-verifier/chanutil"
@@ -13,7 +12,6 @@ import (
1312 "github.com/10gen/migration-verifier/internal/retry"
1413 "github.com/10gen/migration-verifier/internal/types"
1514 "github.com/10gen/migration-verifier/internal/util"
16- "github.com/10gen/migration-verifier/mmongo/cursor"
1715 "github.com/10gen/migration-verifier/option"
1816 "github.com/pkg/errors"
1917 "go.mongodb.org/mongo-driver/v2/bson"
@@ -32,11 +30,6 @@ const (
3230 docKeyInHashedCompare = "k"
3331)
3432
35- type seqWithTs struct {
36- seq iter.Seq2 [bson.Raw , error ]
37- ts bson.Timestamp
38- }
39-
4033type docWithTs struct {
4134 doc bson.Raw
4235 ts bson.Timestamp
@@ -52,7 +45,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
5245 types.ByteCount ,
5346 error ,
5447) {
55- var srcChannel , dstChannel <- chan seqWithTs
48+ var srcChannel , dstChannel <- chan docWithTs
5649 var readSrcCallback , readDstCallback func (context.Context , * retry.FuncInfo ) error
5750
5851 results := []VerificationResult {}
@@ -107,7 +100,7 @@ func (verifier *Verifier) compareDocsFromChannels(
107100 workerNum int ,
108101 fi * retry.FuncInfo ,
109102 task * VerificationTask ,
110- srcChannel , dstChannel <- chan seqWithTs ,
103+ srcChannel , dstChannel <- chan docWithTs ,
111104) (
112105 []VerificationResult ,
113106 types.DocumentCount ,
@@ -211,7 +204,7 @@ func (verifier *Verifier) compareDocsFromChannels(
211204 for ! srcClosed || ! dstClosed {
212205 simpleTimerReset (readTimer , readTimeout )
213206
214- var srcDocsWithTs , dstDocsWithTs seqWithTs
207+ var srcDocWithTs , dstDocWithTs docWithTs
215208
216209 eg , egCtx := contextplus .ErrGroup (ctx )
217210
@@ -226,13 +219,21 @@ func (verifier *Verifier) compareDocsFromChannels(
226219 "failed to read from source after %s" ,
227220 readTimeout ,
228221 )
229- case srcDocsWithTs , alive = <- srcChannel :
222+ case srcDocWithTs , alive = <- srcChannel :
230223 if ! alive {
231224 srcClosed = true
232225 break
233226 }
234227
235228 fi .NoteSuccess ("received document from source" )
229+
230+ srcDocCount ++
231+ srcByteCount += types .ByteCount (len (srcDocWithTs .doc ))
232+ verifier .workerTracker .SetSrcCounts (
233+ workerNum ,
234+ srcDocCount ,
235+ srcByteCount ,
236+ )
236237 }
237238
238239 return nil
@@ -250,7 +251,7 @@ func (verifier *Verifier) compareDocsFromChannels(
250251 "failed to read from destination after %s" ,
251252 readTimeout ,
252253 )
253- case dstDocsWithTs , alive = <- dstChannel :
254+ case dstDocWithTs , alive = <- dstChannel :
254255 if ! alive {
255256 dstClosed = true
256257 break
@@ -270,72 +271,32 @@ func (verifier *Verifier) compareDocsFromChannels(
270271 )
271272 }
272273
273- if srcDocsWithTs .seq != nil {
274- for doc , err := range srcDocsWithTs .seq {
275- if err != nil {
276- return nil , 0 , 0 , errors .Wrapf (
277- err ,
278- "reading batch of docs from source (task: %s)" ,
279- task .PrimaryKey ,
280- )
281- }
274+ if srcDocWithTs .doc != nil {
275+ err := handleNewDoc (srcDocWithTs , true )
282276
283- srcDocCount ++
284- srcByteCount += types .ByteCount (len (doc ))
285- verifier .workerTracker .SetSrcCounts (
286- workerNum ,
287- srcDocCount ,
288- srcByteCount ,
289- )
277+ if err != nil {
290278
291- err := handleNewDoc (
292- docWithTs {
293- doc : doc ,
294- ts : srcDocsWithTs . ts ,
295- } ,
296- true ,
279+ return nil , 0 , 0 , errors . Wrapf (
280+ err ,
281+ "comparer thread failed to handle %#q's source doc (task: %s) with ID %v" ,
282+ namespace ,
283+ task . PrimaryKey ,
284+ srcDocWithTs . doc . Lookup ( "_id" ) ,
297285 )
298-
299- if err != nil {
300- return nil , 0 , 0 , errors .Wrapf (
301- err ,
302- "comparer thread failed to handle %#q's source doc (task: %s) with ID %v" ,
303- namespace ,
304- task .PrimaryKey ,
305- doc .Lookup ("_id" ),
306- )
307- }
308286 }
309-
310287 }
311288
312- if dstDocsWithTs .seq != nil {
313- for doc , err := range dstDocsWithTs .seq {
314- if err != nil {
315- return nil , 0 , 0 , errors .Wrapf (
316- err ,
317- "reading batch of docs from destination (task: %s)" ,
318- task .PrimaryKey ,
319- )
320- }
289+ if dstDocWithTs .doc != nil {
290+ err := handleNewDoc (dstDocWithTs , false )
321291
322- err := handleNewDoc (
323- docWithTs {
324- doc : doc ,
325- ts : dstDocsWithTs .ts ,
326- },
327- false ,
292+ if err != nil {
293+ return nil , 0 , 0 , errors .Wrapf (
294+ err ,
295+ "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v" ,
296+ namespace ,
297+ task .PrimaryKey ,
298+ dstDocWithTs .doc .Lookup ("_id" ),
328299 )
329-
330- if err != nil {
331- return nil , 0 , 0 , errors .Wrapf (
332- err ,
333- "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v" ,
334- namespace ,
335- task .PrimaryKey ,
336- doc .Lookup ("_id" ),
337- )
338- }
339300 }
340301 }
341302 }
@@ -466,13 +427,13 @@ func simpleTimerReset(t *time.Timer, dur time.Duration) {
466427func (verifier * Verifier ) getFetcherChannelsAndCallbacks (
467428 task * VerificationTask ,
468429) (
469- <- chan seqWithTs ,
470- <- chan seqWithTs ,
430+ <- chan docWithTs ,
431+ <- chan docWithTs ,
471432 func (context.Context , * retry.FuncInfo ) error ,
472433 func (context.Context , * retry.FuncInfo ) error ,
473434) {
474- srcChannel := make (chan seqWithTs )
475- dstChannel := make (chan seqWithTs )
435+ srcChannel := make (chan docWithTs )
436+ dstChannel := make (chan docWithTs )
476437
477438 readSrcCallback := func (ctx context.Context , state * retry.FuncInfo ) error {
478439 // We open a session here so that we can read the session’s cluster
@@ -549,44 +510,38 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
549510}
550511
551512func iterateCursorToChannel (
552- ctx context.Context ,
513+ sctx context.Context ,
553514 state * retry.FuncInfo ,
554- myCursor * cursor. BatchCursor ,
555- writer chan <- seqWithTs ,
515+ cursor * mongo. Cursor ,
516+ writer chan <- docWithTs ,
556517) error {
557518 defer close (writer )
558519
559- for {
560- seq := myCursor .GetCurrentBatchIterator ()
520+ sess := mongo .SessionFromContext (sctx )
561521
522+ for cursor .Next (sctx ) {
562523 state .NoteSuccess ("received a document" )
563524
564- ct , err := myCursor . GetClusterTime ( )
525+ clusterTime , err := util . GetClusterTimeFromSession ( sess )
565526 if err != nil {
566- return errors .Wrap (err , "reading cluster time from batch " )
527+ return errors .Wrap (err , "reading cluster time from session " )
567528 }
568529
569530 err = chanutil .WriteWithDoneCheck (
570- ctx ,
531+ sctx ,
571532 writer ,
572- seqWithTs {
573- seq : seq ,
574- ts : ct ,
533+ docWithTs {
534+ doc : slices . Clone ( cursor . Current ) ,
535+ ts : clusterTime ,
575536 },
576537 )
577538
578539 if err != nil {
579- return errors .Wrapf (err , "sending iterator to compare thread" )
580- }
581-
582- if myCursor .IsFinished () {
583- return nil
584- }
585-
586- if err := myCursor .GetNext (ctx ); err != nil {
587- return errors .Wrap (err , "failed to iterate cursor" )
540+ return errors .Wrapf (err , "sending document to compare thread" )
588541 }
589542 }
543+
544+ return errors .Wrap (cursor .Err (), "failed to iterate cursor" )
590545}
591546
592547func getMapKey (docKeyValues []bson.RawValue ) string {
@@ -600,13 +555,8 @@ func getMapKey(docKeyValues []bson.RawValue) string {
600555 return keyBuffer .String ()
601556}
602557
603- func (verifier * Verifier ) getDocumentsCursor (
604- ctx context.Context ,
605- collection * mongo.Collection ,
606- clusterInfo * util.ClusterInfo ,
607- startAtTs * bson.Timestamp ,
608- task * VerificationTask ,
609- ) (* cursor.BatchCursor , error ) {
558+ func (verifier * Verifier ) getDocumentsCursor (ctx context.Context , collection * mongo.Collection , clusterInfo * util.ClusterInfo ,
559+ startAtTs * bson.Timestamp , task * VerificationTask ) (* mongo.Cursor , error ) {
610560 var findOptions bson.D
611561 runCommandOptions := options .RunCmd ()
612562 var andPredicates bson.A
@@ -723,16 +673,7 @@ func (verifier *Verifier) getDocumentsCursor(
723673 }
724674 }
725675
726- c , err := cursor .New (
727- collection .Database (),
728- collection .Database ().RunCommand (ctx , cmd , runCommandOptions ),
729- )
730-
731- if err == nil {
732- c .SetSession (mongo .SessionFromContext (ctx ))
733- }
734-
735- return c , err
676+ return collection .Database ().RunCommandCursor (ctx , cmd , runCommandOptions )
736677}
737678
738679func transformPipelineForToHashedIndexKey (
0 commit comments