Skip to content

Commit f9c344a

Browse files
committed
REP-5318 Make GetNewClusterTime return a new time.
Previously this function actually returned the most recent cluster time rather than creating a new cluster time. This changeset makes it now create a new time instead. This simplifies the change stream’s handling of the writesOff timestamp because the writesOff timestamp is separate from any change event.
1 parent 365482d commit f9c344a

File tree

3 files changed

+58
-70
lines changed

3 files changed

+58
-70
lines changed

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
226226
break
227227
}
228228

229-
if curTs.After(writesOffTs) {
229+
if !curTs.Before(writesOffTs) {
230230
verifier.logger.Debug().
231231
Interface("currentTimestamp", curTs).
232232
Interface("writesOffTimestamp", writesOffTs).

internal/verifier/clustertime.go

Lines changed: 32 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/10gen/migration-verifier/internal/retry"
88
"github.com/10gen/migration-verifier/internal/util"
99
"github.com/10gen/migration-verifier/mbson"
10+
"github.com/10gen/migration-verifier/option"
1011
"github.com/pkg/errors"
1112
"go.mongodb.org/mongo-driver/bson"
1213
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -17,8 +18,8 @@ import (
1718

1819
const opTimeKeyInServerResponse = "operationTime"
1920

20-
// GetNewClusterTime advances the cluster time and returns that time.
21-
// All shards’ cluster times will meet or exceed the returned time.
21+
// GetNewClusterTime creates a new cluster time, updates all shards’
22+
// cluster times to meet or exceed that time, then returns it.
2223
func GetNewClusterTime(
2324
ctx context.Context,
2425
logger *logger.Logger,
@@ -35,7 +36,12 @@ func GetNewClusterTime(
3536
logger,
3637
func(_ *retry.Info) error {
3738
var err error
38-
clusterTime, err = fetchClusterTime(ctx, client)
39+
clusterTime, err = runAppendOplogNote(
40+
ctx,
41+
client,
42+
"new ts",
43+
option.None[primitive.Timestamp](),
44+
)
3945
return err
4046
},
4147
)
@@ -52,7 +58,12 @@ func GetNewClusterTime(
5258
logger,
5359
func(_ *retry.Info) error {
5460
var err error
55-
_, err = syncClusterTimeAcrossShards(ctx, client, clusterTime)
61+
_, err = runAppendOplogNote(
62+
ctx,
63+
client,
64+
"sync ts",
65+
option.Some(clusterTime),
66+
)
5667
return err
5768
},
5869
)
@@ -65,46 +76,31 @@ func GetNewClusterTime(
6576
return clusterTime, nil
6677
}
6778

68-
// Use this when we just need the correct cluster time without
69-
// actually changing any shards’ oplogs.
70-
func fetchClusterTime(
79+
func runAppendOplogNote(
7180
ctx context.Context,
7281
client *mongo.Client,
82+
note string,
83+
maxClusterTimeOpt option.Option[primitive.Timestamp],
7384
) (primitive.Timestamp, error) {
74-
cmd, rawResponse, err := runAppendOplogNote(
75-
ctx,
76-
client,
77-
"expect StaleClusterTime error",
78-
primitive.Timestamp{1, 0},
79-
)
80-
81-
// We expect an error here; if we didn't get one then something is
82-
// amiss on the server.
83-
if err == nil {
84-
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
85+
cmd := bson.D{
86+
{"appendOplogNote", 1},
87+
{"data", bson.D{
88+
{"migration-verifier", note},
89+
}},
8590
}
8691

87-
if !util.IsStaleClusterTimeError(err) {
88-
return primitive.Timestamp{}, errors.Wrap(
89-
err,
90-
"unexpected error (expected StaleClusterTime) from request",
91-
)
92+
if maxClusterTime, has := maxClusterTimeOpt.Get(); has {
93+
cmd = append(cmd, bson.E{"maxClusterTime", maxClusterTime})
9294
}
9395

94-
return getOpTimeFromRawResponse(rawResponse)
95-
}
96+
resp := client.
97+
Database(
98+
"admin",
99+
options.Database().SetWriteConcern(writeconcern.Majority()),
100+
).
101+
RunCommand(ctx, cmd)
96102

97-
func syncClusterTimeAcrossShards(
98-
ctx context.Context,
99-
client *mongo.Client,
100-
maxTime primitive.Timestamp,
101-
) (primitive.Timestamp, error) {
102-
_, rawResponse, err := runAppendOplogNote(
103-
ctx,
104-
client,
105-
"syncing cluster time",
106-
maxTime,
107-
)
103+
rawResponse, err := resp.Raw()
108104

109105
// If any shard’s cluster time >= maxTime, the mongos will return a
110106
// StaleClusterTime error. This particular error doesn’t indicate a
@@ -119,36 +115,6 @@ func syncClusterTimeAcrossShards(
119115
return getOpTimeFromRawResponse(rawResponse)
120116
}
121117

122-
func runAppendOplogNote(
123-
ctx context.Context,
124-
client *mongo.Client,
125-
note string,
126-
maxClusterTime primitive.Timestamp,
127-
) (bson.D, bson.Raw, error) {
128-
cmd := bson.D{
129-
{"appendOplogNote", 1},
130-
{"maxClusterTime", maxClusterTime},
131-
{"data", bson.D{
132-
{"migration-verifier", note},
133-
}},
134-
}
135-
136-
resp := client.
137-
Database(
138-
"admin",
139-
options.Database().SetWriteConcern(writeconcern.Majority()),
140-
).
141-
RunCommand(ctx, cmd)
142-
143-
raw, err := resp.Raw()
144-
145-
return cmd, raw, errors.Wrapf(
146-
err,
147-
"command (%v) failed unexpectedly",
148-
cmd,
149-
)
150-
}
151-
152118
func getOpTimeFromRawResponse(rawResponse bson.Raw) (primitive.Timestamp, error) {
153119
// Get the `operationTime` from the response and return it.
154120
var optime primitive.Timestamp
Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,35 @@
11
package verifier
22

3-
import "context"
3+
import (
4+
"context"
45

5-
func (suite *IntegrationTestSuite) TestGetClusterTime() {
6+
"go.mongodb.org/mongo-driver/bson"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
8+
"go.mongodb.org/mongo-driver/mongo"
9+
)
10+
11+
func (suite *IntegrationTestSuite) TestGetNewClusterTime() {
612
ctx := context.Background()
713
logger, _ := getLoggerAndWriter("stdout")
814

15+
sess, err := suite.srcMongoClient.StartSession()
16+
suite.Require().NoError(err)
17+
18+
_, err = suite.srcMongoClient.
19+
Database(suite.DBNameForTest()).
20+
Collection("mycoll").
21+
InsertOne(mongo.NewSessionContext(ctx, sess), bson.D{})
22+
suite.Require().NoError(err)
23+
24+
clusterTimeVal, err := sess.ClusterTime().LookupErr("$clusterTime", "clusterTime")
25+
suite.Require().NoError(err, "should extract cluster time from %+v", sess.ClusterTime())
26+
27+
clusterT, clusterI, ok := clusterTimeVal.TimestampOK()
28+
suite.Require().True(ok, "session cluster time (%s: %v) must be a timestamp", clusterTimeVal.Type, clusterTimeVal)
29+
930
ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient)
1031
suite.Require().NoError(err)
1132

12-
suite.Assert().NotZero(ts, "timestamp should be nonzero")
33+
suite.Require().NotZero(ts, "timestamp should be nonzero")
34+
suite.Assert().True(ts.After(primitive.Timestamp{T: clusterT, I: clusterI}))
1335
}

0 commit comments

Comments
 (0)