Skip to content

Commit 7b19d5e

Browse files
authored
Merge pull request #159 from mongodb-labs/revert-148-REP-6766-batch-cursor-compare
Batch cursors effectively require a direct connection. That sacrifices the driver’s features to safeguard availability. We should find an alternative solution to the memory-efficiency problem.
2 parents b22c9c0 + fa5afd2 commit 7b19d5e

File tree

4 files changed

+57
-400
lines changed

4 files changed

+57
-400
lines changed

internal/verifier/compare.go

Lines changed: 52 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"iter"
87
"time"
98

109
"github.com/10gen/migration-verifier/chanutil"
@@ -13,7 +12,6 @@ import (
1312
"github.com/10gen/migration-verifier/internal/retry"
1413
"github.com/10gen/migration-verifier/internal/types"
1514
"github.com/10gen/migration-verifier/internal/util"
16-
"github.com/10gen/migration-verifier/mmongo/cursor"
1715
"github.com/10gen/migration-verifier/option"
1816
"github.com/pkg/errors"
1917
"go.mongodb.org/mongo-driver/v2/bson"
@@ -32,11 +30,6 @@ const (
3230
docKeyInHashedCompare = "k"
3331
)
3432

35-
type seqWithTs struct {
36-
seq iter.Seq2[bson.Raw, error]
37-
ts bson.Timestamp
38-
}
39-
4033
type docWithTs struct {
4134
doc bson.Raw
4235
ts bson.Timestamp
@@ -52,7 +45,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
5245
types.ByteCount,
5346
error,
5447
) {
55-
var srcChannel, dstChannel <-chan seqWithTs
48+
var srcChannel, dstChannel <-chan docWithTs
5649
var readSrcCallback, readDstCallback func(context.Context, *retry.FuncInfo) error
5750

5851
results := []VerificationResult{}
@@ -107,7 +100,7 @@ func (verifier *Verifier) compareDocsFromChannels(
107100
workerNum int,
108101
fi *retry.FuncInfo,
109102
task *VerificationTask,
110-
srcChannel, dstChannel <-chan seqWithTs,
103+
srcChannel, dstChannel <-chan docWithTs,
111104
) (
112105
[]VerificationResult,
113106
types.DocumentCount,
@@ -211,7 +204,7 @@ func (verifier *Verifier) compareDocsFromChannels(
211204
for !srcClosed || !dstClosed {
212205
simpleTimerReset(readTimer, readTimeout)
213206

214-
var srcDocsWithTs, dstDocsWithTs seqWithTs
207+
var srcDocWithTs, dstDocWithTs docWithTs
215208

216209
eg, egCtx := contextplus.ErrGroup(ctx)
217210

@@ -226,13 +219,21 @@ func (verifier *Verifier) compareDocsFromChannels(
226219
"failed to read from source after %s",
227220
readTimeout,
228221
)
229-
case srcDocsWithTs, alive = <-srcChannel:
222+
case srcDocWithTs, alive = <-srcChannel:
230223
if !alive {
231224
srcClosed = true
232225
break
233226
}
234227

235228
fi.NoteSuccess("received document from source")
229+
230+
srcDocCount++
231+
srcByteCount += types.ByteCount(len(srcDocWithTs.doc))
232+
verifier.workerTracker.SetSrcCounts(
233+
workerNum,
234+
srcDocCount,
235+
srcByteCount,
236+
)
236237
}
237238

238239
return nil
@@ -250,7 +251,7 @@ func (verifier *Verifier) compareDocsFromChannels(
250251
"failed to read from destination after %s",
251252
readTimeout,
252253
)
253-
case dstDocsWithTs, alive = <-dstChannel:
254+
case dstDocWithTs, alive = <-dstChannel:
254255
if !alive {
255256
dstClosed = true
256257
break
@@ -270,72 +271,32 @@ func (verifier *Verifier) compareDocsFromChannels(
270271
)
271272
}
272273

273-
if srcDocsWithTs.seq != nil {
274-
for doc, err := range srcDocsWithTs.seq {
275-
if err != nil {
276-
return nil, 0, 0, errors.Wrapf(
277-
err,
278-
"reading batch of docs from source (task: %s)",
279-
task.PrimaryKey,
280-
)
281-
}
274+
if srcDocWithTs.doc != nil {
275+
err := handleNewDoc(srcDocWithTs, true)
282276

283-
srcDocCount++
284-
srcByteCount += types.ByteCount(len(doc))
285-
verifier.workerTracker.SetSrcCounts(
286-
workerNum,
287-
srcDocCount,
288-
srcByteCount,
289-
)
277+
if err != nil {
290278

291-
err := handleNewDoc(
292-
docWithTs{
293-
doc: doc,
294-
ts: srcDocsWithTs.ts,
295-
},
296-
true,
279+
return nil, 0, 0, errors.Wrapf(
280+
err,
281+
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
282+
namespace,
283+
task.PrimaryKey,
284+
srcDocWithTs.doc.Lookup("_id"),
297285
)
298-
299-
if err != nil {
300-
return nil, 0, 0, errors.Wrapf(
301-
err,
302-
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
303-
namespace,
304-
task.PrimaryKey,
305-
doc.Lookup("_id"),
306-
)
307-
}
308286
}
309-
310287
}
311288

312-
if dstDocsWithTs.seq != nil {
313-
for doc, err := range dstDocsWithTs.seq {
314-
if err != nil {
315-
return nil, 0, 0, errors.Wrapf(
316-
err,
317-
"reading batch of docs from destination (task: %s)",
318-
task.PrimaryKey,
319-
)
320-
}
289+
if dstDocWithTs.doc != nil {
290+
err := handleNewDoc(dstDocWithTs, false)
321291

322-
err := handleNewDoc(
323-
docWithTs{
324-
doc: doc,
325-
ts: dstDocsWithTs.ts,
326-
},
327-
false,
292+
if err != nil {
293+
return nil, 0, 0, errors.Wrapf(
294+
err,
295+
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
296+
namespace,
297+
task.PrimaryKey,
298+
dstDocWithTs.doc.Lookup("_id"),
328299
)
329-
330-
if err != nil {
331-
return nil, 0, 0, errors.Wrapf(
332-
err,
333-
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
334-
namespace,
335-
task.PrimaryKey,
336-
doc.Lookup("_id"),
337-
)
338-
}
339300
}
340301
}
341302
}
@@ -466,13 +427,13 @@ func simpleTimerReset(t *time.Timer, dur time.Duration) {
466427
func (verifier *Verifier) getFetcherChannelsAndCallbacks(
467428
task *VerificationTask,
468429
) (
469-
<-chan seqWithTs,
470-
<-chan seqWithTs,
430+
<-chan docWithTs,
431+
<-chan docWithTs,
471432
func(context.Context, *retry.FuncInfo) error,
472433
func(context.Context, *retry.FuncInfo) error,
473434
) {
474-
srcChannel := make(chan seqWithTs)
475-
dstChannel := make(chan seqWithTs)
435+
srcChannel := make(chan docWithTs)
436+
dstChannel := make(chan docWithTs)
476437

477438
readSrcCallback := func(ctx context.Context, state *retry.FuncInfo) error {
478439
// We open a session here so that we can read the session’s cluster
@@ -549,44 +510,38 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
549510
}
550511

551512
func iterateCursorToChannel(
552-
ctx context.Context,
513+
sctx context.Context,
553514
state *retry.FuncInfo,
554-
myCursor *cursor.BatchCursor,
555-
writer chan<- seqWithTs,
515+
cursor *mongo.Cursor,
516+
writer chan<- docWithTs,
556517
) error {
557518
defer close(writer)
558519

559-
for {
560-
seq := myCursor.GetCurrentBatchIterator()
520+
sess := mongo.SessionFromContext(sctx)
561521

522+
for cursor.Next(sctx) {
562523
state.NoteSuccess("received a document")
563524

564-
ct, err := myCursor.GetClusterTime()
525+
clusterTime, err := util.GetClusterTimeFromSession(sess)
565526
if err != nil {
566-
return errors.Wrap(err, "reading cluster time from batch")
527+
return errors.Wrap(err, "reading cluster time from session")
567528
}
568529

569530
err = chanutil.WriteWithDoneCheck(
570-
ctx,
531+
sctx,
571532
writer,
572-
seqWithTs{
573-
seq: seq,
574-
ts: ct,
533+
docWithTs{
534+
doc: slices.Clone(cursor.Current),
535+
ts: clusterTime,
575536
},
576537
)
577538

578539
if err != nil {
579-
return errors.Wrapf(err, "sending iterator to compare thread")
580-
}
581-
582-
if myCursor.IsFinished() {
583-
return nil
584-
}
585-
586-
if err := myCursor.GetNext(ctx); err != nil {
587-
return errors.Wrap(err, "failed to iterate cursor")
540+
return errors.Wrapf(err, "sending document to compare thread")
588541
}
589542
}
543+
544+
return errors.Wrap(cursor.Err(), "failed to iterate cursor")
590545
}
591546

592547
func getMapKey(docKeyValues []bson.RawValue) string {
@@ -600,13 +555,8 @@ func getMapKey(docKeyValues []bson.RawValue) string {
600555
return keyBuffer.String()
601556
}
602557

603-
func (verifier *Verifier) getDocumentsCursor(
604-
ctx context.Context,
605-
collection *mongo.Collection,
606-
clusterInfo *util.ClusterInfo,
607-
startAtTs *bson.Timestamp,
608-
task *VerificationTask,
609-
) (*cursor.BatchCursor, error) {
558+
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo,
559+
startAtTs *bson.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
610560
var findOptions bson.D
611561
runCommandOptions := options.RunCmd()
612562
var andPredicates bson.A
@@ -723,16 +673,7 @@ func (verifier *Verifier) getDocumentsCursor(
723673
}
724674
}
725675

726-
c, err := cursor.New(
727-
collection.Database(),
728-
collection.Database().RunCommand(ctx, cmd, runCommandOptions),
729-
)
730-
731-
if err == nil {
732-
c.SetSession(mongo.SessionFromContext(ctx))
733-
}
734-
735-
return c, err
676+
return collection.Database().RunCommandCursor(ctx, cmd, runCommandOptions)
736677
}
737678

738679
func transformPipelineForToHashedIndexKey(

internal/verifier/migration_verifier_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/10gen/migration-verifier/internal/types"
2727
"github.com/10gen/migration-verifier/internal/util"
2828
"github.com/10gen/migration-verifier/mbson"
29-
"github.com/10gen/migration-verifier/mseq"
3029
"github.com/10gen/migration-verifier/mslices"
3130
"github.com/cespare/permute/v2"
3231
"github.com/rs/zerolog"
@@ -1151,15 +1150,13 @@ func TestVerifierCompareDocs(t *testing.T) {
11511150

11521151
namespace := "testdb.testns"
11531152

1154-
makeDocChannel := func(docs []bson.D) <-chan seqWithTs {
1155-
theChan := make(chan seqWithTs, len(docs))
1153+
makeDocChannel := func(docs []bson.D) <-chan docWithTs {
1154+
theChan := make(chan docWithTs, len(docs))
11561155

11571156
for d, doc := range docs {
1158-
theChan <- seqWithTs{
1159-
seq: mseq.FromSliceWithNilErr(
1160-
mslices.Of(testutil.MustMarshal(doc)),
1161-
),
1162-
ts: bson.Timestamp{1, uint32(d)},
1157+
theChan <- docWithTs{
1158+
doc: testutil.MustMarshal(doc),
1159+
ts: bson.Timestamp{1, uint32(d)},
11631160
}
11641161
}
11651162

0 commit comments

Comments
 (0)