Skip to content

Commit bf92401

Browse files
committed
can’t do transactions for all
1 parent 1200928 commit bf92401

File tree

4 files changed

+108
-91
lines changed

4 files changed

+108
-91
lines changed

internal/verifier/check.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
229229
}
230230
verifier.generation++
231231
verifier.phase = Recheck
232+
232233
err = verifier.GenerateRecheckTasks(ctx)
233234
if err != nil {
234235
verifier.mux.Unlock()
@@ -292,8 +293,8 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error {
292293

293294
err := verifier.doInMetaTransaction(
294295
ctx,
295-
func(sctx mongo.SessionContext) error {
296-
isPrimary, err := verifier.CheckIsPrimary(ctx)
296+
func(ctx context.Context, metaCtx mongo.SessionContext) error {
297+
isPrimary, err := verifier.CheckIsPrimary(metaCtx)
297298
if err != nil {
298299
return err
299300
}
@@ -308,15 +309,15 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error {
308309
}
309310
}
310311
for _, src := range verifier.srcNamespaces {
311-
_, err := verifier.InsertCollectionVerificationTask(ctx, src)
312+
_, err := verifier.InsertCollectionVerificationTask(metaCtx, src)
312313
if err != nil {
313314
return errors.Wrap(err, "failed to insert collection verification task")
314315
}
315316
}
316317

317318
verifier.gen0PendingCollectionTasks.Store(int32(len(verifier.srcNamespaces)))
318319

319-
err = verifier.UpdatePrimaryTaskComplete(ctx)
320+
err = verifier.UpdatePrimaryTaskComplete(metaCtx)
320321
if err != nil {
321322
return errors.Wrap(err, "failed to set primary task to complete")
322323
}
@@ -368,12 +369,12 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait
368369
default:
369370
err := verifier.doInMetaTransaction(
370371
ctx,
371-
func(sctx mongo.SessionContext) error {
372-
return verifier.workInTransaction(sctx, workerNum)
372+
func(ctx context.Context, metaCtx mongo.SessionContext) error {
373+
return verifier.workInTransaction(ctx, metaCtx, workerNum)
373374
},
374375
)
375376

376-
if err != nil {
377+
if err != nil && !errors.Is(err, context.Canceled) {
377378
verifier.logger.Fatal().Err(err).Send()
378379
}
379380
}
@@ -382,7 +383,7 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait
382383

383384
func (verifier *Verifier) doInMetaTransaction(
384385
ctx context.Context,
385-
todo func(sctx mongo.SessionContext) error,
386+
todo func(ctx context.Context, metaCtx mongo.SessionContext) error,
386387
) error {
387388
session, err := verifier.metaClient.StartSession()
388389
if err != nil {
@@ -392,17 +393,18 @@ func (verifier *Verifier) doInMetaTransaction(
392393
defer session.EndSession(ctx)
393394

394395
_, err = session.WithTransaction(ctx, func(sctx mongo.SessionContext) (any, error) {
395-
return nil, todo(sctx)
396+
return nil, todo(ctx, sctx)
396397
})
397398

398399
return err
399400
}
400401

401402
func (verifier *Verifier) workInTransaction(
402403
ctx context.Context,
404+
metaCtx mongo.SessionContext,
403405
workerNum int,
404406
) error {
405-
task, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
407+
task, err := verifier.FindNextVerifyTaskAndUpdate(metaCtx)
406408
if errors.Is(err, mongo.ErrNoDocuments) {
407409
verifier.logger.Debug().Msgf("[Worker %d] No tasks found, sleeping...", workerNum)
408410
time.Sleep(verifier.workerSleepDelayMillis * time.Millisecond)
@@ -412,15 +414,15 @@ func (verifier *Verifier) workInTransaction(
412414
}
413415

414416
if task.Type == verificationTaskVerifyCollection {
415-
verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
417+
verifier.ProcessCollectionVerificationTask(ctx, metaCtx, workerNum, task)
416418
if task.Generation == 0 {
417419
newVal := verifier.gen0PendingCollectionTasks.Add(-1)
418420
if newVal == 0 {
419421
verifier.PrintVerificationSummary(ctx, Gen0MetadataAnalysisComplete)
420422
}
421423
}
422424
} else {
423-
verifier.ProcessVerifyTask(ctx, workerNum, task)
425+
verifier.ProcessVerifyTask(ctx, metaCtx, workerNum, task)
424426
}
425427

426428
return nil

internal/verifier/migration_verifier.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,12 @@ func (verifier *Verifier) compareOneDocument(srcClientDoc, dstClientDoc bson.Raw
632632
}}, nil
633633
}
634634

635-
func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, task *VerificationTask) {
635+
func (verifier *Verifier) ProcessVerifyTask(
636+
ctx context.Context,
637+
metaCtx mongo.SessionContext,
638+
workerNum int,
639+
task *VerificationTask,
640+
) {
636641
verifier.logger.Debug().Msgf("[Worker %d] Processing verify task", workerNum)
637642

638643
problems, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(ctx, task)
@@ -690,7 +695,7 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
690695
}
691696
}
692697

693-
err = verifier.UpdateVerificationTask(ctx, task)
698+
err = verifier.UpdateVerificationTask(metaCtx, task)
694699
if err != nil {
695700
verifier.logger.Error().Msgf("Failed updating verification status: %v", err)
696701
}
@@ -967,11 +972,11 @@ func nilableToString[T any](ptr *T) string {
967972
return fmt.Sprintf("%v", *ptr)
968973
}
969974

970-
func (verifier *Verifier) ProcessCollectionVerificationTask(ctx context.Context, workerNum int, task *VerificationTask) {
975+
func (verifier *Verifier) ProcessCollectionVerificationTask(ctx context.Context, metaCtx mongo.SessionContext, workerNum int, task *VerificationTask) {
971976
verifier.logger.Debug().Msgf("[Worker %d] Processing collection", workerNum)
972977

973-
verifier.verifyMetadataAndPartitionCollection(ctx, workerNum, task)
974-
err := verifier.UpdateVerificationTask(ctx, task)
978+
verifier.verifyMetadataAndPartitionCollection(ctx, metaCtx, workerNum, task)
979+
err := verifier.UpdateVerificationTask(metaCtx, task)
975980

976981
if err != nil {
977982
verifier.logger.Fatal().Err(err).
@@ -1040,7 +1045,7 @@ func verifyIndexes(ctx context.Context, _ int, _ *VerificationTask, srcColl, dst
10401045
return results, nil
10411046
}
10421047

1043-
func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Context, workerNum int, task *VerificationTask) {
1048+
func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Context, metaSess mongo.Session, workerNum int, task *VerificationTask) {
10441049
srcColl := verifier.srcClientCollection(task)
10451050
dstColl := verifier.dstClientCollection(task)
10461051
srcNs := FullName(srcColl)
@@ -1060,8 +1065,10 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Conte
10601065
return
10611066
}
10621067

1068+
metadataCtx := mongo.NewSessionContext(ctx, metaSess)
1069+
10631070
insertFailedCollection := func() {
1064-
_, err := verifier.InsertFailedCollectionVerificationTask(ctx, srcNs)
1071+
_, err := verifier.InsertFailedCollectionVerificationTask(metadataCtx, srcNs)
10651072
if err != nil {
10661073
verifier.
10671074
logger.
@@ -1129,7 +1136,7 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Conte
11291136
task.SourceByteCount = bytesCount
11301137

11311138
for _, partition := range partitions {
1132-
_, err := verifier.InsertPartitionVerificationTask(ctx, partition, shardKeys, dstNs)
1139+
_, err := verifier.InsertPartitionVerificationTask(metadataCtx, partition, shardKeys, dstNs)
11331140
if err != nil {
11341141
task.Status = verificationTaskFailed
11351142
verifier.logger.Error().Msgf("[Worker %d] Error inserting verifier tasks: %+v", workerNum, err)

internal/verifier/recheck.go

Lines changed: 66 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -125,46 +125,70 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
125125

126126
verifier.logger.Debug().Msgf("Creating recheck tasks from generation %d’s %s documents", prevGeneration, recheckQueue)
127127

128-
// We generate one recheck task per collection, unless
129-
// 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
130-
// the 16MB BSON limit)
131-
// 2) The size of the data would exceed our desired partition size. This limits memory use
132-
// during the recheck phase.
133-
prevDBName, prevCollName := "", ""
134-
var idAccum []interface{}
135-
var idLenAccum int
136-
var dataSizeAccum int64
137-
const maxIdsSize = 12 * 1024 * 1024
138-
cursor, err := verifier.verificationDatabase().Collection(recheckQueue).Find(
139-
ctx, bson.D{{"_id.generation", prevGeneration}}, options.Find().SetSort(bson.D{{"_id", 1}}))
140-
if err != nil {
141-
return err
142-
}
143-
defer cursor.Close(ctx)
144-
// We group these here using a sort rather than using aggregate because aggregate is
145-
// subject to a 16MB limit on group size.
146-
for cursor.Next(ctx) {
147-
err := cursor.Err()
148-
if err != nil {
149-
return err
150-
}
151-
var doc RecheckDoc
152-
err = cursor.Decode(&doc)
153-
if err != nil {
154-
return err
155-
}
156-
idRaw := cursor.Current.Lookup("_id", "docID")
157-
idLen := len(idRaw.Value)
158-
159-
verifier.logger.Debug().Msgf("Found persisted recheck doc for %s.%s", doc.PrimaryKey.DatabaseName, doc.PrimaryKey.CollectionName)
160-
161-
if doc.PrimaryKey.DatabaseName != prevDBName ||
162-
doc.PrimaryKey.CollectionName != prevCollName ||
163-
idLenAccum >= maxIdsSize ||
164-
dataSizeAccum >= verifier.partitionSizeInBytes {
165-
namespace := prevDBName + "." + prevCollName
128+
return verifier.doInMetaTransaction(
129+
ctx,
130+
func(_ context.Context, metaCtx mongo.SessionContext) error {
131+
// We generate one recheck task per collection, unless
132+
// 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
133+
// the 16MB BSON limit)
134+
// 2) The size of the data would exceed our desired partition size. This limits memory use
135+
// during the recheck phase.
136+
prevDBName, prevCollName := "", ""
137+
var idAccum []interface{}
138+
var idLenAccum int
139+
var dataSizeAccum int64
140+
const maxIdsSize = 12 * 1024 * 1024
141+
cursor, err := verifier.verificationDatabase().Collection(recheckQueue).Find(
142+
ctx, bson.D{{"_id.generation", prevGeneration}}, options.Find().SetSort(bson.D{{"_id", 1}}))
143+
if err != nil {
144+
return err
145+
}
146+
defer cursor.Close(ctx)
147+
// We group these here using a sort rather than using aggregate because aggregate is
148+
// subject to a 16MB limit on group size.
149+
for cursor.Next(ctx) {
150+
err := cursor.Err()
151+
if err != nil {
152+
return err
153+
}
154+
var doc RecheckDoc
155+
err = cursor.Decode(&doc)
156+
if err != nil {
157+
return err
158+
}
159+
idRaw := cursor.Current.Lookup("_id", "docID")
160+
idLen := len(idRaw.Value)
161+
162+
verifier.logger.Debug().Msgf("Found persisted recheck doc for %s.%s", doc.PrimaryKey.DatabaseName, doc.PrimaryKey.CollectionName)
163+
164+
if doc.PrimaryKey.DatabaseName != prevDBName ||
165+
doc.PrimaryKey.CollectionName != prevCollName ||
166+
idLenAccum >= maxIdsSize ||
167+
dataSizeAccum >= verifier.partitionSizeInBytes {
168+
namespace := prevDBName + "." + prevCollName
169+
if len(idAccum) > 0 {
170+
err := verifier.InsertFailedIdsVerificationTask(metaCtx, idAccum, types.ByteCount(dataSizeAccum), namespace)
171+
if err != nil {
172+
return err
173+
}
174+
verifier.logger.Debug().Msgf(
175+
"Created ID verification task for namespace %s with %d ids, "+
176+
"%d id bytes and %d data bytes",
177+
namespace, len(idAccum), idLenAccum, dataSizeAccum)
178+
}
179+
prevDBName = doc.PrimaryKey.DatabaseName
180+
prevCollName = doc.PrimaryKey.CollectionName
181+
idLenAccum = 0
182+
dataSizeAccum = 0
183+
idAccum = []interface{}{}
184+
}
185+
idLenAccum += idLen
186+
dataSizeAccum += int64(doc.DataSize)
187+
idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
188+
}
166189
if len(idAccum) > 0 {
167-
err := verifier.InsertFailedIdsVerificationTask(ctx, idAccum, types.ByteCount(dataSizeAccum), namespace)
190+
namespace := prevDBName + "." + prevCollName
191+
err := verifier.InsertFailedIdsVerificationTask(metaCtx, idAccum, types.ByteCount(dataSizeAccum), namespace)
168192
if err != nil {
169193
return err
170194
}
@@ -173,26 +197,7 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
173197
"%d id bytes and %d data bytes",
174198
namespace, len(idAccum), idLenAccum, dataSizeAccum)
175199
}
176-
prevDBName = doc.PrimaryKey.DatabaseName
177-
prevCollName = doc.PrimaryKey.CollectionName
178-
idLenAccum = 0
179-
dataSizeAccum = 0
180-
idAccum = []interface{}{}
181-
}
182-
idLenAccum += idLen
183-
dataSizeAccum += int64(doc.DataSize)
184-
idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
185-
}
186-
if len(idAccum) > 0 {
187-
namespace := prevDBName + "." + prevCollName
188-
err := verifier.InsertFailedIdsVerificationTask(ctx, idAccum, types.ByteCount(dataSizeAccum), namespace)
189-
if err != nil {
190-
return err
191-
}
192-
verifier.logger.Debug().Msgf(
193-
"Created ID verification task for namespace %s with %d ids, "+
194-
"%d id bytes and %d data bytes",
195-
namespace, len(idAccum), idLenAccum, dataSizeAccum)
196-
}
197-
return nil
200+
return nil
201+
},
202+
)
198203
}

0 commit comments

Comments
 (0)