66
77 "github.com/stretchr/testify/require"
88 "go.mongodb.org/mongo-driver/bson"
9+ "go.mongodb.org/mongo-driver/bson/primitive"
910 "go.mongodb.org/mongo-driver/mongo"
1011)
1112
@@ -27,6 +28,42 @@ func TestChangeStreamFilter(t *testing.T) {
2728 }, verifier .GetChangeStreamFilter ())
2829}
2930
31+ func (suite * MultiSourceVersionTestSuite ) TestChangeStreamResumability () {
32+ var startTs primitive.Timestamp
33+ func () {
34+ verifier1 := buildVerifier (suite .T (), suite .srcMongoInstance , suite .dstMongoInstance , suite .metaMongoInstance )
35+ ctx , cancel := context .WithCancel (context .Background ())
36+ defer cancel ()
37+ err := verifier1 .StartChangeStream (ctx )
38+ suite .Require ().NoError (err )
39+
40+ suite .Require ().NotNil (verifier1 .srcStartAtTs )
41+ startTs = * verifier1 .srcStartAtTs
42+ }()
43+
44+ ctx , cancel := context .WithCancel (context .Background ())
45+ defer cancel ()
46+
47+ _ , err := suite .srcMongoClient .
48+ Database ("testDb" ).
49+ Collection ("testColl" ).
50+ InsertOne (
51+ ctx ,
52+ bson.D {{"_id" , 0 }},
53+ )
54+ suite .Require ().NoError (err )
55+
56+ verifier2 := buildVerifier (suite .T (), suite .srcMongoInstance , suite .dstMongoInstance , suite .metaMongoInstance )
57+ err = verifier2 .StartChangeStream (ctx )
58+ suite .Require ().NoError (err )
59+
60+ suite .Require ().NotNil (verifier2 .srcStartAtTs )
61+ suite .Assert ().Equal (
62+ startTs ,
63+ * verifier2 .srcStartAtTs ,
64+ )
65+ }
66+
3067func (suite * MultiSourceVersionTestSuite ) TestStartAtTimeNoChanges () {
3168 verifier := buildVerifier (suite .T (), suite .srcMongoInstance , suite .dstMongoInstance , suite .metaMongoInstance )
3269 ctx , cancel := context .WithCancel (context .Background ())
0 commit comments