Skip to content

Commit 45924f3

Browse files
committed
Add failing changestream test for db and client
Change-Id: I5b0c82d80b45f13f36d9a7aca8418e3a5719264e
1 parent 79e6c40 commit 45924f3

File tree

1 file changed

+108
-1
lines changed

1 file changed

+108
-1
lines changed

mongo/change_stream_test.go

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"os"
1212
"testing"
13+
"time"
1314

1415
"github.com/mongodb/mongo-go-driver/bson"
1516
"github.com/mongodb/mongo-go-driver/bson/primitive"
@@ -288,7 +289,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
288289
t.Run("TestTrackResumeToken", func(t *testing.T) {
289290
// Stream must continuously track last seen resumeToken
290291

291-
coll, stream := createCollectionStream(t, "TrackTokenDB", "TrackTokenColl", Pipeline{})
292+
coll, stream := createCollectionStream(t, "TrackTokenDB", "TrackTokenColl", nil)
292293
defer closeCursor(stream)
293294

294295
cs := stream.(*changeStream)
@@ -560,4 +561,110 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
560561
// There's another test: ChangeStream will resume after a killCursors command is issued for its child cursor.
561562
// But, killCursors was already used to cause an error for the ResumeOnce test, so this does not need to be tested
562563
// again.
564+
565+
t.Run("Decode Doesn't Panic", func(t *testing.T) {
566+
skipIfBelow36(t)
567+
if os.Getenv("TOPOLOGY") != "replica_set" {
568+
t.Skip()
569+
}
570+
571+
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
572+
defer cancel()
573+
client := createTestClient(t)
574+
client.writeConcern = wcMajority
575+
db := client.Database("changestream-decode-doesnt-panic")
576+
err := db.Drop(ctx)
577+
testhelpers.RequireNil(t, err, "error dropping db: %s", err)
578+
579+
t.Run("collection", func(t *testing.T) {
580+
coll := db.Collection("random-collection-one")
581+
coll.writeConcern = wcMajority
582+
_, err = coll.InsertOne(ctx, collectionStartingDoc) // create collection on server for 3.6
583+
584+
stream, err := coll.Watch(ctx, Pipeline{})
585+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
586+
defer stream.Close(ctx)
587+
588+
_, err = coll.InsertOne(ctx, bson.D{{"pi", 3.14159}})
589+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
590+
591+
if stream.Next(ctx) {
592+
var res bson.D
593+
err := stream.Decode(&res)
594+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
595+
if len(res) == 0 {
596+
t.Errorf("result is empty, was expecting change document")
597+
}
598+
}
599+
testhelpers.RequireNil(t, stream.Err(), "error while reading stream: %v", err)
600+
})
601+
t.Run("database", func(t *testing.T) {
602+
version, err := getServerVersion(createTestDatabase(t, nil))
603+
testhelpers.RequireNil(t, err, "error getting server version: %s", err)
604+
605+
if compareVersions(t, version, "4.0") < 0 {
606+
t.Skip("skipping for version < 4.0")
607+
}
608+
609+
coll := db.Collection("random-collection-one")
610+
coll.writeConcern = wcMajority
611+
_, err = coll.InsertOne(ctx, collectionStartingDoc) // create collection on server for 3.6
612+
613+
stream, err := db.Watch(ctx, Pipeline{})
614+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
615+
defer stream.Close(ctx)
616+
617+
_, err = coll.InsertOne(ctx, bson.D{{"pi", 3.14159}})
618+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
619+
620+
defer func() {
621+
if err := recover(); err != nil {
622+
t.Errorf("panic while attempting to decode: %v", err)
623+
}
624+
}()
625+
if stream.Next(ctx) {
626+
var res bson.D
627+
err := stream.Decode(&res)
628+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
629+
if len(res) == 0 {
630+
t.Errorf("result is empty, was expecting change document")
631+
}
632+
}
633+
testhelpers.RequireNil(t, stream.Err(), "error while reading stream: %v", err)
634+
})
635+
t.Run("client", func(t *testing.T) {
636+
version, err := getServerVersion(createTestDatabase(t, nil))
637+
testhelpers.RequireNil(t, err, "error getting server version: %s", err)
638+
639+
if compareVersions(t, version, "4.0") < 0 {
640+
t.Skip("skipping for version < 4.0")
641+
}
642+
643+
coll := db.Collection("random-collection-one")
644+
coll.writeConcern = wcMajority
645+
_, err = coll.InsertOne(ctx, collectionStartingDoc) // create collection on server for 3.6
646+
647+
stream, err := client.Watch(ctx, Pipeline{})
648+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
649+
defer stream.Close(ctx)
650+
651+
_, err = coll.InsertOne(ctx, bson.D{{"pi", 3.14159}})
652+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
653+
654+
defer func() {
655+
if err := recover(); err != nil {
656+
t.Errorf("panic while attempting to decode: %v", err)
657+
}
658+
}()
659+
if stream.Next(ctx) {
660+
var res bson.D
661+
err := stream.Decode(&res)
662+
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
663+
if len(res) == 0 {
664+
t.Errorf("result is empty, was expecting change document")
665+
}
666+
}
667+
testhelpers.RequireNil(t, stream.Err(), "error while reading stream: %v", err)
668+
})
669+
})
563670
}

0 commit comments

Comments
 (0)