Skip to content

Commit ff4c961

Browse files
committed
Add bsoncore.DocumentSequence for BatchCursor
Add the bsoncore.DocumentSequence type and use it in BatchCursor. GODRIVER-800 Change-Id: I3b95cce0e7693559d9b79541156622cb1ab3b977
1 parent 77cce54 commit ff4c961

File tree

11 files changed

+684
-147
lines changed

11 files changed

+684
-147
lines changed

mongo/batch_cursor.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package mongo
22

33
import (
44
"context"
5+
6+
"github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
57
)
68

79
// batchCursor is the interface implemented by types that can provide batches of document results.
@@ -13,14 +15,9 @@ type batchCursor interface {
1315
// Next returns true if there is a batch available.
1416
Next(context.Context) bool
1517

16-
// Batch appends the current batch of documents to dst. RequiredBytes can be used to determine
17-
// the length of the current batch of documents.
18-
//
19-
// If there is no batch available, this method should do nothing.
20-
Batch(dst []byte) []byte
21-
22-
// RequiredBytes returns the number of bytes required fo rthe current batch.
23-
RequiredBytes() int
18+
// Batch will return a DocumentSequence for the current batch of documents. The returned
19+
// DocumentSequence is only valid until the next call to Next or Close.
20+
Batch() *bsoncore.DocumentSequence
2421

2522
// Err returns the last error encountered.
2623
Err() error

mongo/cursor.go

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ package mongo
99
import (
1010
"context"
1111
"errors"
12+
"io"
1213

1314
"github.com/mongodb/mongo-go-driver/bson"
1415
"github.com/mongodb/mongo-go-driver/bson/bsoncodec"
16+
"github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
1517
"github.com/mongodb/mongo-go-driver/x/mongo/driver"
1618
)
1719

@@ -44,8 +46,7 @@ type Cursor struct {
4446
Current bson.Raw
4547

4648
bc batchCursor
47-
pos int
48-
batch []byte
49+
batch *bsoncore.DocumentSequence
4950
registry *bsoncodec.Registry
5051

5152
err error
@@ -58,7 +59,7 @@ func newCursor(bc batchCursor, registry *bsoncodec.Registry) (*Cursor, error) {
5859
if bc == nil {
5960
return nil, errors.New("batch cursor must not be nil")
6061
}
61-
return &Cursor{bc: bc, pos: 0, batch: make([]byte, 0, 256), registry: registry}, nil
62+
return &Cursor{bc: bc, registry: registry}, nil
6263
}
6364

6465
func newEmptyCursor() *Cursor {
@@ -68,42 +69,26 @@ func newEmptyCursor() *Cursor {
6869
// ID returns the ID of this cursor.
6970
func (c *Cursor) ID() int64 { return c.bc.ID() }
7071

71-
func (c *Cursor) advanceCurrentDocument() bool {
72-
if len(c.batch[c.pos:]) < 4 {
73-
c.err = errors.New("could not read next document: insufficient bytes")
74-
return false
75-
}
76-
length := (int(c.batch[c.pos]) | int(c.batch[c.pos+1])<<8 | int(c.batch[c.pos+2])<<16 | int(c.batch[c.pos+3])<<24)
77-
if len(c.batch[c.pos:]) < length {
78-
c.err = errors.New("could not read next document: insufficient bytes")
79-
return false
80-
}
81-
if len(c.Current) > 4 {
82-
c.Current[0], c.Current[1], c.Current[2], c.Current[3] = 0x00, 0x00, 0x00, 0x00 // Invalidate the current document
83-
}
84-
c.Current = c.batch[c.pos : c.pos+length]
85-
c.pos += length
86-
return true
87-
}
88-
8972
// Next gets the next result from this cursor. Returns true if there were no errors and the next
9073
// result is available for decoding.
9174
func (c *Cursor) Next(ctx context.Context) bool {
9275
if ctx == nil {
9376
ctx = context.Background()
9477
}
95-
if c.pos < len(c.batch) {
96-
return c.advanceCurrentDocument()
78+
doc, err := c.batch.Next()
79+
switch err {
80+
case nil:
81+
c.Current = bson.Raw(doc)
82+
return true
83+
case io.EOF: // Need to do a getMore
84+
default:
85+
c.err = err
86+
return false
9787
}
9888

99-
// clear the batch
100-
c.batch = c.batch[:0]
101-
c.pos = 0
102-
c.Current = c.Current[:0]
103-
10489
// call the Next method in a loop until at least one document is returned in the next batch or
10590
// the context times out.
106-
for len(c.batch) == 0 {
91+
for {
10792
// If we don't have a next batch
10893
if !c.bc.Next(ctx) {
10994
// Do we have an error? If so we return false.
@@ -119,10 +104,18 @@ func (c *Cursor) Next(ctx context.Context) bool {
119104
continue
120105
}
121106

122-
c.batch = c.bc.Batch(c.batch[:0])
107+
c.batch = c.bc.Batch()
108+
doc, err = c.batch.Next()
109+
switch err {
110+
case nil:
111+
c.Current = bson.Raw(doc)
112+
return true
113+
case io.EOF: // Empty batch so we continue
114+
default:
115+
c.err = err
116+
return false
117+
}
123118
}
124-
125-
return c.advanceCurrentDocument()
126119
}
127120

128121
// Decode will decode the current document into val.

x/bsonx/bsoncore/document_sequence.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package bsoncore
2+
3+
import (
4+
"errors"
5+
"io"
6+
7+
"github.com/mongodb/mongo-go-driver/bson/bsontype"
8+
)
9+
10+
// DocumentSequenceStyle is used to represent how a document sequence is laid out in a slice of
11+
// bytes.
12+
type DocumentSequenceStyle uint32
13+
14+
// These constants are the valid styles for a DocumentSequence.
15+
const (
16+
_ DocumentSequenceStyle = iota
17+
SequenceStyle
18+
ArrayStyle
19+
)
20+
21+
// DocumentSequence represents a sequence of documents. The Style field indicates how the documents
22+
// are laid out inside of the Data field.
23+
type DocumentSequence struct {
24+
Style DocumentSequenceStyle
25+
Data []byte
26+
Pos int
27+
}
28+
29+
// ErrCorruptedDocument is returned when a full document couldn't be read from the sequence.
30+
var ErrCorruptedDocument = errors.New("invalid DocumentSequence: corrupted document")
31+
32+
// ErrNonDocument is returned when a DocumentSequence contains a non-document BSON value.
33+
var ErrNonDocument = errors.New("invalid DocumentSequence: a non-document value was found in sequence")
34+
35+
// ErrInvalidDocumentSequenceStyle is returned when an unknown DocumentSequenceStyle is set on a
36+
// DocumentSequence.
37+
var ErrInvalidDocumentSequenceStyle = errors.New("invalid DocumentSequenceStyle")
38+
39+
// DocumentCount returns the number of documents in the sequence.
40+
func (ds *DocumentSequence) DocumentCount() int {
41+
if ds == nil {
42+
return 0
43+
}
44+
switch ds.Style {
45+
case SequenceStyle:
46+
var count int
47+
var ok bool
48+
rem := ds.Data
49+
for len(rem) > 0 {
50+
_, rem, ok = ReadDocument(rem)
51+
if !ok {
52+
return 0
53+
}
54+
count++
55+
}
56+
return count
57+
case ArrayStyle:
58+
_, rem, ok := ReadLength(ds.Data)
59+
if !ok {
60+
return 0
61+
}
62+
63+
var count int
64+
for len(rem) > 1 {
65+
_, rem, ok = ReadElement(rem)
66+
if !ok {
67+
return 0
68+
}
69+
count++
70+
}
71+
return count
72+
default:
73+
return 0
74+
}
75+
}
76+
77+
//ResetIterator resets the iteration point for the Next method to the beginning of the document
78+
//sequence.
79+
func (ds *DocumentSequence) ResetIterator() {
80+
if ds == nil {
81+
return
82+
}
83+
ds.Pos = 0
84+
}
85+
86+
// documents returns a slice of the documents. If nil either the Data field is also nil or could not
87+
// be properly read.
88+
func (ds *DocumentSequence) documents() ([]Document, error) {
89+
if ds == nil {
90+
return nil, nil
91+
}
92+
switch ds.Style {
93+
case SequenceStyle:
94+
rem := ds.Data
95+
var docs []Document
96+
var doc Document
97+
var ok bool
98+
for {
99+
doc, rem, ok = ReadDocument(rem)
100+
if !ok {
101+
if len(rem) == 0 {
102+
break
103+
}
104+
return nil, ErrCorruptedDocument
105+
}
106+
docs = append(docs, doc)
107+
}
108+
return docs, nil
109+
case ArrayStyle:
110+
if len(ds.Data) == 0 {
111+
return nil, nil
112+
}
113+
vals, err := Document(ds.Data).Values()
114+
if err != nil {
115+
return nil, ErrCorruptedDocument
116+
}
117+
docs := make([]Document, 0, len(vals))
118+
for _, v := range vals {
119+
if v.Type != bsontype.EmbeddedDocument {
120+
return nil, ErrNonDocument
121+
}
122+
docs = append(docs, v.Data)
123+
}
124+
return docs, nil
125+
default:
126+
return nil, ErrInvalidDocumentSequenceStyle
127+
}
128+
}
129+
130+
// Next retrieves the next document from this sequence and returns it. This method will return
131+
// io.EOF when it has reached the end of the sequence.
132+
func (ds *DocumentSequence) Next() (Document, error) {
133+
if ds == nil || ds.Pos >= len(ds.Data) {
134+
return nil, io.EOF
135+
}
136+
switch ds.Style {
137+
case SequenceStyle:
138+
doc, _, ok := ReadDocument(ds.Data[ds.Pos:])
139+
if !ok {
140+
return nil, ErrCorruptedDocument
141+
}
142+
ds.Pos += len(doc)
143+
return doc, nil
144+
case ArrayStyle:
145+
if ds.Pos < 4 {
146+
if len(ds.Data) < 4 {
147+
return nil, ErrCorruptedDocument
148+
}
149+
ds.Pos = 4 // Skip the length of the document
150+
}
151+
if len(ds.Data[ds.Pos:]) == 1 && ds.Data[ds.Pos] == 0x00 {
152+
return nil, io.EOF // At the end of the document
153+
}
154+
elem, _, ok := ReadElement(ds.Data[ds.Pos:])
155+
if !ok {
156+
return nil, ErrCorruptedDocument
157+
}
158+
ds.Pos += len(elem)
159+
val := elem.Value()
160+
if val.Type != bsontype.EmbeddedDocument {
161+
return nil, ErrNonDocument
162+
}
163+
return val.Data, nil
164+
default:
165+
return nil, ErrInvalidDocumentSequenceStyle
166+
}
167+
}

0 commit comments

Comments
 (0)