Skip to content

Commit 75ad49d

Browse files
committed
WIP
1 parent ceab570 commit 75ad49d

30 files changed

+731
-599
lines changed

mongo/client_bulk_write.go

Lines changed: 386 additions & 309 deletions
Large diffs are not rendered by default.

x/mongo/driver/batch_cursor.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,25 +71,29 @@ type CursorResponse struct {
7171
postBatchResumeToken bsoncore.Document
7272
}
7373

74-
// NewCursorResponse constructs a cursor response from the given response and
75-
// server. If the provided database response does not contain a cursor, it
76-
// returns ErrNoCursor.
77-
//
78-
// NewCursorResponse can be used within the ProcessResponse method for an operation.
79-
func NewCursorResponse(info ResponseInfo) (CursorResponse, error) {
80-
response := info.ServerResponse
74+
// ExtractCursorDocument retrieves cursor document from a database response. If the
75+
// provided response does not contain a cursor, it returns ErrNoCursor.
76+
func ExtractCursorDocument(response bsoncore.Document) (bsoncore.Document, error) {
8177
cur, err := response.LookupErr("cursor")
8278
if errors.Is(err, bsoncore.ErrElementNotFound) {
83-
return CursorResponse{}, ErrNoCursor
79+
return nil, ErrNoCursor
8480
}
8581
if err != nil {
86-
return CursorResponse{}, fmt.Errorf("error getting cursor from database response: %w", err)
82+
return nil, fmt.Errorf("error getting cursor from database response: %w", err)
8783
}
8884
curDoc, ok := cur.DocumentOK()
8985
if !ok {
90-
return CursorResponse{}, fmt.Errorf("cursor should be an embedded document but is BSON type %s", cur.Type)
86+
return nil, fmt.Errorf("cursor should be an embedded document but is BSON type %s", cur.Type)
9187
}
92-
elems, err := curDoc.Elements()
88+
return curDoc, nil
89+
}
90+
91+
// NewCursorResponse constructs a cursor response from the given cursor document
92+
// extracted from a database response.
93+
//
94+
// NewCursorResponse can be used within the ProcessResponse method for an operation.
95+
func NewCursorResponse(response bsoncore.Document, info ResponseInfo) (CursorResponse, error) {
96+
elems, err := response.Elements()
9397
if err != nil {
9498
return CursorResponse{}, fmt.Errorf("error getting elements from cursor: %w", err)
9599
}
@@ -115,15 +119,17 @@ func NewCursorResponse(info ResponseInfo) (CursorResponse, error) {
115119
curresp.Database = database
116120
curresp.Collection = collection
117121
case "id":
118-
curresp.ID, ok = elem.Value().Int64OK()
122+
id, ok := elem.Value().Int64OK()
119123
if !ok {
120124
return CursorResponse{}, fmt.Errorf("id should be an int64 but it is a BSON %s", elem.Value().Type)
121125
}
126+
curresp.ID = id
122127
case "postBatchResumeToken":
123-
curresp.postBatchResumeToken, ok = elem.Value().DocumentOK()
128+
token, ok := elem.Value().DocumentOK()
124129
if !ok {
125130
return CursorResponse{}, fmt.Errorf("post batch resume token should be a document but it is a BSON %s", elem.Value().Type)
126131
}
132+
curresp.postBatchResumeToken = token
127133
}
128134
}
129135

@@ -393,8 +399,8 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
393399
},
394400
Database: bc.database,
395401
Deployment: bc.getOperationDeployment(),
396-
ProcessResponseFn: func(_ context.Context, info ResponseInfo) error {
397-
response := info.ServerResponse
402+
ProcessResponseFn: func(_ context.Context, response bsoncore.Document, info ResponseInfo) error {
403+
// response := info.ServerResponse
398404
id, ok := response.Lookup("cursor", "id").Int64OK()
399405
if !ok {
400406
return fmt.Errorf("cursor.id should be an int64 but is a BSON %s", response.Lookup("cursor", "id").Type)

x/mongo/driver/batches.go

Lines changed: 67 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,66 +7,98 @@
77
package driver
88

99
import (
10-
"errors"
10+
"fmt"
11+
"io"
12+
"strconv"
1113

1214
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
15+
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
1316
)
1417

1518
// ErrDocumentTooLarge occurs when a document that is larger than the maximum size accepted by a
1619
// server is passed to an insert command.
17-
var ErrDocumentTooLarge = errors.New("an inserted document is too large")
20+
type ErrDocumentTooLarge int
21+
22+
func (e ErrDocumentTooLarge) Error() string {
23+
return fmt.Sprintf("document %d is too large", int(e))
24+
}
1825

1926
// Batches contains the necessary information to batch split an operation. This is only used for write
2027
// operations.
2128
type Batches struct {
2229
Identifier string
2330
Documents []bsoncore.Document
24-
Current []bsoncore.Document
2531
Ordered *bool
26-
}
2732

28-
// ClearBatch clears the Current batch. This must be called before AdvanceBatch will advance to the
29-
// next batch.
30-
func (b *Batches) ClearBatch() { b.Current = b.Current[:0] }
31-
32-
// AdvanceBatch splits the next batch using maxCount and targetBatchSize. This method will do nothing if
33-
// the current batch has not been cleared. We do this so that when this is called during execute we
34-
// can call it without first needing to check if we already have a batch, which makes the code
35-
// simpler and makes retrying easier.
36-
// The maxDocSize parameter is used to check that any one document is not too large. If the first document is bigger
37-
// than targetBatchSize but smaller than maxDocSize, a batch of size 1 containing that document will be created.
38-
func (b *Batches) AdvanceBatch(maxCount, targetBatchSize, maxDocSize int) error {
39-
if len(b.Current) > 0 {
40-
return nil
41-
}
33+
offset int
34+
}
4235

43-
if maxCount <= 0 {
44-
maxCount = 1
36+
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
37+
if b.End() {
38+
return 0, dst, io.EOF
4539
}
46-
47-
splitAfter := 0
48-
size := 0
49-
for i, doc := range b.Documents {
50-
if i == maxCount {
40+
l := len(dst)
41+
var idx int32
42+
dst = wiremessage.AppendMsgSectionType(dst, wiremessage.DocumentSequence)
43+
idx, dst = bsoncore.ReserveLength(dst)
44+
dst = append(dst, b.Identifier...)
45+
dst = append(dst, 0x00)
46+
size := len(dst) - l
47+
var n int
48+
for i := b.offset; i < len(b.Documents); i++ {
49+
if n == maxCount {
5150
break
5251
}
52+
doc := b.Documents[i]
5353
if len(doc) > maxDocSize {
54-
return ErrDocumentTooLarge
54+
return 0, dst[:l], ErrDocumentTooLarge(i)
5555
}
56-
if size+len(doc) > targetBatchSize {
56+
size += len(doc)
57+
if size >= totalSize {
5758
break
5859
}
60+
dst = append(dst, doc...)
61+
n++
62+
}
63+
dst = bsoncore.UpdateLength(dst, idx, int32(len(dst[idx:])))
64+
return n, dst, nil
65+
}
5966

67+
func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
68+
if b.End() {
69+
return 0, dst, io.EOF
70+
}
71+
l := len(dst)
72+
aidx, dst := bsoncore.AppendArrayElementStart(dst, b.Identifier)
73+
size := len(dst) - l
74+
var n int
75+
for i := b.offset; i < len(b.Documents); i++ {
76+
if n == maxCount {
77+
break
78+
}
79+
doc := b.Documents[i]
80+
if len(doc) > maxDocSize {
81+
return 0, dst[:l], ErrDocumentTooLarge(i)
82+
}
6083
size += len(doc)
61-
splitAfter++
84+
if size >= totalSize {
85+
break
86+
}
87+
dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(n), doc)
88+
n++
6289
}
90+
dst, _ = bsoncore.AppendArrayEnd(dst, aidx)
91+
return n, dst, nil
92+
}
6393

64-
// if there are no documents, take the first one.
65-
// this can happen if there is a document that is smaller than maxDocSize but greater than targetBatchSize.
66-
if splitAfter == 0 {
67-
splitAfter = 1
68-
}
94+
func (b *Batches) IsOrdered() *bool {
95+
return b.Ordered
96+
}
97+
98+
func (b *Batches) AdvanceBatches(n int) {
99+
b.offset += n
100+
}
69101

70-
b.Current, b.Documents = b.Documents[:splitAfter], b.Documents[splitAfter:]
71-
return nil
102+
func (b *Batches) End() bool {
103+
return len(b.Documents) <= b.offset
72104
}

x/mongo/driver/batches_test.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,7 @@
66

77
package driver
88

9-
import (
10-
"testing"
11-
12-
"github.com/google/go-cmp/cmp"
13-
"go.mongodb.org/mongo-driver/internal/assert"
14-
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
15-
)
16-
9+
/*
1710
func TestBatches(t *testing.T) {
1811
t.Run("ClearBatch", func(t *testing.T) {
1912
batches := &Batches{Identifier: "documents", Current: make([]bsoncore.Document, 2, 10)}
@@ -113,3 +106,4 @@ func TestBatches(t *testing.T) {
113106
})
114107
})
115108
}
109+
*/

0 commit comments

Comments
 (0)