Skip to content

Commit 917b5c0

Browse files
committed
tests against change stream resumption
1 parent 22cbc00 commit 917b5c0

File tree

1 file changed

+48
-1
lines changed

1 file changed

+48
-1
lines changed

internal/verifier/change_stream_test.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package verifier
33
import (
44
"context"
55
"testing"
6+
"time"
67

8+
"github.com/pkg/errors"
79
"github.com/stretchr/testify/require"
810
"go.mongodb.org/mongo-driver/bson"
911
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -49,11 +51,17 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
4951
Collection("testColl").
5052
InsertOne(
5153
ctx,
52-
bson.D{{"_id", 0}},
54+
bson.D{{"_id", "heyhey"}},
5355
)
5456
suite.Require().NoError(err)
5557

5658
verifier2 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
59+
60+
suite.Require().Empty(
61+
suite.fetchVerifierRechecks(ctx, verifier2),
62+
"no rechecks should be enqueued before starting change stream",
63+
)
64+
5765
err = verifier2.StartChangeStream(ctx)
5866
suite.Require().NoError(err)
5967

@@ -64,6 +72,45 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
6472
*verifier2.srcStartAtTs,
6573
"verifier2's change stream should be 1 increment further than verifier1's",
6674
)
75+
76+
recheckDocs := []bson.M{}
77+
78+
require.Eventually(
79+
suite.T(),
80+
func() bool {
81+
recheckDocs = suite.fetchVerifierRechecks(ctx, verifier2)
82+
83+
return len(recheckDocs) > 0
84+
},
85+
time.Minute,
86+
500*time.Millisecond,
87+
"the verifier should enqueue a recheck",
88+
)
89+
90+
suite.Assert().Equal(
91+
bson.M{
92+
"db": "testDb",
93+
"coll": "testColl",
94+
"generation": int32(0),
95+
"docID": "heyhey",
96+
},
97+
recheckDocs[0]["_id"],
98+
"recheck doc should have expected ID",
99+
)
100+
}
101+
102+
func (suite *MultiSourceVersionTestSuite) fetchVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
103+
recheckDocs := []bson.M{}
104+
105+
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
106+
cursor, err := recheckColl.Find(ctx, bson.D{})
107+
108+
if !errors.Is(err, mongo.ErrNoDocuments) {
109+
suite.Require().NoError(err)
110+
suite.Require().NoError(cursor.All(ctx, &recheckDocs))
111+
}
112+
113+
return recheckDocs
67114
}
68115

69116
func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() {

0 commit comments

Comments
 (0)