Skip to content

Commit ac7f51c

Browse files
author
Divjot Arora
authored
GODRIVER-1599 Expose batch length in Cursor (#402)
1 parent c335d0c commit ac7f51c

File tree

3 files changed

+143
-2
lines changed

3 files changed

+143
-2
lines changed

mongo/crud_examples_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,43 @@ func ExampleCursor_TryNext() {
708708
}
709709
}
710710

711+
func ExampleCursor_RemainingBatchLength() {
712+
// Because we're using a tailable cursor, this must be a handle to a capped collection.
713+
var coll *mongo.Collection
714+
715+
// Create a tailable await cursor. Specify the MaxAwaitTime option so requests to get more data will return if there
716+
// are no documents available after two seconds.
717+
findOpts := options.Find().
718+
SetCursorType(options.TailableAwait).
719+
SetMaxAwaitTime(2 * time.Second)
720+
cursor, err := coll.Find(context.TODO(), bson.D{}, findOpts)
721+
if err != nil {
722+
panic(err)
723+
}
724+
725+
for {
726+
// Iterate the cursor using TryNext.
727+
if cursor.TryNext(context.TODO()) {
728+
fmt.Println(cursor.Current)
729+
}
730+
731+
// Handle cursor errors or the cursor being closed by the server.
732+
if err = cursor.Err(); err != nil {
733+
panic(err)
734+
}
735+
if cursor.ID() == 0 {
736+
panic("cursor was unexpectedly closed by the server")
737+
}
738+
739+
// Use the RemainingBatchLength function to rate-limit the number of network requests the driver does. If the
740+
// current batch is empty, sleep for a short amount of time to let documents build up on the server before
741+
// the next TryNext call, which will do a network request.
742+
if cursor.RemainingBatchLength() == 0 {
743+
time.Sleep(100 * time.Millisecond)
744+
}
745+
}
746+
}
747+
711748
// ChangeStream examples
712749

