Skip to content

Commit eb7746f

Browse files
committed
new cluster time
1 parent 3ce5165 commit eb7746f

File tree

3 files changed

+36
-14
lines changed

3 files changed

+36
-14
lines changed

internal/verifier/clustertime.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/10gen/migration-verifier/internal/retry"
88
"github.com/10gen/migration-verifier/internal/util"
99
"github.com/10gen/migration-verifier/mbson"
10+
"github.com/10gen/migration-verifier/option"
1011
"github.com/pkg/errors"
1112
"go.mongodb.org/mongo-driver/bson"
1213
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -17,10 +18,9 @@ import (
1718

1819
const opTimeKeyInServerResponse = "operationTime"
1920

20-
// GetClusterTime get the current cluster time and returns that time.
21-
// All shards’ cluster times will also be updated to meet or exceed the
22-
// returned time.
23-
func GetClusterTime(
21+
// GetClusterTime creates a new cluster time, updates all shards’
22+
// cluster times to meet or exceed that time, then returns it.
23+
func GetNewClusterTime(
2424
ctx context.Context,
2525
logger *logger.Logger,
2626
client *mongo.Client,
@@ -36,7 +36,7 @@ func GetClusterTime(
3636
logger,
3737
func(_ *retry.Info) error {
3838
var err error
39-
clusterTime, err = fetchClusterTime(ctx, client)
39+
clusterTime, err = fetchNewClusterTime(ctx, client)
4040
return err
4141
},
4242
)
@@ -68,15 +68,15 @@ func GetClusterTime(
6868

6969
// Use this when we just need the correct cluster time without
7070
// actually changing any shards’ oplogs.
71-
func fetchClusterTime(
71+
func fetchNewClusterTime(
7272
ctx context.Context,
7373
client *mongo.Client,
7474
) (primitive.Timestamp, error) {
7575
cmd, rawResponse, err := runAppendOplogNote(
7676
ctx,
7777
client,
7878
"expect StaleClusterTime error",
79-
primitive.Timestamp{1, 0},
79+
option.None[primitive.Timestamp](),
8080
)
8181

8282
// We expect an error here; if we didn't get one then something is
@@ -104,7 +104,7 @@ func syncClusterTimeAcrossShards(
104104
ctx,
105105
client,
106106
"syncing cluster time",
107-
maxTime,
107+
option.Some(maxTime),
108108
)
109109

110110
// If any shard’s cluster time >= maxTime, the mongos will return a
@@ -124,16 +124,19 @@ func runAppendOplogNote(
124124
ctx context.Context,
125125
client *mongo.Client,
126126
note string,
127-
maxClusterTime primitive.Timestamp,
127+
maxClusterTimeOpt option.Option[primitive.Timestamp],
128128
) (bson.D, bson.Raw, error) {
129129
cmd := bson.D{
130130
{"appendOplogNote", 1},
131-
{"maxClusterTime", maxClusterTime},
132131
{"data", bson.D{
133132
{"migration-verifier", note},
134133
}},
135134
}
136135

136+
if maxClusterTime, has := maxClusterTimeOpt.Get(); has {
137+
cmd = append(cmd, bson.E{"maxClusterTime", maxClusterTime})
138+
}
139+
137140
resp := client.
138141
Database(
139142
"admin",
Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,32 @@
11
package verifier
22

3-
import "context"
3+
import (
4+
"context"
5+
6+
"go.mongodb.org/mongo-driver/bson"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
8+
)
49

510
func (suite *IntegrationTestSuite) TestGetClusterTime() {
611
ctx := context.Background()
712
logger, _ := getLoggerAndWriter("stdout")
813

9-
ts, err := GetClusterTime(ctx, logger, suite.srcMongoClient)
14+
sess, err := suite.srcMongoClient.StartSession()
15+
suite.Require().NoError(err)
16+
17+
_, err = suite.srcMongoClient.
18+
Database(suite.DBNameForTest()).
19+
Collection("mycoll").
20+
InsertOne(ctx, bson.D{})
21+
clusterTimeVal, err := sess.ClusterTime().LookupErr("$clusterTime", "clusterTime")
22+
suite.Require().NoError(err)
23+
24+
clusterT, clusterI, ok := clusterTimeVal.TimestampOK()
25+
suite.Require().True(ok, "session cluster time (%s: %v) must be a timestamp", clusterTimeVal.Type, clusterTimeVal)
26+
27+
ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient)
1028
suite.Require().NoError(err)
1129

12-
suite.Assert().NotZero(ts, "timestamp should be nonzero")
30+
suite.Require().NotZero(ts, "timestamp should be nonzero")
31+
suite.Assert().True(ts.After(primitive.Timestamp{T: clusterT, I: clusterI}))
1332
}

internal/verifier/migration_verifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
241241
if verifier.writesOffTimestamp == nil {
242242
verifier.logger.Debug().Msg("Change stream still running. Signalling that writes are done.")
243243

244-
finalTs, err := GetClusterTime(
244+
finalTs, err := GetNewClusterTime(
245245
ctx,
246246
verifier.logger,
247247
verifier.srcClient,

0 commit comments

Comments
 (0)