Skip to content

Commit 9b0b888

Browse files
committed
use fixed appendOplogNote logic
1 parent 5df3ecc commit 9b0b888

File tree

3 files changed

+30
-33
lines changed

3 files changed

+30
-33
lines changed

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
4949
if csRunning {
5050
verifier.logger.Debug().Msg("Changestream still running, signalling that writes are done and waiting for change stream to exit")
5151

52-
finalTs, err := GetClusterTime(
52+
finalTs, err := GetNewClusterTime(
5353
ctx,
5454
verifier.logger,
5555
verifier.srcClient,

internal/verifier/clustertime.go

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,25 @@ import (
1717

1818
const opTimeKeyInServerResponse = "operationTime"
1919

20-
// GetClusterTime returns the remote cluster’s cluster time.
21-
// In so doing it creates a new oplog entry across all shards.
22-
// The “note” will go into the cluster’s oplog, so keep it
23-
// short but meaningful.
24-
func GetClusterTime(
20+
// GetNewClusterTime advances the cluster time and returns that time.
21+
// All shards’ cluster times will meet or exceed the returned time.
22+
func GetNewClusterTime(
2523
ctx context.Context,
2624
logger *logger.Logger,
2725
client *mongo.Client,
2826
) (primitive.Timestamp, error) {
2927
retryer := retry.New(retry.DefaultDurationLimit)
3028

31-
var optime primitive.Timestamp
29+
var clusterTime primitive.Timestamp
3230

33-
// To get the cluster time, we submit a request to append an oplog note
34-
// but set an unreasonably-early maxClusterTime. All the shards will fail
35-
// the request but still send their current clusterTime to the mongos,
36-
// which will return the most recent of those. Thus we can fetch the
37-
// most recent cluster time without altering the oplog.
31+
// First we just fetch the latest cluster time without updating any
32+
// shards’ oplogs.
3833
err := retryer.RunForTransientErrorsOnly(
3934
ctx,
4035
logger,
4136
func(_ *retry.Info) error {
4237
var err error
43-
optime, err = fetchClusterTime(ctx, client)
38+
clusterTime, err = fetchClusterTime(ctx, client)
4439
return err
4540
},
4641
)
@@ -49,17 +44,15 @@ func GetClusterTime(
4944
return primitive.Timestamp{}, err
5045
}
5146

52-
// OPTIMIZATION FOR SHARDED CLUSTERS: We append another oplog entry to
53-
// bring all shards to a cluster time that’s *after* the optime that we’ll
54-
// return. That way any events *at*
55-
//
56-
// Since this is just an optimization, failures here are nonfatal.
47+
// fetchClusterTime() will have taught the mongos about the most current
48+
// shard’s cluster time. Now we tell that mongos to update all lagging
49+
// shards to that time.
5750
err = retryer.RunForTransientErrorsOnly(
5851
ctx,
5952
logger,
6053
func(_ *retry.Info) error {
6154
var err error
62-
optime, err = syncClusterTimeAcrossShards(ctx, client)
55+
clusterTime, err = syncClusterTimeAcrossShards(ctx, client, clusterTime)
6356
return err
6457
},
6558
)
@@ -69,7 +62,7 @@ func GetClusterTime(
6962
Msg("Failed to append oplog note; change stream may need extra time to finish.")
7063
}
7164

72-
return optime, nil
65+
return clusterTime, nil
7366
}
7467

7568
// Use this when we just need the correct cluster time without
@@ -82,7 +75,7 @@ func fetchClusterTime(
8275
ctx,
8376
client,
8477
"expect StaleClusterTime error",
85-
bson.E{"maxClusterTime", primitive.Timestamp{1, 0}},
78+
primitive.Timestamp{1, 0},
8679
)
8780

8881
// We expect an error here; if we didn't get one then something is
@@ -104,8 +97,14 @@ func fetchClusterTime(
10497
func syncClusterTimeAcrossShards(
10598
ctx context.Context,
10699
client *mongo.Client,
100+
maxTime primitive.Timestamp,
107101
) (primitive.Timestamp, error) {
108-
_, rawResponse, err := runAppendOplogNote(ctx, client, "syncing cluster time")
102+
_, rawResponse, err := runAppendOplogNote(
103+
ctx,
104+
client,
105+
"syncing cluster time",
106+
maxTime,
107+
)
109108

110109
if err != nil {
111110
return primitive.Timestamp{}, err
@@ -118,17 +117,15 @@ func runAppendOplogNote(
118117
ctx context.Context,
119118
client *mongo.Client,
120119
note string,
121-
extraPieces ...bson.E,
120+
maxClusterTime primitive.Timestamp,
122121
) (bson.D, bson.Raw, error) {
123-
cmd := append(
124-
bson.D{
125-
{"appendOplogNote", 1},
126-
{"data", bson.D{
127-
{"migration-verifier", note},
128-
}},
129-
},
130-
extraPieces...,
131-
)
122+
cmd := bson.D{
123+
{"appendOplogNote", 1},
124+
{"maxClusterTime", maxClusterTime},
125+
{"data", bson.D{
126+
{"migration-verifier", note},
127+
}},
128+
}
132129

133130
resp := client.
134131
Database(

internal/verifier/clustertime_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ func (suite *IntegrationTestSuite) TestGetClusterTime() {
66
ctx := context.Background()
77
logger, _ := getLoggerAndWriter("stdout")
88

9-
ts, err := GetClusterTime(ctx, logger, suite.srcMongoClient)
9+
ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient)
1010
suite.Require().NoError(err)
1111

1212
suite.Assert().NotZero(ts, "timestamp should be nonzero")

0 commit comments

Comments
 (0)