@@ -3,15 +3,20 @@ package verifier
33import (
44 "context"
55
6+ "github.com/10gen/migration-verifier/internal/retry"
67 "github.com/10gen/migration-verifier/internal/types"
8+ "github.com/pkg/errors"
9+ "github.com/samber/lo"
710 "go.mongodb.org/mongo-driver/bson"
811 "go.mongodb.org/mongo-driver/mongo"
912 "go.mongodb.org/mongo-driver/mongo/options"
13+ "golang.org/x/sync/errgroup"
1014)
1115
1216const (
13- recheckQueue = "recheckQueue"
14- maxBSONObjSize = 16 * 1024 * 1024
17+ recheckQueue = "recheckQueue"
18+ maxBSONObjSize = 16 * 1024 * 1024
19+ recheckInserterThreadsSoftMax = 100
1520)
1621
1722// RecheckPrimaryKey stores the implicit type of recheck to perform
@@ -51,6 +56,18 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
5156 )
5257}
5358
59+ // This will split the given slice into *roughly* the given number of chunks.
60+ // It may end up being more or fewer, but it should be pretty close.
61+ func splitToChunks [T any , Slice ~ []T ](elements Slice , numChunks int ) []Slice {
62+ elsPerChunk := len (elements ) / numChunks
63+
64+ if elsPerChunk == 0 {
65+ elsPerChunk = 1
66+ }
67+
68+ return lo .Chunk (elements , elsPerChunk )
69+ }
70+
5471func (verifier * Verifier ) insertRecheckDocs (
5572 ctx context.Context ,
5673 dbNames []string ,
@@ -63,38 +80,78 @@ func (verifier *Verifier) insertRecheckDocs(
6380
6481 generation , _ := verifier .getGenerationWhileLocked ()
6582
66- models := []mongo.WriteModel {}
67- for i , documentID := range documentIDs {
68- pk := RecheckPrimaryKey {
69- Generation : generation ,
70- DatabaseName : dbNames [i ],
71- CollectionName : collNames [i ],
72- DocumentID : documentID ,
73- }
83+ docIDIndexes := lo .Range (len (documentIDs ))
84+ indexesPerThread := splitToChunks (docIDIndexes , recheckInserterThreadsSoftMax )
7485
75- // The filter must exclude DataSize; otherwise, if a failed comparison
76- // and a change event happen on the same document for the same
77- // generation, the 2nd insert will fail because a) its filter won’t
78- // match anything, and b) it’ll try to insert a new document with the
79- // same _id as the one that the 1st insert already created.
80- filterDoc := bson.D {{"_id" , pk }}
86+ eg , groupCtx := errgroup .WithContext (ctx )
8187
82- recheckDoc := RecheckDoc {
83- PrimaryKey : pk ,
84- DataSize : dataSizes [i ],
85- }
88+ for _ , curThreadIndexes := range indexesPerThread {
89+ curThreadIndexes := curThreadIndexes
90+
91+ eg .Go (func () error {
92+ models := make ([]mongo.WriteModel , len (curThreadIndexes ))
93+ for m , i := range curThreadIndexes {
94+ pk := RecheckPrimaryKey {
95+ Generation : generation ,
96+ DatabaseName : dbNames [i ],
97+ CollectionName : collNames [i ],
98+ DocumentID : documentIDs [i ],
99+ }
100+
101+ // The filter must exclude DataSize; otherwise, if a failed comparison
102+ // and a change event happen on the same document for the same
103+ // generation, the 2nd insert will fail because a) its filter won’t
104+ // match anything, and b) it’ll try to insert a new document with the
105+ // same _id as the one that the 1st insert already created.
106+ filterDoc := bson.D {{"_id" , pk }}
107+
108+ recheckDoc := RecheckDoc {
109+ PrimaryKey : pk ,
110+ DataSize : dataSizes [i ],
111+ }
112+
113+ models [m ] = mongo .NewReplaceOneModel ().
114+ SetFilter (filterDoc ).
115+ SetReplacement (recheckDoc ).
116+ SetUpsert (true )
117+ }
118+
119+ retryer := retry .New (retry .DefaultDurationLimit )
120+ err := retryer .RunForTransientErrorsOnly (
121+ groupCtx ,
122+ verifier .logger ,
123+ func (_ * retry.Info ) error {
124+ _ , err := verifier .verificationDatabase ().Collection (recheckQueue ).BulkWrite (
125+ groupCtx ,
126+ models ,
127+ options .BulkWrite ().SetOrdered (false ),
128+ )
129+
130+ return err
131+ },
132+ )
86133
87- models = append (models ,
88- mongo .NewReplaceOneModel ().
89- SetFilter (filterDoc ).SetReplacement (recheckDoc ).SetUpsert (true ))
134+ return errors .Wrapf (err , "failed to persist %d recheck(s) for generation %d" , len (models ), generation )
135+ })
90136 }
91- _ , err := verifier .verificationDatabase ().Collection (recheckQueue ).BulkWrite (ctx , models )
92137
93- if err == nil {
94- verifier .logger .Debug ().Msgf ("Persisted %d recheck doc(s) for generation %d" , len (models ), generation )
138+ err := eg .Wait ()
139+
140+ if err != nil {
141+ return errors .Wrapf (
142+ err ,
143+ "failed to persist %d recheck(s) for generation %d" ,
144+ len (documentIDs ),
145+ generation ,
146+ )
95147 }
96148
97- return err
149+ verifier .logger .Debug ().
150+ Int ("generation" , generation ).
151+ Int ("count" , len (documentIDs )).
152+ Msg ("Persisted rechecks." )
153+
154+ return nil
98155}
99156
100157// ClearRecheckDocs deletes the previous generation’s recheck
0 commit comments