713750
func ExampleChangeStream_Next() {

mongo/cursor.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Cursor struct {
2828

2929
bc batchCursor
3030
batch *bsoncore.DocumentSequence
31+
batchLength int
3132
registry *bsoncodec.Registry
3233
clientSession *session.Client
3334

@@ -53,6 +54,10 @@ func newCursorWithSession(bc batchCursor, registry *bsoncodec.Registry, clientSe
5354
if bc.ID() == 0 {
5455
c.closeImplicitSession()
5556
}
57+
58+
// Initialize just the batchLength here so RemainingBatchLength will return an accurate result. The actual batch
59+
// will be pulled up by the first Next/TryNext call.
60+
c.batchLength = c.bc.Batch().DocumentCount()
5661
return c, nil
5762
}
5863

@@ -102,6 +107,8 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
102107
doc, err := c.batch.Next()
103108
switch err {
104109
case nil:
110+
// Consume the next document in the current batch.
111+
c.batchLength--
105112
c.Current = bson.Raw(doc)
106113
return true
107114
case io.EOF: // Need to do a getMore
@@ -138,10 +145,13 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
138145
c.closeImplicitSession()
139146
}
140147

148+
// Use the new batch to update the batch and batchLength fields. Consume the first document in the batch.
141149
c.batch = c.bc.Batch()
150+
c.batchLength = c.batch.DocumentCount()
142151
doc, err = c.batch.Next()
143152
switch err {
144153
case nil:
154+
c.batchLength--
145155
c.Current = bson.Raw(doc)
146156
return true
147157
case io.EOF: // Empty batch so we continue
@@ -208,6 +218,12 @@ func (c *Cursor) All(ctx context.Context, results interface{}) error {
208218
return nil
209219
}
210220

221+
// RemainingBatchLength returns the number of documents left in the current batch. If this returns zero, the subsequent
222+
// call to Next or TryNext will do a network request to fetch the next batch.
223+
func (c *Cursor) RemainingBatchLength() int {
224+
return c.batchLength
225+
}
226+
211227
// addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value,
212228
// the next empty index in the slice, and an error if one occurs.
213229
func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence,

mongo/integration/cursor_test.go

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package integration
99
import (
1010
"context"
1111
"testing"
12+
"time"
1213

1314
"go.mongodb.org/mongo-driver/bson"
1415
"go.mongodb.org/mongo-driver/internal/testutil/assert"
@@ -24,6 +25,7 @@ const (
2425
func TestCursor(t *testing.T) {
2526
mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
2627
defer mt.Close()
28+
cappedCollectionOpts := bson.D{{"capped", true}, {"size", 64 * 1024}}
2729

2830
// server versions 2.6 and 3.0 use OP_GET_MORE so this works on >= 3.2
2931
mt.RunOpts("cursor is killed on server", mtest.NewOptions().MinServerVersion("3.2"), func(mt *mtest.T) {
@@ -53,8 +55,7 @@ func TestCursor(t *testing.T) {
5355
defer cursor.Close(mtest.Background)
5456
tryNextExistingBatchTest(mt, cursor)
5557
})
56-
cappedOpts := bson.D{{"capped", true}, {"size", 64 * 1024}}
57-
mt.RunOpts("one getMore sent", mtest.NewOptions().CollectionCreateOptions(cappedOpts), func(mt *mtest.T) {
58+
mt.RunOpts("one getMore sent", mtest.NewOptions().CollectionCreateOptions(cappedCollectionOpts), func(mt *mtest.T) {
5859
// If the current batch is empty, TryNext should send one getMore and return.
5960

6061
// insert a document because a tailable cursor will only have a non-zero ID if the initial Find matches
@@ -82,6 +83,88 @@ func TestCursor(t *testing.T) {
8283
tryNextGetmoreError(mt, cursor)
8384
})
8485
})
86+
mt.RunOpts("RemainingBatchLength", noClientOpts, func(mt *mtest.T) {
87+
cappedMtOpts := mtest.NewOptions().CollectionCreateOptions(cappedCollectionOpts)
88+
mt.RunOpts("first batch is non empty", cappedMtOpts, func(mt *mtest.T) {
89+
// Test that the cursor reports the correct value for RemainingBatchLength at various execution points if
90+
// the first batch from the server is non-empty.
91+
92+
initCollection(mt, mt.Coll)
93+
94+
// Create a tailable await cursor with a low cursor timeout.
95+
batchSize := 2
96+
findOpts := options.Find().
97+
SetBatchSize(int32(batchSize)).
98+
SetCursorType(options.TailableAwait).
99+
SetMaxAwaitTime(100 * time.Millisecond)
100+
cursor, err := mt.Coll.Find(mtest.Background, bson.D{}, findOpts)
101+
assert.Nil(mt, err, "Find error: %v", err)
102+
defer cursor.Close(mtest.Background)
103+
104+
mt.ClearEvents()
105+
106+
// The initial batch length should be equal to the batchSize. Do batchSize Next calls to exhaust the current
107+
// batch and assert that no getMore was done.
108+
assertCursorBatchLength(mt, cursor, batchSize)
109+
for i := 0; i < batchSize; i++ {
110+
prevLength := cursor.RemainingBatchLength()
111+
if !cursor.Next(mtest.Background) {
112+
mt.Fatalf("expected Next to return true on index %d; cursor err: %v", i, cursor.Err())
113+
}
114+
115+
// Each successful Next call should decrement batch length by 1.
116+
assertCursorBatchLength(mt, cursor, prevLength-1)
117+
}
118+
evt := mt.GetStartedEvent()
119+
assert.Nil(mt, evt, "expected no events, got %v", evt)
120+
121+
// The batch is exhausted, so the batch length should be 0. Do one Next call, which should do a getMore and
122+
// fetch batchSize more documents. The batch length after the call should be (batchSize-1) because Next consumes
123+
// one document.
124+
assertCursorBatchLength(mt, cursor, 0)
125+
126+
assert.True(mt, cursor.Next(mtest.Background), "expected Next to return true; cursor err: %v", cursor.Err())
127+
evt = mt.GetStartedEvent()
128+
assert.NotNil(mt, evt, "expected CommandStartedEvent, got nil")
129+
assert.Equal(mt, "getMore", evt.CommandName, "expected command %q, got %q", "getMore", evt.CommandName)
130+
131+
assertCursorBatchLength(mt, cursor, batchSize-1)
132+
})
133+
mt.RunOpts("first batch is empty", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
	// Test that the cursor reports the correct value for RemainingBatchLength if the first batch is empty.
	// Using a mock deployment simplifies this test because the server won't create a valid cursor if the
	// collection is empty when the find is run.

	cursorID := int64(50)
	ns := mt.DB.Name() + "." + mt.Coll.Name()
	getMoreBatch := []bson.D{
		{{"x", 1}},
		{{"x", 2}},
	}

	// Create mock responses.
	find := mtest.CreateCursorResponse(cursorID, ns, mtest.FirstBatch)
	getMore := mtest.CreateCursorResponse(cursorID, ns, mtest.NextBatch, getMoreBatch...)
	killCursors := mtest.CreateSuccessResponse()
	mt.AddMockResponses(find, getMore, killCursors)

	cursor, err := mt.Coll.Find(mtest.Background, bson.D{})
	assert.Nil(mt, err, "Find error: %v", err)
	defer cursor.Close(mtest.Background)
	mt.ClearEvents()

	for {
		if cursor.TryNext(mtest.Background) {
			break
		}

		// Report the cursor's own error in the failure message. The err variable above holds the Find error,
		// which has already been asserted to be nil.
		cursorErr := cursor.Err()
		assert.Nil(mt, cursorErr, "cursor error: %v", cursorErr)
		assertCursorBatchLength(mt, cursor, 0)
	}
	// TryNext consumes one document so the remaining batch size should be len(getMoreBatch)-1.
	assertCursorBatchLength(mt, cursor, len(getMoreBatch)-1)
})
167+
})
85168
}
86169

87170
type tryNextCursor interface {
@@ -133,3 +216,8 @@ func tryNextGetmoreError(mt *mtest.T, cursor tryNextCursor) {
133216
err := cursor.Err()
134217
assert.NotNil(mt, err, "expected change stream error, got nil")
135218
}
219+
220+
func assertCursorBatchLength(mt *mtest.T, cursor *mongo.Cursor, expected int) {
221+
batchLen := cursor.RemainingBatchLength()
222+
assert.Equal(mt, expected, batchLen, "expected remaining batch length %d, got %d", expected, batchLen)
223+
}

0 commit comments

Comments
 (0)