Skip to content

Commit 5df3ecc

Browse files
committed
Revert "rework cluster-time-getting"
This reverts commit 4509091.
1 parent d25eb42 commit 5df3ecc

File tree

3 files changed

+97
-24
lines changed

3 files changed

+97
-24
lines changed

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
4949
if csRunning {
5050
verifier.logger.Debug().Msg("Changestream still running, signalling that writes are done and waiting for change stream to exit")
5151

52-
finalTs, err := GetNewClusterTime(
52+
finalTs, err := GetClusterTime(
5353
ctx,
5454
verifier.logger,
5555
verifier.srcClient,

internal/verifier/clustertime.go

Lines changed: 95 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/10gen/migration-verifier/internal/logger"
77
"github.com/10gen/migration-verifier/internal/retry"
8+
"github.com/10gen/migration-verifier/internal/util"
89
"github.com/10gen/migration-verifier/mbson"
910
"github.com/pkg/errors"
1011
"go.mongodb.org/mongo-driver/bson"
@@ -16,11 +17,11 @@ import (
1617

1718
const opTimeKeyInServerResponse = "operationTime"
1819

19-
// GetNewClusterTime advances the remote cluster’s cluster time an returns
20-
// that time. In sharded clusters this advancement happens across all shards,
21-
// which (usefully!) equalizes the shards’ cluster time and triggers them to
22-
// output all events before then.
23-
func GetNewClusterTime(
20+
// GetClusterTime returns the remote cluster’s cluster time.
21+
// In so doing it creates a new oplog entry across all shards.
22+
// The “note” will go into the cluster’s oplog, so keep it
23+
// short but meaningful.
24+
func GetClusterTime(
2425
ctx context.Context,
2526
logger *logger.Logger,
2627
client *mongo.Client,
@@ -29,7 +30,31 @@ func GetNewClusterTime(
2930

3031
var optime primitive.Timestamp
3132

33+
// To get the cluster time, we submit a request to append an oplog note
34+
// but set an unreasonably-early maxClusterTime. All the shards will fail
35+
// the request but still send their current clusterTime to the mongos,
36+
// which will return the most recent of those. Thus we can fetch the
37+
// most recent cluster time without altering the oplog.
3238
err := retryer.RunForTransientErrorsOnly(
39+
ctx,
40+
logger,
41+
func(_ *retry.Info) error {
42+
var err error
43+
optime, err = fetchClusterTime(ctx, client)
44+
return err
45+
},
46+
)
47+
48+
if err != nil {
49+
return primitive.Timestamp{}, err
50+
}
51+
52+
// OPTIMIZATION FOR SHARDED CLUSTERS: We append another oplog entry to
53+
// bring all shards to a cluster time that’s *after* the optime that we’ll
54+
// return. That way any events *at*
55+
//
56+
// Since this is just an optimization, failures here are nonfatal.
57+
err = retryer.RunForTransientErrorsOnly(
3358
ctx,
3459
logger,
3560
func(_ *retry.Info) error {
@@ -38,43 +63,91 @@ func GetNewClusterTime(
3863
return err
3964
},
4065
)
66+
if err != nil {
67+
// This isn't serious enough even to warn on, so leave it at info-level.
68+
logger.Info().Err(err).
69+
Msg("Failed to append oplog note; change stream may need extra time to finish.")
70+
}
71+
72+
return optime, nil
73+
}
74+
75+
// Use this when we just need the correct cluster time without
76+
// actually changing any shards’ oplogs.
77+
func fetchClusterTime(
78+
ctx context.Context,
79+
client *mongo.Client,
80+
) (primitive.Timestamp, error) {
81+
cmd, rawResponse, err := runAppendOplogNote(
82+
ctx,
83+
client,
84+
"expect StaleClusterTime error",
85+
bson.E{"maxClusterTime", primitive.Timestamp{1, 0}},
86+
)
87+
88+
// We expect an error here; if we didn't get one then something is
89+
// amiss on the server.
90+
if err == nil {
91+
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
92+
}
93+
94+
if !util.IsStaleClusterTimeError(err) {
95+
return primitive.Timestamp{}, errors.Wrap(
96+
err,
97+
"unexpected error (expected StaleClusterTime) from request",
98+
)
99+
}
41100

42-
return optime, err
101+
return getOpTimeFromRawResponse(rawResponse)
43102
}
44103

45104
func syncClusterTimeAcrossShards(
46105
ctx context.Context,
47106
client *mongo.Client,
48107
) (primitive.Timestamp, error) {
49-
cmd := bson.D{
50-
{"appendOplogNote", 1},
51-
{"data", bson.D{
52-
{"migration-verifier", "syncing cluster time"},
53-
}},
108+
_, rawResponse, err := runAppendOplogNote(ctx, client, "syncing cluster time")
109+
110+
if err != nil {
111+
return primitive.Timestamp{}, err
54112
}
55113

114+
return getOpTimeFromRawResponse(rawResponse)
115+
}
116+
117+
func runAppendOplogNote(
118+
ctx context.Context,
119+
client *mongo.Client,
120+
note string,
121+
extraPieces ...bson.E,
122+
) (bson.D, bson.Raw, error) {
123+
cmd := append(
124+
bson.D{
125+
{"appendOplogNote", 1},
126+
{"data", bson.D{
127+
{"migration-verifier", note},
128+
}},
129+
},
130+
extraPieces...,
131+
)
132+
56133
resp := client.
57134
Database(
58135
"admin",
59136
options.Database().SetWriteConcern(writeconcern.Majority()),
60137
).
61138
RunCommand(ctx, cmd)
62139

63-
rawResponse, err := resp.Raw()
140+
raw, err := resp.Raw()
64141

65-
if err != nil {
66-
return primitive.Timestamp{}, errors.Wrapf(
67-
err,
68-
"command (%v) failed unexpectedly",
69-
cmd,
70-
)
71-
}
72-
73-
return getOpTimeFromRawResponse(rawResponse)
142+
return cmd, raw, errors.Wrapf(
143+
err,
144+
"command (%v) failed unexpectedly",
145+
cmd,
146+
)
74147
}
75148

76149
func getOpTimeFromRawResponse(rawResponse bson.Raw) (primitive.Timestamp, error) {
77-
// Return the response’s `operationTime`.
150+
// Get the `operationTime` from the response and return it.
78151
var optime primitive.Timestamp
79152

80153
found, err := mbson.RawLookup(rawResponse, &optime, opTimeKeyInServerResponse)

internal/verifier/clustertime_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ func (suite *IntegrationTestSuite) TestGetClusterTime() {
66
ctx := context.Background()
77
logger, _ := getLoggerAndWriter("stdout")
88

9-
ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient)
9+
ts, err := GetClusterTime(ctx, logger, suite.srcMongoClient)
1010
suite.Require().NoError(err)
1111

1212
suite.Assert().NotZero(ts, "timestamp should be nonzero")

0 commit comments

Comments
 (0)