@@ -2,16 +2,22 @@ package verifier
 
 import (
 	"context"
+	"fmt"
 
+	"github.com/10gen/migration-verifier/internal/retry"
 	"github.com/10gen/migration-verifier/internal/types"
+	"github.com/pkg/errors"
+	"github.com/samber/lo"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
+	"golang.org/x/sync/errgroup"
 )
 
 const (
-	recheckQueue   = "recheckQueue"
-	maxBSONObjSize = 16 * 1024 * 1024
+	recheckQueue                  = "recheckQueue"
+	maxBSONObjSize                = 16 * 1024 * 1024
+	recheckInserterThreadsSoftMax = 100
 )
 
 // RecheckPrimaryKey stores the implicit type of recheck to perform
@@ -42,6 +48,10 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 		collNames[i] = collName
 	}
 
+	verifier.logger.Debug().
+		Int("count", len(documentIDs)).
+		Msg("Persisting rechecks for mismatched or missing documents.")
+
 	return verifier.insertRecheckDocs(
 		context.Background(),
 		dbNames,
@@ -51,6 +61,22 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 	)
 }
 
+// splitToChunks splits the given slice into *roughly* the given number of
+// chunks. It may end up being more or fewer, but it should be pretty close.
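+// For example, 10 elements split into 3 requested chunks come back as 4
+// chunks of length 3, 3, 3, and 1 (lo.Chunk keeps a shorter final chunk).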
+func splitToChunks[T any, Slice ~[]T](elements Slice, numChunks int) []Slice {
+	if numChunks < 1 {
+		panic(fmt.Sprintf("numChunks (%v) should be >=1", numChunks))
+	}
+
+	elsPerChunk := len(elements) / numChunks
+
+	if elsPerChunk == 0 {
+		elsPerChunk = 1
+	}
+
+	return lo.Chunk(elements, elsPerChunk)
+}
+
 func (verifier *Verifier) insertRecheckDocs(
 	ctx context.Context,
 	dbNames []string,
@@ -63,38 +89,78 @@ func (verifier *Verifier) insertRecheckDocs(
 
 	generation, _ := verifier.getGenerationWhileLocked()
 
-	models := []mongo.WriteModel{}
-	for i, documentID := range documentIDs {
-		pk := RecheckPrimaryKey{
-			Generation:     generation,
-			DatabaseName:   dbNames[i],
-			CollectionName: collNames[i],
-			DocumentID:     documentID,
-		}
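+	// Fan the rechecks out across inserter goroutines: each chunk of document
+	// indexes below gets its own unordered, retried bulk write.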
+	docIDIndexes := lo.Range(len(documentIDs))
+	indexesPerThread := splitToChunks(docIDIndexes, recheckInserterThreadsSoftMax)
 
-		// The filter must exclude DataSize; otherwise, if a failed comparison
-		// and a change event happen on the same document for the same
-		// generation, the 2nd insert will fail because a) its filter won’t
-		// match anything, and b) it’ll try to insert a new document with the
-		// same _id as the one that the 1st insert already created.
-		filterDoc := bson.D{{"_id", pk}}
+	eg, groupCtx := errgroup.WithContext(ctx)
 
-		recheckDoc := RecheckDoc{
-			PrimaryKey: pk,
-			DataSize:   dataSizes[i],
-		}
+	for _, curThreadIndexes := range indexesPerThread {
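+		// Capture the loop variable so each goroutine's closure gets its own chunk.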
+		curThreadIndexes := curThreadIndexes
+
+		eg.Go(func() error {
+			models := make([]mongo.WriteModel, len(curThreadIndexes))
+			for m, i := range curThreadIndexes {
+				pk := RecheckPrimaryKey{
+					Generation:     generation,
+					DatabaseName:   dbNames[i],
+					CollectionName: collNames[i],
+					DocumentID:     documentIDs[i],
+				}
+
+				// The filter must exclude DataSize; otherwise, if a failed comparison
+				// and a change event happen on the same document for the same
+				// generation, the 2nd insert will fail because a) its filter won’t
+				// match anything, and b) it’ll try to insert a new document with the
+				// same _id as the one that the 1st insert already created.
+				filterDoc := bson.D{{"_id", pk}}
 
-		models = append(models,
-			mongo.NewReplaceOneModel().
-				SetFilter(filterDoc).SetReplacement(recheckDoc).SetUpsert(true))
+				recheckDoc := RecheckDoc{
+					PrimaryKey: pk,
+					DataSize:   dataSizes[i],
+				}
+
+				models[m] = mongo.NewReplaceOneModel().
+					SetFilter(filterDoc).
+					SetReplacement(recheckDoc).
+					SetUpsert(true)
+			}
+
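+			// The bulk write below is unordered, so one failed upsert does not
+			// block the rest of the batch; transient errors retry the whole batch.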
+			retryer := retry.New(retry.DefaultDurationLimit)
+			err := retryer.RunForTransientErrorsOnly(
+				groupCtx,
+				verifier.logger,
+				func(_ *retry.Info) error {
+					_, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite(
+						groupCtx,
+						models,
+						options.BulkWrite().SetOrdered(false),
+					)
+
+					return err
+				},
+			)
+
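+			// errors.Wrapf returns nil when err is nil, so a successful batch returns no error.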
+			return errors.Wrapf(err, "failed to persist %d recheck(s) for generation %d", len(models), generation)
+		})
 	}
-	_, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite(ctx, models)
 
-	if err == nil {
-		verifier.logger.Debug().Msgf("Persisted %d recheck doc(s) for generation %d", len(models), generation)
+	err := eg.Wait()
+
+	if err != nil {
+		return errors.Wrapf(
+			err,
+			"failed to persist %d recheck(s) for generation %d",
+			len(documentIDs),
+			generation,
+		)
 	}
 
-	return err
+	verifier.logger.Debug().
+		Int("generation", generation).
+		Int("count", len(documentIDs)).
+		Msg("Persisted rechecks.")
+
+	return nil
 }
 
 // ClearRecheckDocs deletes the previous generation’s recheck