Skip to content

Commit 1d05e22

Browse files
committed
save
1 parent 140512b commit 1d05e22

File tree

3 files changed

+117
-17
lines changed

3 files changed

+117
-17
lines changed

internal/verifier/change_stream.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
6666
for i, changeEvent := range batch {
6767
if changeEvent.ClusterTime != nil &&
6868
(verifier.lastChangeEventTime == nil ||
69-
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
69+
verifier.lastChangeEventTime.Before(*changeEvent.ClusterTime)) {
7070
verifier.lastChangeEventTime = changeEvent.ClusterTime
7171
}
7272
switch changeEvent.OpType {
@@ -223,11 +223,11 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
223223
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
224224
}
225225

226-
if curTs == writesOffTs || curTs.After(writesOffTs) {
226+
if !curTs.Before(writesOffTs) {
227227
verifier.logger.Debug().
228228
Interface("currentTimestamp", curTs).
229229
Interface("writesOffTimestamp", writesOffTs).
230-
Msg("Change stream has reached the writesOff timestamp. Shutting down.")
230+
Msg("Change stream has passed the writesOff timestamp. Shutting down.")
231231

232232
break
233233
}

internal/verifier/change_stream_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"go.mongodb.org/mongo-driver/bson"
1212
"go.mongodb.org/mongo-driver/bson/primitive"
1313
"go.mongodb.org/mongo-driver/mongo"
14+
"go.mongodb.org/mongo-driver/mongo/options"
15+
"go.mongodb.org/mongo-driver/mongo/readconcern"
1416
)
1517

1618
func TestChangeStreamFilter(t *testing.T) {
@@ -241,6 +243,106 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
241243
)
242244
}
243245

246+
func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
247+
ctx := suite.Context()
248+
249+
verifier := suite.BuildVerifier()
250+
251+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
252+
coll := db.Collection("mycoll")
253+
suite.Require().NoError(
254+
db.CreateCollection(ctx, coll.Name()),
255+
)
256+
257+
// start verifier
258+
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
259+
260+
// wait for generation 0 to end
261+
verifierRunner.AwaitGenerationEnd()
262+
263+
const mvName = "Migration Verifier"
264+
265+
// Kill verifier’s change stream.
266+
cursor, err := suite.srcMongoClient.Database(
267+
"admin",
268+
options.Database().SetReadConcern(readconcern.Local()),
269+
).Aggregate(
270+
ctx,
271+
mongo.Pipeline{
272+
{
273+
{"$currentOp", bson.D{
274+
{"idleCursors", true},
275+
}},
276+
},
277+
{
278+
{"$match", bson.D{
279+
{"clientMetadata.application.name", mvName},
280+
{"command.collection", "$cmd.aggregate"},
281+
{"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
282+
bson.D{{"$type", "object"}},
283+
},
284+
}},
285+
},
286+
},
287+
)
288+
suite.Require().NoError(err)
289+
290+
var ops []bson.Raw
291+
suite.Require().NoError(cursor.All(ctx, &ops))
292+
293+
/*
294+
_, err = coll.InsertOne(
295+
ctx,
296+
bson.D{{"_id", "before kill"}},
297+
)
298+
suite.Require().NoError(err)
299+
*/
300+
301+
for _, cursorRaw := range ops {
302+
opId, err := cursorRaw.LookupErr("opid")
303+
suite.Require().NoError(err, "should get opid from op")
304+
305+
suite.T().Logf("Killing change stream op %+v", opId)
306+
307+
suite.Require().NoError(
308+
suite.srcMongoClient.Database("admin").RunCommand(
309+
ctx,
310+
bson.D{
311+
{"killOp", 1},
312+
{"op", opId},
313+
},
314+
).Err(),
315+
)
316+
}
317+
318+
sess, err := suite.srcMongoClient.StartSession()
319+
suite.Require().NoError(err)
320+
_, err = coll.InsertOne(
321+
mongo.NewSessionContext(ctx, sess),
322+
bson.D{{"_id", "after kill"}},
323+
)
324+
suite.Require().NoError(err)
325+
326+
suite.T().Logf("cluster time after insert: %+v", sess.ClusterTime())
327+
328+
suite.Require().NoError(verifier.WritesOff(ctx))
329+
330+
suite.Require().NoError(verifierRunner.Await())
331+
332+
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
333+
ctx,
334+
verifier.verificationTaskCollection(),
335+
verificationTaskVerifyDocuments,
336+
verifier.generation,
337+
)
338+
suite.Require().NoError(err)
339+
340+
suite.Assert().Zero(incompleteTasks, "no incomplete tasks")
341+
suite.Require().Len(failedTasks, 1, "expect one failed task")
342+
343+
suite.T().Logf("task: %+v", failedTasks[0])
344+
}
345+
244346
func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
245347
ctx := suite.Context()
246348

internal/verifier/clustertime.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func GetNewClusterTime(
3535
logger,
3636
func(_ *retry.Info) error {
3737
var err error
38-
clusterTime, err = fetchClusterTime(ctx, client)
38+
clusterTime, err = fetchNewClusterTime(ctx, client)
3939
return err
4040
},
4141
)
@@ -67,27 +67,22 @@ func GetNewClusterTime(
6767

6868
// Use this when we just need the correct cluster time without
6969
// actually changing any shards’ oplogs.
70-
func fetchClusterTime(
70+
func fetchNewClusterTime(
7171
ctx context.Context,
7272
client *mongo.Client,
7373
) (primitive.Timestamp, error) {
7474
cmd, rawResponse, err := runAppendOplogNote(
7575
ctx,
7676
client,
7777
"expect StaleClusterTime error",
78-
primitive.Timestamp{1, 0},
78+
option.None[primitive.Timestamp](),
7979
)
8080

81-
// We expect an error here; if we didn't get one then something is
82-
// amiss on the server.
83-
if err == nil {
84-
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
85-
}
86-
87-
if !util.IsStaleClusterTimeError(err) {
88-
return primitive.Timestamp{}, errors.Wrap(
81+
if err != nil {
82+
return primitive.Timestamp{}, errors.Wrapf(
8983
err,
90-
"unexpected error (expected StaleClusterTime) from request",
84+
"unexpected error (expected StaleClusterTime) from request (%v)",
85+
cmd,
9186
)
9287
}
9388

@@ -123,16 +118,19 @@ func runAppendOplogNote(
123118
ctx context.Context,
124119
client *mongo.Client,
125120
note string,
126-
maxClusterTime primitive.Timestamp,
121+
maxClusterTimeOpt option.Option[primitive.Timestamp],
127122
) (bson.D, bson.Raw, error) {
128123
cmd := bson.D{
129124
{"appendOplogNote", 1},
130-
{"maxClusterTime", maxClusterTime},
131125
{"data", bson.D{
132126
{"migration-verifier", note},
133127
}},
134128
}
135129

130+
if maxClusterTime, has := maxClusterTimeOpt.Get(); has {
131+
cmd = append(cmd, bson.E{"maxClusterTime", maxClusterTime})
132+
}
133+
136134
resp := client.
137135
Database(
138136
"admin",

0 commit comments

Comments
 (0)