44 "context"
55 "fmt"
66
7+ "github.com/10gen/migration-verifier/internal/reportutils"
78 "github.com/10gen/migration-verifier/internal/retry"
89 "github.com/10gen/migration-verifier/internal/types"
910 "github.com/pkg/errors"
@@ -18,11 +19,16 @@ const (
1819 recheckQueue = "recheckQueue"
1920 maxBSONObjSize = 16 * 1024 * 1024
2021 recheckInserterThreadsSoftMax = 100
22+ maxIdsPerRecheckTask = 12 * 1024 * 1024
2123)
2224
2325// RecheckPrimaryKey stores the implicit type of recheck to perform
2426// Currently, we only handle document mismatches/change stream updates,
2527// so DatabaseName, CollectionName, and DocumentID must always be specified.
28+ //
29+ // NB: Order is important here so that, within a given generation,
30+ // sorting by _id will guarantee that all rechecks for a given
31+ // namespace appear consecutively.
2632type RecheckPrimaryKey struct {
2733 Generation int `bson:"generation"`
2834 DatabaseName string `bson:"db"`
@@ -194,76 +200,133 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
194200func (verifier * Verifier ) GenerateRecheckTasks (ctx context.Context ) error {
195201 prevGeneration := verifier .getPreviousGenerationWhileLocked ()
196202
197- verifier .logger .Debug ().Msgf ("Creating recheck tasks from generation %d’s %s documents" , prevGeneration , recheckQueue )
203+ findFilter := bson.D {{"_id.generation" , prevGeneration }}
204+
205+ verifier .logger .Debug ().
206+ Int ("priorGeneration" , prevGeneration ).
207+ Msgf ("Counting prior generation’s enqueued rechecks." )
208+
209+ recheckColl := verifier .verificationDatabase ().Collection (recheckQueue )
210+ rechecksCount , err := recheckColl .CountDocuments (ctx , findFilter )
211+ if err != nil {
212+ return errors .Wrapf (err ,
213+ "failed to count generation %d’s rechecks" ,
214+ prevGeneration ,
215+ )
216+ }
217+
218+ verifier .logger .Debug ().
219+ Int ("priorGeneration" , prevGeneration ).
220+ Int ("rechecksCount" , int (rechecksCount )).
221+ Msgf ("Creating recheck tasks from prior generation’s enqueued rechecks." )
198222
199223 // We generate one recheck task per collection, unless
200224 // 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
201225 // the 16MB BSON limit)
202226 // 2) The size of the data would exceed our desired partition size. This limits memory use
203227 // during the recheck phase.
204- prevDBName , prevCollName := "" , ""
228+ // 3) The number of documents exceeds $rechecksCount/$numWorkers. We do
229+ // this to prevent one thread from doing all of the rechecks.
230+
231+ var prevDBName , prevCollName string
205232 var idAccum []interface {}
206233 var idLenAccum int
207234 var dataSizeAccum int64
208- const maxIdsSize = 12 * 1024 * 1024
209- cursor , err := verifier .verificationDatabase ().Collection (recheckQueue ).Find (
210- ctx , bson.D {{"_id.generation" , prevGeneration }}, options .Find ().SetSort (bson.D {{"_id" , 1 }}))
235+
236+ maxDocsPerTask := rechecksCount / int64 (verifier .numWorkers )
237+
238+ if maxDocsPerTask < int64 (verifier .numWorkers ) {
239+ maxDocsPerTask = int64 (verifier .numWorkers )
240+ }
241+
242+ // The sort here is important because the recheck _id is an embedded
243+ // document that includes the namespace. Thus, all rechecks for a given
244+ // namespace will be consecutive in this query’s result.
245+ cursor , err := recheckColl .Find (
246+ ctx ,
247+ findFilter ,
248+ options .Find ().SetSort (bson.D {{"_id" , 1 }}),
249+ )
211250 if err != nil {
212251 return err
213252 }
214253 defer cursor .Close (ctx )
254+
255+ persistBufferedRechecks := func () error {
256+ if len (idAccum ) == 0 {
257+ return nil
258+ }
259+
260+ namespace := prevDBName + "." + prevCollName
261+
262+ err := verifier .InsertDocumentRecheckTask (
263+ idAccum ,
264+ types .ByteCount (dataSizeAccum ),
265+ namespace ,
266+ )
267+ if err != nil {
268+ return errors .Wrapf (
269+ err ,
270+ "failed to create a %d-document recheck task for collection %#q" ,
271+ len (idAccum ),
272+ namespace ,
273+ )
274+ }
275+
276+ verifier .logger .Debug ().
277+ Str ("namespace" , namespace ).
278+ Int ("numDocuments" , len (idAccum )).
279+ Str ("dataSize" , reportutils .FmtBytes (dataSizeAccum )).
280+ Msg ("Created document recheck task." )
281+
282+ return nil
283+ }
284+
215285 // We group these here using a sort rather than using aggregate because aggregate is
216286 // subject to a 16MB limit on group size.
217287 for cursor .Next (ctx ) {
218- err := cursor .Err ()
219- if err != nil {
220- return err
221- }
222288 var doc RecheckDoc
223289 err = cursor .Decode (& doc )
224290 if err != nil {
225291 return err
226292 }
293+
227294 idRaw := cursor .Current .Lookup ("_id" , "docID" )
228295 idLen := len (idRaw .Value )
229296
230- verifier .logger .Debug ().Msgf ("Found persisted recheck doc for %s.%s" , doc .PrimaryKey .DatabaseName , doc .PrimaryKey .CollectionName )
231-
297+ // We persist rechecks if any of these happen:
298+ // - the namespace has changed
299+ // - we’ve reached the per-task recheck maximum
300+ // - the buffered document IDs’ size exceeds the per-task maximum
301+ // - the buffered documents exceed the partition size
302+ //
232303 if doc .PrimaryKey .DatabaseName != prevDBName ||
233304 doc .PrimaryKey .CollectionName != prevCollName ||
234- idLenAccum >= maxIdsSize ||
305+ int64 (len (idAccum )) > maxDocsPerTask ||
306+ idLenAccum >= maxIdsPerRecheckTask ||
235307 dataSizeAccum >= verifier .partitionSizeInBytes {
236- namespace := prevDBName + "." + prevCollName
237- if len (idAccum ) > 0 {
238- err := verifier .InsertFailedIdsVerificationTask (idAccum , types .ByteCount (dataSizeAccum ), namespace )
239- if err != nil {
240- return err
241- }
242- verifier .logger .Debug ().Msgf (
243- "Created ID verification task for namespace %s with %d ids, " +
244- "%d id bytes and %d data bytes" ,
245- namespace , len (idAccum ), idLenAccum , dataSizeAccum )
308+
309+ err := persistBufferedRechecks ()
310+ if err != nil {
311+ return err
246312 }
313+
247314 prevDBName = doc .PrimaryKey .DatabaseName
248315 prevCollName = doc .PrimaryKey .CollectionName
249316 idLenAccum = 0
250317 dataSizeAccum = 0
251- idAccum = [] interface {}{}
318+ idAccum = idAccum [: 0 ]
252319 }
320+
253321 idLenAccum += idLen
254322 dataSizeAccum += int64 (doc .DataSize )
255323 idAccum = append (idAccum , doc .PrimaryKey .DocumentID )
256324 }
257- if len (idAccum ) > 0 {
258- namespace := prevDBName + "." + prevCollName
259- err := verifier .InsertFailedIdsVerificationTask (idAccum , types .ByteCount (dataSizeAccum ), namespace )
260- if err != nil {
261- return err
262- }
263- verifier .logger .Debug ().Msgf (
264- "Created ID verification task for namespace %s with %d ids, " +
265- "%d id bytes and %d data bytes" ,
266- namespace , len (idAccum ), idLenAccum , dataSizeAccum )
325+
326+ err = cursor .Err ()
327+ if err != nil {
328+ return err
267329 }
268- return nil
330+
331+ return persistBufferedRechecks ()
269332}
0 commit comments