@@ -3,7 +3,6 @@ package verifier
33import (
44 "bytes"
55 "context"
6- "reflect"
76
87 "github.com/10gen/migration-verifier/internal/types"
98 "github.com/pkg/errors"
@@ -74,88 +73,45 @@ func (verifier *Verifier) compareDocsFromChannels(
7473 srcCache := map [string ]bson.Raw {}
7574 dstCache := map [string ]bson.Raw {}
7675
77- sCases := []reflect.SelectCase {
78- {
79- Dir : reflect .SelectRecv ,
80- Chan : reflect .ValueOf (ctx .Done ()),
81- },
82- {
83- Dir : reflect .SelectRecv ,
84- Chan : reflect .ValueOf (srcChannel ),
85- },
86- {
87- Dir : reflect .SelectRecv ,
88- Chan : reflect .ValueOf (dstChannel ),
89- },
90- }
91-
92- // Additional data points to parallel the sCases.
93- // These must be kept in the same order.
94- sDetails := []struct {
95- OurMap map [string ]bson.Raw
96- TheirMap map [string ]bson.Raw
97- IsSrc bool
98- }{
99- {}, // ctx.Done()
100- {
101- IsSrc : true ,
102- OurMap : srcCache ,
103- TheirMap : dstCache ,
104- },
105- {
106- OurMap : dstCache ,
107- TheirMap : srcCache ,
108- },
109- }
110-
111- // sCases always includes ctx.Done() as its first element.
112- // If no other channels are left, then we’re done.
113- for len (sCases ) > 1 {
114- chosen , recv , alive := reflect .Select (sCases )
115-
116- if chosen == 0 {
117- return nil , 0 , 0 , ctx .Err ()
118- }
119-
120- if ! alive {
121- sCases = slices .Delete (sCases , chosen , 1 + chosen )
122- sDetails = slices .Delete (sDetails , chosen , 1 + chosen )
123- continue
124- }
125-
126- doc := (recv .Interface ()).(bson.Raw )
76+ // This is the core document-handling logic. It either:
77+ //
78+ // a) caches the new document if its mapKey is unseen, or
79+ // b) compares the new doc against its previously-received, cached
80+ // counterpart and records any mismatch.
81+ handleNewDoc := func (doc bson.Raw , isSrc bool ) error {
82+ mapKey := getMapKey (doc , mapKeyFieldNames )
12783
128- details := sDetails [ chosen ]
84+ var ourMap , theirMap map [ string ]bson. Raw
12985
130- if details .IsSrc {
131- docCount ++
132- byteCount += types .ByteCount (len (doc ))
86+ if isSrc {
87+ ourMap = srcCache
88+ theirMap = dstCache
89+ } else {
90+ ourMap = dstCache
91+ theirMap = srcCache
13392 }
134-
135- mapKey := getMapKey (doc , mapKeyFieldNames )
136-
13793 // See if we've already cached a document with this
13894 // mapKey from the other channel.
139- theirDoc , exists := details . TheirMap [mapKey ]
95+ theirDoc , exists := theirMap [mapKey ]
14096
14197 // If there is no such cached document, then cache the newly-received
14298 // document in our map then proceed to the next document.
14399 //
144100 // (We'll remove the cache entry when/if the other channel yields a
145101 // document with the same mapKey.)
146102 if ! exists {
147- details . OurMap [mapKey ] = doc
148- continue
103+ ourMap [mapKey ] = doc
104+ return nil
149105 }
150106
151107 // We have two documents! First we remove the cache entry. This saves
152108 // memory, but more importantly, it lets us know, once we exhaust the
153109 // channels, which documents were missing on one side or the other.
154- delete (details . TheirMap , mapKey )
110+ delete (theirMap , mapKey )
155111
156112 // Now we determine which document came from whom.
157113 var srcDoc , dstDoc bson.Raw
158- if details . IsSrc {
114+ if isSrc {
159115 srcDoc = doc
160116 dstDoc = theirDoc
161117 } else {
@@ -166,10 +122,50 @@ func (verifier *Verifier) compareDocsFromChannels(
166122 // Finally we compare the documents and save any mismatch report(s).
167123 mismatches , err := verifier .compareOneDocument (srcDoc , dstDoc , namespace )
168124 if err != nil {
169- return nil , 0 , 0 , errors .Wrap (err , "failed to compare documents" )
125+ return errors .Wrap (err , "failed to compare documents" )
170126 }
171127
172128 results = append (results , mismatches ... )
129+
130+ return nil
131+ }
132+
133+ var srcClosed , dstClosed bool
134+ var err error
135+
136+ // We always read src & dst back & forth. This ensures that, if one side
137+ // lags the other significantly, we won’t keep caching the faster side’s
138+ // documents and thus consume more & more memory.
139+ for err == nil && (! srcClosed || ! dstClosed ) {
140+ if ! srcClosed {
141+ select {
142+ case <- ctx .Done ():
143+ return nil , 0 , 0 , ctx .Err ()
144+ case doc , alive := <- srcChannel :
145+ if ! alive {
146+ srcClosed = true
147+ break
148+ }
149+ docCount ++
150+ byteCount += types .ByteCount (len (doc ))
151+
152+ err = handleNewDoc (doc , true )
153+ }
154+ }
155+
156+ if ! dstClosed {
157+ select {
158+ case <- ctx .Done ():
159+ return nil , 0 , 0 , ctx .Err ()
160+ case doc , alive := <- dstChannel :
161+ if ! alive {
162+ dstClosed = true
163+ break
164+ }
165+
166+ err = handleNewDoc (doc , false )
167+ }
168+ }
173169 }
174170
175171 // We got here because both srcChannel and dstChannel are closed,
0 commit comments