Skip to content

Commit bf97abe

Browse files
author
iwysiu
committed
GODRIVER-1238 Closing a change stream without reading it returns an error
Change-Id: Ic366961626eb469d39d3bc10b9675d8e3331274e
1 parent b10ab43 commit bf97abe

File tree

3 files changed

+17
-16
lines changed

3 files changed

+17
-16
lines changed

mongo/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ func (cs *ChangeStream) Close(ctx context.Context) error {
433433
ctx = context.Background()
434434
}
435435

436-
closeImplicitSession(cs.sess)
436+
defer closeImplicitSession(cs.sess)
437437

438438
if cs.cursor == nil {
439439
return nil // cursor is already closed

mongo/change_stream_spec_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ func TestChangeStreamSpec(t *testing.T) {
6969
}
7070
}
7171

72-
func closeCursor(stream *ChangeStream) {
73-
_ = stream.Close(ctx)
72+
func closeCursor(t *testing.T, stream *ChangeStream) {
73+
err := stream.Close(ctx)
74+
testhelpers.RequireNil(t, err, "error closing ChangeStream: %v", err)
7475
}
7576

7677
func getStreamOptions(t *testing.T, test *csTest) *options.ChangeStreamOptions {
@@ -260,7 +261,7 @@ func runCsTestFile(t *testing.T, globalClient *Client, path string) {
260261
return
261262
}
262263

263-
defer closeCursor(cursor) // end implicit session
264+
defer closeCursor(t, cursor) // end implicit session
264265

265266
// run operations
266267
for _, op := range test.Operations {

mongo/change_stream_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
336336
// Stream must continuously track last seen resumeToken
337337

338338
coll, stream := createCollectionStream(t, "TrackTokenDB", "TrackTokenColl", nil)
339-
defer closeCursor(stream)
339+
defer closeCursor(t, stream)
340340

341341
coll.writeConcern = wcMajority
342342
_, err := coll.InsertOne(ctx, doc1)
@@ -361,7 +361,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
361361
}
362362

363363
coll, stream := createCollectionStream(t, "MissingTokenDB", "MissingTokenColl", pipeline)
364-
defer closeCursor(stream)
364+
defer closeCursor(t, stream)
365365

366366
coll.writeConcern = wcMajority
367367
_, err := coll.InsertOne(ctx, doc1)
@@ -382,7 +382,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
382382
// pipeline and options, except for the addition/update of a resumeToken.
383383

384384
coll, stream := createMonitoredStream(t, "ResumeOnceDB", "ResumeOnceColl", nil)
385-
defer closeCursor(stream)
385+
defer closeCursor(t, stream)
386386
startCmd := (<-startedChan).Command
387387
startPipeline := startCmd.Lookup("pipeline").Array()
388388

@@ -447,7 +447,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
447447
for _, tc := range tests {
448448
t.Run(tc.name, func(t *testing.T) {
449449
_, stream := createMonitoredStream(t, "ResumeOnceDB", "ResumeOnceColl", nil)
450-
defer closeCursor(stream)
450+
defer closeCursor(t, stream)
451451
cs := stream
452452
cs.cursor = &errorBatchCursor{
453453
errCode: tc.errCode,
@@ -475,7 +475,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
475475
// Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not
476476

477477
_, stream := createCollectionStream(t, "CursorNotClosedDB", "CursorNotClosedColl", nil)
478-
defer closeCursor(stream)
478+
defer closeCursor(t, stream)
479479
cs := stream
480480

481481
if cs.sess.Terminated {
@@ -499,7 +499,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
499499
}
500500

501501
coll, stream := createMonitoredStream(t, "NoExceptionsDB", "NoExceptionsColl", nil)
502-
defer closeCursor(stream)
502+
defer closeCursor(t, stream)
503503
cs := stream
504504

505505
// kill cursor to force a resumable error
@@ -549,7 +549,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
549549
}
550550

551551
_, stream := createMonitoredStream(t, "IncludeTimeDB", "IncludeTimeColl", nil)
552-
defer closeCursor(stream)
552+
defer closeCursor(t, stream)
553553
cs := stream
554554

555555
// kill cursor to force a resumable error
@@ -699,7 +699,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
699699
// Test that the underlying cursor is advanced after a resumeable error occurs.
700700

701701
coll, stream := createCollectionStream(t, "ResumeNextDB", "ResumeNextColl", nil)
702-
defer closeCursor(stream)
702+
defer closeCursor(t, stream)
703703
ensureResumeToken(t, coll, stream)
704704

705705
// kill the stream's underlying cursor to force a resumeable error
@@ -751,7 +751,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
751751
}
752752
token := stream.ResumeToken()
753753
testhelpers.RequireNotNil(t, token, "got nil token")
754-
closeCursor(stream)
754+
closeCursor(t, stream)
755755

756756
cases := []struct {
757757
name string
@@ -771,7 +771,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
771771
drainChannels()
772772
stream, err := coll.Watch(ctx, Pipeline{}, tc.opts)
773773
testhelpers.RequireNil(t, err, "error restarting stream: %v", err)
774-
defer closeCursor(stream)
774+
defer closeCursor(t, stream)
775775
aggPbrt, _ := getAggregateInfo(t)
776776

777777
compareResumeTokens(t, stream, tc.expectedInitialToken)
@@ -814,7 +814,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
814814
}
815815
token := stream.ResumeToken()
816816
testhelpers.RequireNotNil(t, token, "got nil resume token")
817-
closeCursor(stream)
817+
closeCursor(t, stream)
818818

819819
cases := []struct {
820820
name string
@@ -829,7 +829,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
829829
t.Run(tc.name, func(t *testing.T) {
830830
stream, err := coll.Watch(ctx, Pipeline{}, tc.opts)
831831
testhelpers.RequireNil(t, err, "error restarting stream: %v", err)
832-
defer closeCursor(stream)
832+
defer closeCursor(t, stream)
833833
compareResumeTokens(t, stream, tc.expectedInitialToken)
834834

835835
// if the stream is not expected to have any results, do not try calling Next

0 commit comments

Comments
 (0)