Skip to content

Commit be2638e

Browse files
committed
change stream test
1 parent 250a659 commit be2638e

File tree

3 files changed

+124
-49
lines changed

3 files changed

+124
-49
lines changed

internal/testutil/testutil.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package testutil
22

33
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/pkg/errors"
48
"go.mongodb.org/mongo-driver/bson"
59
"go.mongodb.org/mongo-driver/mongo"
10+
"go.mongodb.org/mongo-driver/mongo/options"
11+
"go.mongodb.org/mongo-driver/mongo/readconcern"
612
)
713

814
// Marshal wraps `bsonMarshal` with a panic on failure.
@@ -34,3 +40,64 @@ func convertDocsToAnys(docs []bson.D) []any {
3440

3541
return anys
3642
}
43+
44+
func KillApplicationChangeStreams(
45+
ctx context.Context,
46+
t *testing.T,
47+
client *mongo.Client,
48+
appName string,
49+
) error {
50+
// Kill verifier’s change stream.
51+
cursor, err := client.Database(
52+
"admin",
53+
options.Database().SetReadConcern(readconcern.Local()),
54+
).Aggregate(
55+
ctx,
56+
mongo.Pipeline{
57+
{
58+
{"$currentOp", bson.D{
59+
{"idleCursors", true},
60+
}},
61+
},
62+
{
63+
{"$match", bson.D{
64+
{"clientMetadata.application.name", appName},
65+
{"command.collection", "$cmd.aggregate"},
66+
{"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
67+
bson.D{{"$type", "object"}},
68+
},
69+
}},
70+
},
71+
},
72+
)
73+
if err != nil {
74+
return errors.Wrapf(err, "failed to list %#q's change streams", appName)
75+
}
76+
77+
ops := []struct {
78+
Opid any
79+
}{}
80+
err = cursor.All(ctx, &ops)
81+
if err != nil {
82+
return errors.Wrapf(err, "failed to decode %#q's change streams", appName)
83+
}
84+
85+
for _, op := range ops {
86+
t.Logf("Killing change stream op %+v", op.Opid)
87+
88+
err :=
89+
client.Database("admin").RunCommand(
90+
ctx,
91+
bson.D{
92+
{"killOp", 1},
93+
{"op", op.Opid},
94+
},
95+
).Err()
96+
97+
if err != nil {
98+
return errors.Wrapf(err, "failed to kill change stream with opId %#q", op.Opid)
99+
}
100+
}
101+
102+
return nil
103+
}

internal/verifier/change_stream_test.go

Lines changed: 54 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"go.mongodb.org/mongo-driver/bson"
1616
"go.mongodb.org/mongo-driver/bson/primitive"
1717
"go.mongodb.org/mongo-driver/mongo"
18-
"go.mongodb.org/mongo-driver/mongo/options"
19-
"go.mongodb.org/mongo-driver/mongo/readconcern"
2018
)
2119

2220
func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
@@ -428,6 +426,52 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
428426
)
429427
}
430428

429+
func (suite *IntegrationTestSuite) TestWritesOffCursorKilledResilience() {
430+
ctx := suite.Context()
431+
432+
coll := suite.srcMongoClient.
433+
Database(suite.DBNameForTest()).
434+
Collection("mycoll")
435+
436+
suite.Require().NoError(
437+
coll.Database().CreateCollection(
438+
ctx,
439+
coll.Name(),
440+
),
441+
)
442+
443+
suite.Require().NoError(
444+
suite.dstMongoClient.
445+
Database(coll.Database().Name()).
446+
CreateCollection(
447+
ctx,
448+
coll.Name(),
449+
),
450+
)
451+
452+
for range 100 {
453+
verifier := suite.BuildVerifier()
454+
455+
docs := lo.RepeatBy(1_000, func(_ int) bson.D { return bson.D{} })
456+
_, err := coll.InsertMany(
457+
ctx,
458+
lo.ToAnySlice(docs),
459+
)
460+
suite.Require().NoError(err)
461+
462+
suite.Require().NoError(verifier.WritesOff(ctx))
463+
464+
suite.Require().NoError(
465+
testutil.KillApplicationChangeStreams(
466+
suite.Context(),
467+
suite.T(),
468+
suite.srcMongoClient,
469+
clientAppName,
470+
),
471+
)
472+
}
473+
}
474+
431475
func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
432476
ctx := suite.Context()
433477

@@ -445,54 +489,16 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
445489
// wait for generation 0 to end
446490
suite.Require().NoError(verifierRunner.AwaitGenerationEnd())
447491

448-
const mvName = "Migration Verifier"
449-
450-
// Kill verifier’s change stream.
451-
cursor, err := suite.srcMongoClient.Database(
452-
"admin",
453-
options.Database().SetReadConcern(readconcern.Local()),
454-
).Aggregate(
455-
ctx,
456-
mongo.Pipeline{
457-
{
458-
{"$currentOp", bson.D{
459-
{"idleCursors", true},
460-
}},
461-
},
462-
{
463-
{"$match", bson.D{
464-
{"clientMetadata.application.name", mvName},
465-
{"command.collection", "$cmd.aggregate"},
466-
{"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
467-
bson.D{{"$type", "object"}},
468-
},
469-
}},
470-
},
471-
},
492+
suite.Require().NoError(
493+
testutil.KillApplicationChangeStreams(
494+
suite.Context(),
495+
suite.T(),
496+
suite.srcMongoClient,
497+
clientAppName,
498+
),
472499
)
473-
suite.Require().NoError(err)
474-
475-
var ops []bson.Raw
476-
suite.Require().NoError(cursor.All(ctx, &ops))
477-
478-
for _, cursorRaw := range ops {
479-
opId, err := cursorRaw.LookupErr("opid")
480-
suite.Require().NoError(err, "should get opid from op")
481-
482-
suite.T().Logf("Killing change stream op %+v", opId)
483-
484-
suite.Require().NoError(
485-
suite.srcMongoClient.Database("admin").RunCommand(
486-
ctx,
487-
bson.D{
488-
{"killOp", 1},
489-
{"op", opId},
490-
},
491-
).Err(),
492-
)
493-
}
494500

495-
_, err = coll.InsertOne(
501+
_, err := coll.InsertOne(
496502
ctx,
497503
bson.D{{"_id", "after kill"}},
498504
)

internal/verifier/migration_verifier.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ const (
6868
okSymbol = "\u2705" // white heavy check mark
6969
infoSymbol = "\u24d8" // circled Latin small letter I
7070
notOkSymbol = "\u2757" // heavy exclamation mark symbol
71+
72+
clientAppName = "Migration Verifier"
7173
)
7274

7375
type whichCluster string
@@ -221,7 +223,7 @@ func (verifier *Verifier) ConfigureReadConcern(setting ReadConcernSetting) {
221223
}
222224

223225
func (verifier *Verifier) getClientOpts(uri string) *options.ClientOptions {
224-
appName := "Migration Verifier"
226+
appName := clientAppName
225227
opts := &options.ClientOptions{
226228
AppName: &appName,
227229
}

0 commit comments

Comments
 (0)