Skip to content

Commit fdda312

Browse files
Update Cursor.Next to call getMore in a loop
GODRIVER-490 Change-Id: Id6538bb914a7538781c8270d323efd572033ca3f
1 parent 9b79475 commit fdda312

File tree

4 files changed

+265
-10
lines changed

4 files changed

+265
-10
lines changed

core/integration/cursor_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (C) MongoDB, Inc. 2017-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package integration
8+
9+
import (
10+
"bytes"
11+
"context"
12+
"testing"
13+
"time"
14+
15+
"github.com/mongodb/mongo-go-driver/bson"
16+
"github.com/mongodb/mongo-go-driver/core/command"
17+
"github.com/mongodb/mongo-go-driver/core/description"
18+
"github.com/mongodb/mongo-go-driver/core/option"
19+
"github.com/mongodb/mongo-go-driver/core/writeconcern"
20+
"github.com/mongodb/mongo-go-driver/internal/testutil"
21+
"github.com/stretchr/testify/assert"
22+
)
23+
24+
func TestTailableCursorLoopsUntilDocsAvailable(t *testing.T) {
25+
server, err := testutil.Topology(t).SelectServer(context.Background(), description.WriteSelector())
26+
noerr(t, err)
27+
28+
// create capped collection
29+
createCmd := bson.NewDocument(
30+
bson.EC.String("create", testutil.ColName(t)),
31+
bson.EC.Boolean("capped", true),
32+
bson.EC.Int32("size", 1000))
33+
_, err = testutil.RunCommand(t, server.Server, dbName, createCmd)
34+
35+
conn, err := server.Connection(context.Background())
36+
noerr(t, err)
37+
38+
// Insert a document
39+
d := bson.NewDocument(bson.EC.Int32("_id", 1), bson.EC.Timestamp("ts", 5, 0))
40+
wc := writeconcern.New(writeconcern.WMajority())
41+
testutil.AutoInsertDocs(t, wc, d)
42+
43+
rdr, err := d.MarshalBSON()
44+
noerr(t, err)
45+
46+
// find that document, setting cursor type to TAILABLEAWAIT
47+
cursor, err := (&command.Find{
48+
NS: command.Namespace{DB: dbName, Collection: testutil.ColName(t)},
49+
Filter: bson.NewDocument(bson.EC.SubDocument("ts", bson.NewDocument(bson.EC.Timestamp("$gte", 5, 0)))),
50+
Opts: []option.FindOptioner{
51+
option.OptLimit(0),
52+
option.OptBatchSize(1),
53+
option.OptCursorType(option.TailableAwait)},
54+
}).RoundTrip(context.Background(), server.SelectedDescription(), server, conn)
55+
noerr(t, err)
56+
57+
// assert that there is a document returned
58+
assert.True(t, cursor.Next(context.Background()), "Cursor should have a next result")
59+
60+
// make sure it's the right document
61+
var next = make(bson.Reader, 1024)
62+
err = cursor.Decode(next)
63+
noerr(t, err)
64+
65+
if !bytes.Equal(next[:len(rdr)], rdr) {
66+
t.Errorf("Did not get expected document. got %v; want %v", bson.Reader(next[:len(rdr)]), bson.Reader(rdr))
67+
}
68+
69+
// insert another document in 500 MS
70+
d = bson.NewDocument(bson.EC.Int32("_id", 2), bson.EC.Timestamp("ts", 6, 0))
71+
72+
rdr, err = d.MarshalBSON()
73+
noerr(t, err)
74+
75+
go func() {
76+
time.Sleep(time.Millisecond * 500)
77+
testutil.AutoInsertDocs(t, wc, d)
78+
}()
79+
80+
// context with timeout so test fails if loop does not work as expected
81+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
82+
defer cancel()
83+
84+
// assert that there is another document returned
85+
// cursor.Next should loop calling getMore until a document becomes available (in 500 ms)
86+
assert.True(t, cursor.Next(ctx), "Cursor should have a next result")
87+
88+
noerr(t, cursor.Err())
89+
90+
// make sure it's the right document the second time
91+
next = make(bson.Reader, 1024)
92+
err = cursor.Decode(next)
93+
noerr(t, err)
94+
95+
if !bytes.Equal(next[:len(rdr)], rdr) {
96+
t.Errorf("Did not get expected document. got %v; want %v", bson.Reader(next[:len(rdr)]), bson.Reader(rdr))
97+
}
98+
}

core/topology/cursor.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,14 @@ func (c *cursor) Next(ctx context.Context) bool {
107107
}
108108

109109
c.getMore(ctx)
110-
if c.err != nil {
111-
return false
112-
}
113110

