Skip to content

Commit 3ce5165

Browse files
committed
fix
1 parent 42e8780 commit 3ce5165

File tree

4 files changed

+24
-20
lines changed

4 files changed

+24
-20
lines changed

internal/verifier/change_stream.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
223223
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
224224
}
225225

226-
if !curTs.Before(writesOffTs) {
226+
// writesOffTs refers to a real event (i.e., not appendOplogNote),
227+
// so we need to TryNext() if curTs == writesOffTs.
228+
if curTs.After(writesOffTs) {
227229
verifier.logger.Debug().
228230
Interface("currentTimestamp", curTs).
229231
Interface("writesOffTimestamp", writesOffTs).

internal/verifier/clustertime.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/10gen/migration-verifier/internal/retry"
88
"github.com/10gen/migration-verifier/internal/util"
99
"github.com/10gen/migration-verifier/mbson"
10-
"github.com/10gen/migration-verifier/option"
1110
"github.com/pkg/errors"
1211
"go.mongodb.org/mongo-driver/bson"
1312
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -18,9 +17,10 @@ import (
1817

1918
const opTimeKeyInServerResponse = "operationTime"
2019

21-
// GetNewClusterTime advances the cluster time and returns that time.
22-
// All shards’ cluster times will meet or exceed the returned time.
23-
func GetNewClusterTime(
20+
// GetClusterTime get the current cluster time and returns that time.
21+
// All shards’ cluster times will also be updated to meet or exceed the
22+
// returned time.
23+
func GetClusterTime(
2424
ctx context.Context,
2525
logger *logger.Logger,
2626
client *mongo.Client,
@@ -36,7 +36,7 @@ func GetNewClusterTime(
3636
logger,
3737
func(_ *retry.Info) error {
3838
var err error
39-
clusterTime, err = fetchNewClusterTime(ctx, client)
39+
clusterTime, err = fetchClusterTime(ctx, client)
4040
return err
4141
},
4242
)
@@ -68,22 +68,27 @@ func GetNewClusterTime(
6868

6969
// Use this when we just need the correct cluster time without
7070
// actually changing any shards’ oplogs.
71-
func fetchNewClusterTime(
71+
func fetchClusterTime(
7272
ctx context.Context,
7373
client *mongo.Client,
7474
) (primitive.Timestamp, error) {
7575
cmd, rawResponse, err := runAppendOplogNote(
7676
ctx,
7777
client,
7878
"expect StaleClusterTime error",
79-
option.None[primitive.Timestamp](),
79+
primitive.Timestamp{1, 0},
8080
)
8181

82-
if err != nil {
83-
return primitive.Timestamp{}, errors.Wrapf(
82+
// We expect an error here; if we didn't get one then something is
83+
// amiss on the server.
84+
if err == nil {
85+
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
86+
}
87+
88+
if !util.IsStaleClusterTimeError(err) {
89+
return primitive.Timestamp{}, errors.Wrap(
8490
err,
85-
"unexpected error (expected StaleClusterTime) from request (%v)",
86-
cmd,
91+
"unexpected error (expected StaleClusterTime) from request",
8792
)
8893
}
8994

@@ -99,7 +104,7 @@ func syncClusterTimeAcrossShards(
99104
ctx,
100105
client,
101106
"syncing cluster time",
102-
option.Some(maxTime),
107+
maxTime,
103108
)
104109

105110
// If any shard’s cluster time >= maxTime, the mongos will return a
@@ -119,19 +124,16 @@ func runAppendOplogNote(
119124
ctx context.Context,
120125
client *mongo.Client,
121126
note string,
122-
maxClusterTimeOpt option.Option[primitive.Timestamp],
127+
maxClusterTime primitive.Timestamp,
123128
) (bson.D, bson.Raw, error) {
124129
cmd := bson.D{
125130
{"appendOplogNote", 1},
131+
{"maxClusterTime", maxClusterTime},
126132
{"data", bson.D{
127133
{"migration-verifier", note},
128134
}},
129135
}
130136

131-
if maxClusterTime, has := maxClusterTimeOpt.Get(); has {
132-
cmd = append(cmd, bson.E{"maxClusterTime", maxClusterTime})
133-
}
134-
135137
resp := client.
136138
Database(
137139
"admin",

internal/verifier/clustertime_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ func (suite *IntegrationTestSuite) TestGetClusterTime() {
66
ctx := context.Background()
77
logger, _ := getLoggerAndWriter("stdout")
88

9-
ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient)
9+
ts, err := GetClusterTime(ctx, logger, suite.srcMongoClient)
1010
suite.Require().NoError(err)
1111

1212
suite.Assert().NotZero(ts, "timestamp should be nonzero")

internal/verifier/migration_verifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
241241
if verifier.writesOffTimestamp == nil {
242242
verifier.logger.Debug().Msg("Change stream still running. Signalling that writes are done.")
243243

244-
finalTs, err := GetNewClusterTime(
244+
finalTs, err := GetClusterTime(
245245
ctx,
246246
verifier.logger,
247247
verifier.srcClient,

0 commit comments

Comments
 (0)