Skip to content

Commit 40f7e07

Browse files
committed
more comments & tweaks
1 parent 1fcb8a8 commit 40f7e07

File tree

1 file changed

+52
-24
lines changed

1 file changed

+52
-24
lines changed

internal/verifier/recheck.go

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@ const (
1919
recheckQueue = "recheckQueue"
2020
maxBSONObjSize = 16 * 1024 * 1024
2121
recheckInserterThreadsSoftMax = 100
22+
maxIdsPerRecheckTask = 12 * 1024 * 1024
2223
)
2324

2425
// RecheckPrimaryKey stores the implicit type of recheck to perform
2526
// Currently, we only handle document mismatches/change stream updates,
2627
// so DatabaseName, CollectionName, and DocumentID must always be specified.
28+
//
29+
// NB: Order is important here so that, within a given generation,
30+
// sorting by _id will guarantee that all rechecks for a given
31+
// namespace appear consecutively.
2732
type RecheckPrimaryKey struct {
2833
Generation int `bson:"generation"`
2934
DatabaseName string `bson:"db"`
@@ -195,11 +200,14 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
195200
func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
196201
prevGeneration := verifier.getPreviousGenerationWhileLocked()
197202

203+
findFilter := bson.D{{"_id.generation", prevGeneration}}
204+
205+
verifier.logger.Debug().
206+
Int("priorGeneration", prevGeneration).
207+
Msgf("Counting prior generation’s enqueued rechecks.")
208+
198209
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
199-
rechecksCount, err := recheckColl.CountDocuments(
200-
ctx,
201-
bson.D{{"_id.generation", prevGeneration}},
202-
)
210+
rechecksCount, err := recheckColl.CountDocuments(ctx, findFilter)
203211
if err != nil {
204212
return errors.Wrapf(err,
205213
"failed to count generation %d’s rechecks",
@@ -220,30 +228,49 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
220228
// 3) The number of documents exceeds $rechecksCount/$numWorkers. We do
221229
// this to prevent one thread from doing all of the rechecks.
222230

223-
prevDBName, prevCollName := "", ""
231+
var prevDBName, prevCollName string
224232
var idAccum []interface{}
225233
var idLenAccum int
226234
var dataSizeAccum int64
227-
const maxIdsSize = 12 * 1024 * 1024
235+
228236
maxDocsPerTask := rechecksCount / int64(verifier.numWorkers)
229237

230238
if maxDocsPerTask < int64(verifier.numWorkers) {
231239
maxDocsPerTask = int64(verifier.numWorkers)
232240
}
233241

242+
// The sort here is important because the recheck _id is an embedded
243+
// document that includes the namespace. Thus, all rechecks for a given
244+
// namespace will be consecutive in this query’s result.
234245
cursor, err := recheckColl.Find(
235-
ctx, bson.D{{"_id.generation", prevGeneration}}, options.Find().SetSort(bson.D{{"_id", 1}}))
246+
ctx,
247+
findFilter,
248+
options.Find().SetSort(bson.D{{"_id", 1}}),
249+
)
236250
if err != nil {
237251
return err
238252
}
239253
defer cursor.Close(ctx)
240254

241-
createRecheckTask := func() error {
255+
persistBufferedRechecks := func() error {
256+
if len(idAccum) == 0 {
257+
return nil
258+
}
259+
242260
namespace := prevDBName + "." + prevCollName
243261

244-
err := verifier.InsertDocumentRecheckTask(idAccum, types.ByteCount(dataSizeAccum), namespace)
262+
err := verifier.InsertDocumentRecheckTask(
263+
idAccum,
264+
types.ByteCount(dataSizeAccum),
265+
namespace,
266+
)
245267
if err != nil {
246-
return err
268+
return errors.Wrapf(
269+
err,
270+
"failed to create a %d-document recheck task for collection %#q",
271+
len(idAccum),
272+
namespace,
273+
)
247274
}
248275

249276
verifier.logger.Debug().
@@ -263,27 +290,34 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
263290
if err != nil {
264291
return err
265292
}
293+
266294
idRaw := cursor.Current.Lookup("_id", "docID")
267295
idLen := len(idRaw.Value)
268296

297+
// We persist rechecks if any of these happen:
298+
// - the namespace has changed
299+
// - we’ve reached the per-task recheck maximum
300+
// - the buffered document IDs’ size exceeds the per-task maximum
301+
// - the buffered documents exceed the partition size
302+
//
269303
if doc.PrimaryKey.DatabaseName != prevDBName ||
270304
doc.PrimaryKey.CollectionName != prevCollName ||
271305
int64(len(idAccum)) > maxDocsPerTask ||
272-
idLenAccum >= maxIdsSize ||
306+
idLenAccum >= maxIdsPerRecheckTask ||
273307
dataSizeAccum >= verifier.partitionSizeInBytes {
274308

275-
if len(idAccum) > 0 {
276-
err := createRecheckTask()
277-
if err != nil {
278-
return err
279-
}
309+
err := persistBufferedRechecks()
310+
if err != nil {
311+
return err
280312
}
313+
281314
prevDBName = doc.PrimaryKey.DatabaseName
282315
prevCollName = doc.PrimaryKey.CollectionName
283316
idLenAccum = 0
284317
dataSizeAccum = 0
285-
idAccum = []interface{}{}
318+
idAccum = idAccum[:0]
286319
}
320+
287321
idLenAccum += idLen
288322
dataSizeAccum += int64(doc.DataSize)
289323
idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
@@ -294,11 +328,5 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
294328
return err
295329
}
296330

297-
if len(idAccum) > 0 {
298-
err := createRecheckTask()
299-
if err != nil {
300-
return err
301-
}
302-
}
303-
return nil
331+
return persistBufferedRechecks()
304332
}

0 commit comments

Comments
 (0)