Skip to content

Commit 4509091

Browse files
committed
rework cluster-time-getting
1 parent 8c1af46 commit 4509091

File tree

3 files changed

+22
-102
lines changed

3 files changed

+22
-102
lines changed

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
4949
if csRunning {
5050
verifier.logger.Debug().Msg("Changestream still running, signalling that writes are done and waiting for change stream to exit")
5151

52-
finalTs, err := GetClusterTime(
52+
finalTs, err := GetNewClusterTime(
5353
ctx,
5454
verifier.logger,
5555
verifier.srcClient,

internal/verifier/clustertime.go

Lines changed: 20 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@ package verifier
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/10gen/migration-verifier/internal/logger"
87
"github.com/10gen/migration-verifier/internal/retry"
9-
"github.com/10gen/migration-verifier/internal/util"
108
"github.com/10gen/migration-verifier/mbson"
119
"github.com/pkg/errors"
1210
"go.mongodb.org/mongo-driver/bson"
@@ -16,11 +14,11 @@ import (
1614

1715
const opTimeKeyInServerResponse = "operationTime"
1816

19-
// GetClusterTime returns the remote cluster’s cluster time.
20-
// In so doing it creates a new oplog entry across all shards.
21-
// The “note” will go into the cluster’s oplog, so keep it
22-
// short but meaningful.
23-
func GetClusterTime(
17+
// GetNewClusterTime advances the remote cluster’s cluster time an returns
18+
// that time. In sharded clusters this advancement happens across all shards,
19+
// which (usefully!) equalizes the shards’ cluster time and triggers them to
20+
// output all events before then.
21+
func GetNewClusterTime(
2422
ctx context.Context,
2523
logger *logger.Logger,
2624
client *mongo.Client,
@@ -29,125 +27,47 @@ func GetClusterTime(
2927

3028
var optime primitive.Timestamp
3129

32-
// To get the cluster time, we submit a request to append an oplog note
33-
// but set an unreasonably-early maxClusterTime. All the shards will fail
34-
// the request but still send their current clusterTime to the mongos,
35-
// which will return the most recent of those. Thus we can fetch the
36-
// most recent cluster time without altering the oplog.
3730
err := retryer.RunForTransientErrorsOnly(
3831
ctx,
3932
logger,
4033
func(_ *retry.Info) error {
4134
var err error
4235
optime, err = syncClusterTimeAcrossShards(ctx, client)
43-
if err != nil {
44-
err = fmt.Errorf("%w: %w", retry.CustomTransientErr, err)
45-
}
4636
return err
4737
},
4838
)
4939

50-
if err != nil {
51-
return primitive.Timestamp{}, err
52-
}
53-
54-
// OPTIMIZATION FOR SHARDED CLUSTERS: We append another oplog entry to
55-
// bring all shards to a cluster time that’s *after* the optime that we’ll
56-
// return. That way any events *at*
57-
//
58-
// Since this is just an optimization, failures here are nonfatal.
59-
err = retryer.RunForTransientErrorsOnly(
60-
ctx,
61-
logger,
62-
func(_ *retry.Info) error {
63-
var err error
64-
optime, err = syncClusterTimeAcrossShards(ctx, client)
65-
if err != nil {
66-
err = fmt.Errorf("%w: %w", retry.CustomTransientErr, err)
67-
}
68-
return err
69-
},
70-
)
71-
if err != nil {
72-
// This isn't serious enough even to warn on, so leave it at info-level.
73-
logger.Info().Err(err).
74-
Msg("Failed to append oplog note; change stream may need extra time to finish.")
75-
}
76-
77-
return optime, nil
40+
return optime, err
7841
}
7942

80-
// Use this when we just need the correct cluster time without
81-
// actually changing any shards’ oplogs.
82-
func fetchClusterTime(
43+
func syncClusterTimeAcrossShards(
8344
ctx context.Context,
8445
client *mongo.Client,
8546
) (primitive.Timestamp, error) {
86-
cmd, rawResponse, err := runAppendOplogNote(
87-
ctx,
88-
client,
89-
"expect StaleClusterTime error",
90-
bson.E{"maxClusterTime", primitive.Timestamp{1, 0}},
91-
)
92-
93-
// We expect an error here; if we didn't get one then something is
94-
// amiss on the server.
95-
if err == nil {
96-
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
47+
cmd := bson.D{
48+
{"appendOplogNote", 1},
49+
{"data", bson.D{
50+
{"migration-verifier", "syncing cluster time"},
51+
}},
9752
}
9853

99-
if !util.IsStaleClusterTimeError(err) {
100-
return primitive.Timestamp{}, errors.Wrap(
101-
err,
102-
"unexpected error (expected StaleClusterTime) from request",
103-
)
104-
}
54+
resp := client.Database("admin").RunCommand(ctx, cmd)
10555

106-
return getOpTimeFromRawResponse(rawResponse)
107-
}
108-
109-
func syncClusterTimeAcrossShards(
110-
ctx context.Context,
111-
client *mongo.Client,
112-
) (primitive.Timestamp, error) {
113-
_, rawResponse, err := runAppendOplogNote(ctx, client, "syncing cluster time")
56+
rawResponse, err := resp.Raw()
11457

11558
if err != nil {
116-
return primitive.Timestamp{}, err
59+
return primitive.Timestamp{}, errors.Wrapf(
60+
err,
61+
"command (%v) failed unexpectedly",
62+
cmd,
63+
)
11764
}
11865

11966
return getOpTimeFromRawResponse(rawResponse)
12067
}
12168

122-
func runAppendOplogNote(
123-
ctx context.Context,
124-
client *mongo.Client,
125-
note string,
126-
extraPieces ...bson.E,
127-
) (bson.D, bson.Raw, error) {
128-
cmd := append(
129-
bson.D{
130-
{"appendOplogNote", 1},
131-
{"data", bson.D{
132-
{"migration-verifier", note},
133-
}},
134-
},
135-
extraPieces...,
136-
)
137-
138-
resp := client.Database("admin").RunCommand(ctx, cmd)
139-
140-
raw, err := resp.Raw()
141-
142-
return cmd, raw, errors.Wrapf(
143-
err,
144-
"command (%v) failed unexpectedly",
145-
cmd,
146-
)
147-
}
148-
14969
func getOpTimeFromRawResponse(rawResponse bson.Raw) (primitive.Timestamp, error) {
150-
// Get the `operationTime` from the response and return it.
70+
// Return the response’s `operationTime`.
15171
var optime primitive.Timestamp
15272

15373
found, err := mbson.RawLookup(rawResponse, &optime, opTimeKeyInServerResponse)

internal/verifier/clustertime_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ func (suite *IntegrationTestSuite) TestGetClusterTime() {
66
ctx := context.Background()
77
logger, _ := getLoggerAndWriter("stdout")
88

9-
ts, err := GetClusterTime(ctx, logger, suite.srcMongoClient)
9+
ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient)
1010
suite.Require().NoError(err)
1111

1212
suite.Assert().NotZero(ts, "timestamp should be nonzero")

0 commit comments

Comments
 (0)