@@ -11,14 +11,16 @@ import (
1111 "github.com/10gen/migration-verifier/internal/util"
1212 "github.com/10gen/migration-verifier/mbson"
1313 "github.com/pkg/errors"
14- "github.com/samber/lo"
1514 "go.mongodb.org/mongo-driver/bson"
1615 "go.mongodb.org/mongo-driver/bson/bsontype"
1716 "go.mongodb.org/mongo-driver/mongo"
1817 "go.mongodb.org/mongo-driver/mongo/options"
1918)
2019
2120const (
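+	// Limits on each batched bulk write of recheck documents: a batch is
+	// flushed once it holds roughly 1 MiB of marshaled data or 1,000 documents.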
+	recheckBatchByteLimit  = 1024 * 1024
+	recheckBatchCountLimit = 1000
+
 	recheckQueueCollectionNameBase = "recheckQueue"
 )
 
@@ -71,22 +73,6 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 	)
 }
 
-// This will split the given slice into *roughly* the given number of chunks.
-// It may end up being more or fewer, but it should be pretty close.
-func splitToChunks[T any, Slice ~[]T](elements Slice, numChunks int) []Slice {
-	if numChunks < 1 {
-		panic(fmt.Sprintf("numChunks (%v) should be >=1", numChunks))
-	}
-
-	elsPerChunk := len(elements) / numChunks
-
-	if elsPerChunk == 0 {
-		elsPerChunk = 1
-	}
-
-	return lo.Chunk(elements, elsPerChunk)
-}
-
 func (verifier *Verifier) insertRecheckDocs(
 	ctx context.Context,
 	dbNames []string,
@@ -106,29 +92,46 @@ func (verifier *Verifier) insertRecheckDocs(
 
 	generation, _ := verifier.getGenerationWhileLocked()
 
-	docIDIndexes := lo.Range(len(dbNames))
-	indexesPerThread := splitToChunks(docIDIndexes, verifier.numWorkers)
-
 	eg, groupCtx := contextplus.ErrGroup(ctx)
 
 	genCollection := verifier.getRecheckQueueCollection(generation)
 
-	for _, curThreadIndexes := range indexesPerThread {
-		eg.Go(func() error {
-			models := make([]mongo.WriteModel, len(curThreadIndexes))
-			for m, i := range curThreadIndexes {
-				recheckDoc := RecheckDoc{
-					PrimaryKey: RecheckPrimaryKey{
-						SrcDatabaseName:   dbNames[i],
-						SrcCollectionName: collNames[i],
-						DocumentID:        rawDocIDs[i],
-					},
-					DataSize: dataSizes[i],
-				}
+	var recheckBatches [][]mongo.WriteModel
+	var curRechecks []mongo.WriteModel
+	curBatchSize := 0
+	for i, dbName := range dbNames {
+		recheckDoc := RecheckDoc{
+			PrimaryKey: RecheckPrimaryKey{
+				SrcDatabaseName:   dbName,
+				SrcCollectionName: collNames[i],
+				DocumentID:        rawDocIDs[i],
+			},
+			DataSize: dataSizes[i],
+		}
 
-				models[m] = mongo.NewInsertOneModel().
-					SetDocument(recheckDoc)
-			}
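+		// Marshal the recheck document so its encoded size can be counted
+		// against the batch's byte limit.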
+		recheckRaw, err := bson.Marshal(recheckDoc)
+		if err != nil {
+			return errors.Wrapf(err, "marshaling recheck for %#q", dbName+"."+collNames[i])
+		}
+
+		curRechecks = append(
+			curRechecks,
+			mongo.NewInsertOneModel().SetDocument(recheckDoc),
+		)
+		curBatchSize += len(recheckRaw)
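+		// Flush the batch once it exceeds the byte limit or reaches the
+		// document-count limit.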
+		if curBatchSize > recheckBatchByteLimit || len(curRechecks) >= recheckBatchCountLimit {
+			recheckBatches = append(recheckBatches, curRechecks)
+			curRechecks = nil
+			curBatchSize = 0
+		}
+	}
+
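+	// Don't drop a final, partially filled batch.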
+	if len(curRechecks) > 0 {
+		recheckBatches = append(recheckBatches, curRechecks)
+	}
+
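+	// Insert each batch on its own errgroup goroutine.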
+	for _, models := range recheckBatches {
+		eg.Go(func() error {
 
 			retryer := retry.New()
 			err := retryer.WithCallback(
@@ -165,22 +168,22 @@ func (verifier *Verifier) insertRecheckDocs(
 				len(models),
 			).Run(groupCtx, verifier.logger)
 
-			return errors.Wrapf(err, "failed to persist %d recheck(s) for generation %d", len(models), generation)
+			return errors.Wrapf(err, "batch of %d rechecks", len(models))
 		})
 	}
 
 	if err := eg.Wait(); err != nil {
 		return errors.Wrapf(
 			err,
-			"failed to persist %d recheck(s) for generation %d",
-			len(dbNames),
+			"persisting %d recheck(s) for generation %d",
+			len(documentIDs),
 			generation,
 		)
 	}
 
 	verifier.logger.Trace().
 		Int("generation", generation).
-		Int("count", len(dbNames)).
+		Int("count", len(documentIDs)).
 		Msg("Persisted rechecks.")
 
 	return nil
@@ -335,13 +338,6 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 	var totalDocs types.DocumentCount
 	var dataSizeAccum, totalRecheckData int64
 
-	maxDocsPerTask := max(
-		int(rechecksCount)/verifier.numWorkers,
-		verifier.numWorkers,
-	)
-
-	maxDocsPerTask = min(maxDocsPerTask, maxRecheckIDs)
-
 	// The sort here is important because the recheck _id is an embedded
 	// document that includes the namespace. Thus, all rechecks for a given
 	// namespace will be consecutive in this query’s result.
@@ -409,7 +405,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 	//
 	if doc.PrimaryKey.SrcDatabaseName != prevDBName ||
 		doc.PrimaryKey.SrcCollectionName != prevCollName ||
-		len(idAccum) > maxDocsPerTask ||
+		len(idAccum) > maxRecheckIDs ||
 		types.ByteCount(idsSizer.Len()) >= maxRecheckIDsBytes ||
 		dataSizeAccum >= verifier.partitionSizeInBytes {