114-
if c.batch.Len() == 0 {
115-
return false
111+
// call the getMore command in a loop until at least one document is returned in the next batch
112+
for c.batch.Len() == 0 {
113+
if c.err != nil || (c.id == 0 && c.batch.Len() == 0) {
114+
return false
115+
}
116+
117+
c.getMore(ctx)
116118
}
117119

118120
return true

core/topology/cursor_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,15 @@
77
package topology
88

99
import (
10+
"context"
11+
"errors"
1012
"testing"
1113

1214
"github.com/mongodb/mongo-go-driver/bson"
15+
"github.com/mongodb/mongo-go-driver/core/connection"
16+
"github.com/mongodb/mongo-go-driver/core/description"
17+
"github.com/mongodb/mongo-go-driver/core/wiremessage"
18+
"github.com/mongodb/mongo-go-driver/internal"
1319
"github.com/stretchr/testify/assert"
1420
)
1521

@@ -29,3 +35,151 @@ func TestCursorNextDoesNotPanicIfContextisNil(t *testing.T) {
2935
})
3036
assert.True(t, iterNext)
3137
}
38+
39+
func TestCursorLoopsUntilDocAvailable(t *testing.T) {
40+
// Next should loop until at least one doc is available
41+
// Here, the mock pool and connection implementations (below) write
42+
// empty batch responses a few times before returning a non-empty batch
43+
44+
s := createDefaultConnectedServer(t, false)
45+
c := cursor{
46+
id: 1,
47+
batch: bson.NewArray(),
48+
server: s,
49+
}
50+
51+
assert.True(t, c.Next(nil))
52+
}
53+
54+
func TestCursorReturnsFalseOnContextCancellation(t *testing.T) {
55+
// Next should return false if an error occurs
56+
// here the error is the Context being cancelled
57+
58+
s := createDefaultConnectedServer(t, false)
59+
c := cursor{
60+
id: 1,
61+
batch: bson.NewArray(),
62+
server: s,
63+
}
64+
65+
ctx, cancel := context.WithCancel(context.Background())
66+
67+
cancel()
68+
69+
assert.False(t, c.Next(ctx))
70+
}
71+
72+
func TestCursorNextReturnsFalseIfErrorOccurred(t *testing.T) {
73+
// Next should return false if an error occurs
74+
// here the error is an invalid namespace (""."")
75+
76+
s := createDefaultConnectedServer(t, true)
77+
c := cursor{
78+
id: 1,
79+
batch: bson.NewArray(),
80+
server: s,
81+
}
82+
assert.False(t, c.Next(nil))
83+
}
84+
85+
func TestCursorNextReturnsFalseIfResIdZeroAndNoMoreDocs(t *testing.T) {
86+
// Next should return false if the cursor id is 0 and there are no documents in the next batch
87+
88+
c := cursor{id: 0, batch: bson.NewArray()}
89+
assert.False(t, c.Next(nil))
90+
}
91+
92+
func createDefaultConnectedServer(t *testing.T, willErr bool) *Server {
93+
s, err := ConnectServer(nil, "127.0.0.1")
94+
s.pool = &mockPool{t: t, willErr: willErr}
95+
if err != nil {
96+
assert.Fail(t, "Server creation failed")
97+
}
98+
99+
return s
100+
}
101+
102+
func createOKBatchReplyDoc(id int64, batchDocs *bson.Array) *bson.Document {
103+
return bson.NewDocument(
104+
bson.EC.Int32("ok", 1),
105+
bson.EC.SubDocument(
106+
"cursor",
107+
bson.NewDocument(
108+
bson.EC.Int64("id", id),
109+
bson.EC.Array("nextBatch", batchDocs))))
110+
}
111+
112+
// Mock Pool implementation
113+
type mockPool struct {
114+
t *testing.T
115+
willErr bool
116+
writes int // the number of wire messages written so far
117+
}
118+
119+
func (m *mockPool) Get(ctx context.Context) (connection.Connection, *description.Server, error) {
120+
m.writes++
121+
return &mockConnection{willErr: m.willErr, writes: m.writes}, nil, nil
122+
}
123+
124+
func (*mockPool) Connect(ctx context.Context) error {
125+
return nil
126+
}
127+
128+
func (*mockPool) Disconnect(ctx context.Context) error {
129+
return nil
130+
}
131+
132+
func (*mockPool) Drain() error {
133+
return nil
134+
}
135+
136+
// Mock Connection implementation that
137+
type mockConnection struct {
138+
t *testing.T
139+
willErr bool
140+
writes int // the number of wire messages written so far
141+
}
142+
143+
// this mock will not actually write anything
144+
func (*mockConnection) WriteWireMessage(ctx context.Context, wm wiremessage.WireMessage) error {
145+
select {
146+
case <-ctx.Done():
147+
return errors.New("intentional mock error")
148+
default:
149+
return nil
150+
}
151+
}
152+
153+
// mock a read by returning an empty cursor result until
154+
func (m *mockConnection) ReadWireMessage(ctx context.Context) (wiremessage.WireMessage, error) {
155+
if m.writes < 4 {
156+
// write empty batch
157+
d := createOKBatchReplyDoc(2, bson.NewArray())
158+
159+
return internal.MakeReply(m.t, d), nil
160+
} else if m.willErr {
161+
// write error
162+
return nil, errors.New("intentional mock error")
163+
} else {
164+
// write non-empty batch
165+
d := createOKBatchReplyDoc(2, bson.NewArray(bson.VC.String("a")))
166+
167+
return internal.MakeReply(m.t, d), nil
168+
}
169+
}
170+
171+
func (*mockConnection) Close() error {
172+
return nil
173+
}
174+
175+
func (*mockConnection) Expired() bool {
176+
return false
177+
}
178+
179+
func (*mockConnection) Alive() bool {
180+
return true
181+
}
182+
183+
func (*mockConnection) ID() string {
184+
return ""
185+
}

mongo/change_stream_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,11 +256,12 @@ func TestChangeStream_resumeAfterKillCursors(t *testing.T) {
256256
_, err = killCursors.RoundTrip(context.Background(), ss.Description(), conn)
257257
require.NoError(t, err)
258258

259-
require.False(t, changes.Next(context.Background()))
260-
require.NoError(t, changes.Err())
261-
262-
_, err = coll.InsertOne(context.Background(), bson.NewDocument(bson.EC.Int32("x", 1)))
263-
require.NoError(t, err)
259+
// insert a document after blocking call to getNextChange below
260+
go func() {
261+
time.Sleep(time.Millisecond * 500)
262+
_, err = coll.InsertOne(context.Background(), bson.NewDocument(bson.EC.Int32("x", 1)))
263+
require.NoError(t, err)
264+
}()
264265

265266
getNextChange(changes)
266267
require.NoError(t, changes.Decode(bson.NewDocument()))

0 commit comments

Comments
 (0)