44 "context"
55 "fmt"
66
7+ "github.com/10gen/migration-verifier/internal/reportutils"
78 "github.com/10gen/migration-verifier/internal/retry"
89 "github.com/10gen/migration-verifier/internal/types"
910 "github.com/pkg/errors"
@@ -194,19 +195,40 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
194195func (verifier * Verifier ) GenerateRecheckTasks (ctx context.Context ) error {
195196 prevGeneration := verifier .getPreviousGenerationWhileLocked ()
196197
197- verifier .logger .Debug ().Msgf ("Creating recheck tasks from generation %d’s %s documents" , prevGeneration , recheckQueue )
198+ recheckColl := verifier .verificationDatabase ().Collection (recheckQueue )
199+ estimatedRechecks , err := recheckColl .EstimatedDocumentCount (ctx )
200+ if err != nil {
201+ return errors .Wrapf (err ,
202+ "failed to retrieve %#q’s estimated document count" ,
203+ verifier .verificationDatabase ().Name ()+ "." + recheckQueue ,
204+ )
205+ }
206+
207+ verifier .logger .Debug ().
208+ Int ("enqueuedInGeneration" , prevGeneration ).
209+ Int ("estimatedRechecks" , int (estimatedRechecks )).
210+ Msgf ("Creating recheck tasks from enqueued rechecks." )
198211
199212 // We generate one recheck task per collection, unless
200213 // 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
201214 // the 16MB BSON limit)
202215 // 2) The size of the data would exceed our desired partition size. This limits memory use
203216 // during the recheck phase.
217+ // 3) The number of documents exceeds max(estimatedRechecks/numWorkers,
218+ // numWorkers). We do this to prevent one thread from doing all of the rechecks.
219+
204220 prevDBName , prevCollName := "" , ""
205221 var idAccum []interface {}
206222 var idLenAccum int
207223 var dataSizeAccum int64
208224 const maxIdsSize = 12 * 1024 * 1024
209- cursor , err := verifier .verificationDatabase ().Collection (recheckQueue ).Find (
225+ maxDocsPerTask := estimatedRechecks / int64 (verifier .numWorkers )
226+
227+ if maxDocsPerTask < int64 (verifier .numWorkers ) {
228+ maxDocsPerTask = int64 (verifier .numWorkers )
229+ }
230+
231+ cursor , err := recheckColl .Find (
210232 ctx , bson.D {{"_id.generation" , prevGeneration }}, options .Find ().SetSort (bson.D {{"_id" , 1 }}))
211233 if err != nil {
212234 return err
@@ -227,10 +249,9 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
227249 idRaw := cursor .Current .Lookup ("_id" , "docID" )
228250 idLen := len (idRaw .Value )
229251
230- verifier .logger .Debug ().Msgf ("Found persisted recheck doc for %s.%s" , doc .PrimaryKey .DatabaseName , doc .PrimaryKey .CollectionName )
231-
232252 if doc .PrimaryKey .DatabaseName != prevDBName ||
233253 doc .PrimaryKey .CollectionName != prevCollName ||
254+ int64 (len (idAccum )) > maxDocsPerTask ||
234255 idLenAccum >= maxIdsSize ||
235256 dataSizeAccum >= verifier .partitionSizeInBytes {
236257 namespace := prevDBName + "." + prevCollName
@@ -239,10 +260,11 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
239260 if err != nil {
240261 return err
241262 }
242- verifier .logger .Debug ().Msgf (
243- "Created ID verification task for namespace %s with %d ids, " +
244- "%d id bytes and %d data bytes" ,
245- namespace , len (idAccum ), idLenAccum , dataSizeAccum )
263+ verifier .logger .Debug ().
264+ Str ("namespace" , namespace ).
265+ Int ("numDocuments" , len (idAccum )).
266+ Str ("dataSize" , reportutils .FmtBytes (dataSizeAccum )).
267+ Msg ("Created document recheck task." )
246268 }
247269 prevDBName = doc .PrimaryKey .DatabaseName
248270 prevCollName = doc .PrimaryKey .CollectionName
0 commit comments