Skip to content

Commit 3c81778

Browse files
committed
Make GetNewClusterTime return a new time rather than an existing time.
This simplifies the change stream’s handling of the writesOff timestamp because the writesOff timestamp is separate from any change event.
1 parent a4086df commit 3c81778

File tree

8 files changed

+676
-70
lines changed

8 files changed

+676
-70
lines changed

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
226226
break
227227
}
228228

229-
if curTs.After(writesOffTs) {
229+
if !curTs.Before(writesOffTs) {
230230
verifier.logger.Debug().
231231
Interface("currentTimestamp", curTs).
232232
Interface("writesOffTimestamp", writesOffTs).

internal/verifier/clustertime.go

Lines changed: 32 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/10gen/migration-verifier/internal/retry"
88
"github.com/10gen/migration-verifier/internal/util"
99
"github.com/10gen/migration-verifier/mbson"
10+
"github.com/10gen/migration-verifier/option"
1011
"github.com/pkg/errors"
1112
"go.mongodb.org/mongo-driver/bson"
1213
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -17,8 +18,8 @@ import (
1718

1819
const opTimeKeyInServerResponse = "operationTime"
1920

20-
// GetNewClusterTime advances the cluster time and returns that time.
21-
// All shards’ cluster times will meet or exceed the returned time.
21+
// GetNewClusterTime creates a new cluster time, updates all shards’
22+
// cluster times to meet or exceed that time, then returns it.
2223
func GetNewClusterTime(
2324
ctx context.Context,
2425
logger *logger.Logger,
@@ -35,7 +36,12 @@ func GetNewClusterTime(
3536
logger,
3637
func(_ *retry.Info) error {
3738
var err error
38-
clusterTime, err = fetchClusterTime(ctx, client)
39+
clusterTime, err = runAppendOplogNote(
40+
ctx,
41+
client,
42+
"new ts",
43+
option.None[primitive.Timestamp](),
44+
)
3945
return err
4046
},
4147
)
@@ -52,7 +58,12 @@ func GetNewClusterTime(
5258
logger,
5359
func(_ *retry.Info) error {
5460
var err error
55-
_, err = syncClusterTimeAcrossShards(ctx, client, clusterTime)
61+
_, err = runAppendOplogNote(
62+
ctx,
63+
client,
64+
"sync ts",
65+
option.Some(clusterTime),
66+
)
5667
return err
5768
},
5869
)
@@ -65,46 +76,31 @@ func GetNewClusterTime(
6576
return clusterTime, nil
6677
}
6778

68-
// Use this when we just need the correct cluster time without
69-
// actually changing any shards’ oplogs.
70-
func fetchClusterTime(
79+
func runAppendOplogNote(
7180
ctx context.Context,
7281
client *mongo.Client,
82+
note string,
83+
maxClusterTimeOpt option.Option[primitive.Timestamp],
7384
) (primitive.Timestamp, error) {
74-
cmd, rawResponse, err := runAppendOplogNote(
75-
ctx,
76-
client,
77-
"expect StaleClusterTime error",
78-
primitive.Timestamp{1, 0},
79-
)
80-
81-
// We expect an error here; if we didn't get one then something is
82-
// amiss on the server.
83-
if err == nil {
84-
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
85+
cmd := bson.D{
86+
{"appendOplogNote", 1},
87+
{"data", bson.D{
88+
{"migration-verifier", note},
89+
}},
8590
}
8691

87-
if !util.IsStaleClusterTimeError(err) {
88-
return primitive.Timestamp{}, errors.Wrap(
89-
err,
90-
"unexpected error (expected StaleClusterTime) from request",
91-
)
92+
if maxClusterTime, has := maxClusterTimeOpt.Get(); has {
93+
cmd = append(cmd, bson.E{"maxClusterTime", maxClusterTime})
9294
}
9395

94-
return getOpTimeFromRawResponse(rawResponse)
95-
}
96+
resp := client.
97+
Database(
98+
"admin",
99+
options.Database().SetWriteConcern(writeconcern.Majority()),
100+
).
101+
RunCommand(ctx, cmd)
96102

97-
func syncClusterTimeAcrossShards(
98-
ctx context.Context,
99-
client *mongo.Client,
100-
maxTime primitive.Timestamp,
101-
) (primitive.Timestamp, error) {
102-
_, rawResponse, err := runAppendOplogNote(
103-
ctx,
104-
client,
105-
"syncing cluster time",
106-
maxTime,
107-
)
103+
rawResponse, err := resp.Raw()
108104

109105
// If any shard’s cluster time >= maxTime, the mongos will return a
110106
// StaleClusterTime error. This particular error doesn’t indicate a
@@ -119,36 +115,6 @@ func syncClusterTimeAcrossShards(
119115
return getOpTimeFromRawResponse(rawResponse)
120116
}
121117

122-
func runAppendOplogNote(
123-
ctx context.Context,
124-
client *mongo.Client,
125-
note string,
126-
maxClusterTime primitive.Timestamp,
127-
) (bson.D, bson.Raw, error) {
128-
cmd := bson.D{
129-
{"appendOplogNote", 1},
130-
{"maxClusterTime", maxClusterTime},
131-
{"data", bson.D{
132-
{"migration-verifier", note},
133-
}},
134-
}
135-
136-
resp := client.
137-
Database(
138-
"admin",
139-
options.Database().SetWriteConcern(writeconcern.Majority()),
140-
).
141-
RunCommand(ctx, cmd)
142-
143-
raw, err := resp.Raw()
144-
145-
return cmd, raw, errors.Wrapf(
146-
err,
147-
"command (%v) failed unexpectedly",
148-
cmd,
149-
)
150-
}
151-
152118
func getOpTimeFromRawResponse(rawResponse bson.Raw) (primitive.Timestamp, error) {
153119
// Get the `operationTime` from the response and return it.
154120
var optime primitive.Timestamp
Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,35 @@
11
package verifier
22

3-
import "context"
3+
import (
4+
"context"
45

5-
func (suite *IntegrationTestSuite) TestGetClusterTime() {
6+
"go.mongodb.org/mongo-driver/bson"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
8+
"go.mongodb.org/mongo-driver/mongo"
9+
)
10+
11+
func (suite *IntegrationTestSuite) TestGetNewClusterTime() {
612
ctx := context.Background()
713
logger, _ := getLoggerAndWriter("stdout")
814

15+
sess, err := suite.srcMongoClient.StartSession()
16+
suite.Require().NoError(err)
17+
18+
_, err = suite.srcMongoClient.
19+
Database(suite.DBNameForTest()).
20+
Collection("mycoll").
21+
InsertOne(mongo.NewSessionContext(ctx, sess), bson.D{})
22+
suite.Require().NoError(err)
23+
24+
clusterTimeVal, err := sess.ClusterTime().LookupErr("$clusterTime", "clusterTime")
25+
suite.Require().NoError(err, "should extract cluster time from %+v", sess.ClusterTime())
26+
27+
clusterT, clusterI, ok := clusterTimeVal.TimestampOK()
28+
suite.Require().True(ok, "session cluster time (%s: %v) must be a timestamp", clusterTimeVal.Type, clusterTimeVal)
29+
930
ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient)
1031
suite.Require().NoError(err)
1132

12-
suite.Assert().NotZero(ts, "timestamp should be nonzero")
33+
suite.Require().NotZero(ts, "timestamp should be nonzero")
34+
suite.Assert().True(ts.After(primitive.Timestamp{T: clusterT, I: clusterI}))
1335
}

option/bson.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package option
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"go.mongodb.org/mongo-driver/bson"
6+
"go.mongodb.org/mongo-driver/bson/bsontype"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
8+
)
9+
10+
// MarshalBSONValue implements bson.ValueMarshaler.
11+
func (o Option[T]) MarshalBSONValue() (bsontype.Type, []byte, error) {
12+
val, exists := o.Get()
13+
if !exists {
14+
return bson.MarshalValue(primitive.Null{})
15+
}
16+
17+
return bson.MarshalValue(val)
18+
}
19+
20+
// UnmarshalBSONValue implements bson.ValueUnmarshaler.
21+
func (o *Option[T]) UnmarshalBSONValue(bType bsontype.Type, raw []byte) error {
22+
23+
switch bType {
24+
case bson.TypeNull:
25+
o.val = nil
26+
27+
default:
28+
valPtr := new(T)
29+
30+
err := bson.UnmarshalValue(bType, raw, &valPtr)
31+
if err != nil {
32+
return errors.Wrapf(err, "failed to unmarshal %T", *o)
33+
}
34+
35+
// This may not even be possible, but we should still check.
36+
if isNil(*valPtr) {
37+
return errors.Wrapf(err, "refuse to unmarshal nil %T value", *o)
38+
}
39+
40+
o.val = valPtr
41+
}
42+
43+
return nil
44+
}
45+
46+
// IsZero implements bsoncodec.Zeroer.
47+
func (o Option[T]) IsZero() bool {
48+
return o.IsNone()
49+
}

option/json.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package option
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
)
7+
8+
var _ json.Marshaler = &Option[int]{}
9+
var _ json.Unmarshaler = &Option[int]{}
10+
11+
// MarshalJSON encodes Option into json.
12+
func (o Option[T]) MarshalJSON() ([]byte, error) {
13+
val, exists := o.Get()
14+
if exists {
15+
return json.Marshal(val)
16+
}
17+
18+
return json.Marshal(nil)
19+
}
20+
21+
// UnmarshalJSON decodes Option from json.
22+
func (o *Option[T]) UnmarshalJSON(b []byte) error {
23+
if bytes.Equal(b, []byte("null")) {
24+
o.val = nil
25+
} else {
26+
val := *new(T)
27+
28+
err := json.Unmarshal(b, &val)
29+
if err != nil {
30+
return err
31+
}
32+
33+
o.val = &val
34+
}
35+
36+
return nil
37+
}

0 commit comments

Comments
 (0)