Skip to content

Commit d1e3bb8

Browse files
committed
fix ordering assertion
1 parent 3f37acd commit d1e3bb8

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

internal/verifier/change_stream_test.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,16 @@ func TestChangeStreamFilter(t *testing.T) {
3030
}, verifier.GetChangeStreamFilter())
3131
}
3232

33+
// TestChangeStreamResumability creates a verifier, starts its change stream,
34+
// terminates that verifier, updates the source cluster, starts a new
35+
// verifier with change stream, and confirms that things look as they should.
3336
func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
34-
var startTs primitive.Timestamp
3537
func() {
3638
verifier1 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
3739
ctx, cancel := context.WithCancel(context.Background())
3840
defer cancel()
3941
err := verifier1.StartChangeStream(ctx)
4042
suite.Require().NoError(err)
41-
42-
suite.Require().NotNil(verifier1.srcStartAtTs)
43-
startTs = *verifier1.srcStartAtTs
4443
}()
4544

4645
ctx, cancel := context.WithCancel(context.Background())
@@ -62,15 +61,16 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
6261
"no rechecks should be enqueued before starting change stream",
6362
)
6463

64+
newTime := suite.getClusterTime(ctx, suite.srcMongoClient)
65+
6566
err = verifier2.StartChangeStream(ctx)
6667
suite.Require().NoError(err)
6768

6869
suite.Require().NotNil(verifier2.srcStartAtTs)
6970

70-
suite.Assert().Equal(
71-
primitive.Timestamp{T: startTs.T, I: 1 + startTs.I},
72-
*verifier2.srcStartAtTs,
73-
"verifier2's change stream should be 1 increment further than verifier1's",
71+
suite.Assert().False(
72+
verifier2.srcStartAtTs.After(newTime),
73+
"verifier2's change stream should be no later than this new session",
7474
)
7575

7676
recheckDocs := []bson.M{}
@@ -99,6 +99,19 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
9999
)
100100
}
101101

102+
func (suite *MultiSourceVersionTestSuite) getClusterTime(ctx context.Context, client *mongo.Client) primitive.Timestamp {
103+
sess, err := client.StartSession()
104+
suite.Require().NoError(err, "should start session")
105+
106+
sctx := mongo.NewSessionContext(ctx, sess)
107+
suite.Require().NoError(sess.Client().Ping(sctx, nil))
108+
109+
newTime, err := getClusterTimeFromSession(sess)
110+
suite.Require().NoError(err, "should fetch cluster time")
111+
112+
return newTime
113+
}
114+
102115
func (suite *MultiSourceVersionTestSuite) fetchVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
103116
recheckDocs := []bson.M{}
104117

0 commit comments

Comments
 (0)