Skip to content

Commit 45d22f8

Browse files
committed
WIP
1 parent 63a973b commit 45d22f8

File tree

8 files changed

+129
-110
lines changed

8 files changed

+129
-110
lines changed

mongo/client_bulk_write.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,16 @@ func (mb *modelBatches) IsOrdered() *bool {
161161

162162
func (mb *modelBatches) AdvanceBatches(n int) {
163163
mb.offset += n
164+
if mb.offset > len(mb.models) {
165+
mb.offset = len(mb.models)
166+
}
164167
}
165168

166-
func (mb *modelBatches) End() bool {
167-
return len(mb.models) <= mb.offset
169+
func (mb *modelBatches) Size() int {
170+
if mb.offset > len(mb.models) {
171+
return 0
172+
}
173+
return len(mb.models) - mb.offset
168174
}
169175

170176
func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
@@ -181,7 +187,7 @@ func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, to
181187
dst = append(dst, doc...)
182188
return dst
183189
},
184-
appendEnd: func(dst []byte, idx, length int32) []byte {
190+
updateLength: func(dst []byte, idx, length int32) []byte {
185191
dst = bsoncore.UpdateLength(dst, idx, length)
186192
return dst
187193
},
@@ -193,7 +199,7 @@ func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, total
193199
fn := functionSet{
194200
appendStart: bsoncore.AppendArrayElementStart,
195201
appendDocument: bsoncore.AppendDocumentElement,
196-
appendEnd: func(dst []byte, idx, _ int32) []byte {
202+
updateLength: func(dst []byte, idx, _ int32) []byte {
197203
dst, _ = bsoncore.AppendArrayEnd(dst, idx)
198204
return dst
199205
},
@@ -204,11 +210,11 @@ func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, total
204210
type functionSet struct {
205211
appendStart func([]byte, string) (int32, []byte)
206212
appendDocument func([]byte, string, []byte) []byte
207-
appendEnd func([]byte, int32, int32) []byte
213+
updateLength func([]byte, int32, int32) []byte
208214
}
209215

210216
func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
211-
if mb.End() {
217+
if mb.Size() == 0 {
212218
return 0, dst, io.EOF
213219
}
214220

@@ -344,8 +350,8 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
344350
return 0, dst[:l], nil
345351
}
346352

347-
dst = fn.appendEnd(dst, opsIdx, int32(len(dst[opsIdx:])))
348-
nsDst = fn.appendEnd(nsDst, nsIdx, int32(len(nsDst[nsIdx:])))
353+
dst = fn.updateLength(dst, opsIdx, int32(len(dst[opsIdx:])))
354+
nsDst = fn.updateLength(nsDst, nsIdx, int32(len(nsDst[nsIdx:])))
349355
dst = append(dst, nsDst...)
350356

351357
mb.retryMode = driver.RetryNone
@@ -417,7 +423,10 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
417423
ok := true
418424
for cursor.Next(ctx) {
419425
var cur cursorInfo
420-
cursor.Decode(&cur)
426+
err = cursor.Decode(&cur)
427+
if err != nil {
428+
return err
429+
}
421430
if int(cur.Idx) >= len(mb.cursorHandlers) {
422431
continue
423432
}
@@ -483,7 +492,7 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
483492
result.ModifiedCount = int64(*cur.NModified)
484493
}
485494
if cur.Upserted != nil {
486-
result.UpsertedID = (*cur.Upserted).ID
495+
result.UpsertedID = cur.Upserted.ID
487496
}
488497
mb.result.UpdateResults[int(cur.Idx)] = result
489498
if err := cur.extractError(); err != nil {

mongo/integration/client_side_encryption_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ func TestClientSideEncryptionCustomCrypt(t *testing.T) {
396396
"expected 0 calls to DecryptExplicit, got %v", cc.numDecryptExplicitCalls)
397397
assert.Equal(mt, cc.numCloseCalls, 0,
398398
"expected 0 calls to Close, got %v", cc.numCloseCalls)
399-
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 2,
399+
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 1,
400400
"expected 2 calls to BypassAutoEncryption, got %v", cc.numBypassAutoEncryptionCalls)
401401
})
402402
}

mongo/integration/crud_prose_test.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func TestClientBulkWrite(t *testing.T) {
420420
mt.Run("input with greater than maxWriteBatchSize", func(mt *mtest.T) {
421421
var opsCnt []int
422422
monitor := &event.CommandMonitor{
423-
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
423+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
424424
if e.CommandName == "bulkWrite" {
425425
v := e.Command.Lookup("ops")
426426
elems, err := v.Array().Elements()
@@ -453,7 +453,7 @@ func TestClientBulkWrite(t *testing.T) {
453453
mt.Run("input with greater than maxMessageSizeBytes", func(mt *mtest.T) {
454454
var opsCnt []int
455455
monitor := &event.CommandMonitor{
456-
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
456+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
457457
if e.CommandName == "bulkWrite" {
458458
v := e.Command.Lookup("ops")
459459
elems, err := v.Array().Elements()
@@ -533,7 +533,7 @@ func TestClientBulkWrite(t *testing.T) {
533533

534534
var eventCnt int
535535
monitor := &event.CommandMonitor{
536-
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
536+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
537537
if e.CommandName == "bulkWrite" {
538538
eventCnt++
539539
}
@@ -578,11 +578,12 @@ func TestClientBulkWrite(t *testing.T) {
578578

579579
mt.Run("bulkWrite handles a cursor requiring a getMore", func(mt *mtest.T) {
580580
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
581-
coll.Drop(context.Background())
581+
err := coll.Drop(context.Background())
582+
require.NoError(mt, err, "Drop error")
582583

583584
var getMoreCalled bool
584585
monitor := &event.CommandMonitor{
585-
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
586+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
586587
if e.CommandName == "getMore" {
587588
getMoreCalled = true
588589
}
@@ -592,7 +593,8 @@ func TestClientBulkWrite(t *testing.T) {
592593
var hello struct {
593594
MaxBsonObjectSize int
594595
}
595-
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
596+
err = mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello)
597+
require.NoError(mt, err, "Hello error")
596598
upsert := true
597599
models := (&mongo.ClientWriteModels{}).
598600
AppendUpdateOne(&mongo.ClientUpdateOneModel{
@@ -616,11 +618,12 @@ func TestClientBulkWrite(t *testing.T) {
616618

617619
mt.Run("bulkWrite handles a cursor requiring a getMore within a transaction", func(mt *mtest.T) {
618620
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
619-
coll.Drop(context.Background())
621+
err := coll.Drop(context.Background())
622+
require.NoError(mt, err, "Drop error")
620623

621624
var getMoreCalled bool
622625
monitor := &event.CommandMonitor{
623-
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
626+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
624627
if e.CommandName == "getMore" {
625628
getMoreCalled = true
626629
}
@@ -630,7 +633,8 @@ func TestClientBulkWrite(t *testing.T) {
630633
var hello struct {
631634
MaxBsonObjectSize int
632635
}
633-
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
636+
err = mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello)
637+
require.NoError(mt, err, "Hello error")
634638
session, err := mt.Client.StartSession()
635639
require.NoError(mt, err, "StartSession error")
636640
defer session.EndSession(context.Background())
@@ -648,7 +652,7 @@ func TestClientBulkWrite(t *testing.T) {
648652
Update: bson.D{{"$set", bson.D{{"x", 1}}}},
649653
Upsert: &upsert,
650654
})
651-
result, err := session.WithTransaction(context.TODO(), func(ctx mongo.SessionContext) (interface{}, error) {
655+
result, err := session.WithTransaction(context.Background(), func(mongo.SessionContext) (interface{}, error) {
652656
return mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetVerboseResults(true))
653657
})
654658
require.NoError(mt, err, "BulkWrite error")
@@ -659,7 +663,7 @@ func TestClientBulkWrite(t *testing.T) {
659663
assert.True(mt, getMoreCalled, "the getMore was not called")
660664
})
661665

662-
mt.Run("bulkWrite handles a getMore error", func(mt *mtest.T) {
666+
mt.Run("bulkWrite handles a getMore error", func(_ *mtest.T) {
663667
})
664668

665669
mt.Run("bulkWrite returns error for unacknowledged too-large insert", func(mt *mtest.T) {

mongo/integration/unified/client_operation_execution.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
240240

241241
resBuilder = bsoncore.NewDocumentBuilder()
242242
for k, v := range res.DeleteResults {
243-
resBuilder.AppendDocument(strconv.Itoa(int(k)),
243+
resBuilder.AppendDocument(strconv.Itoa(k),
244244
bsoncore.NewDocumentBuilder().
245245
AppendInt64("deletedCount", v.DeletedCount).
246246
Build(),
@@ -254,7 +254,7 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
254254
if err != nil {
255255
return nil, err
256256
}
257-
resBuilder.AppendDocument(strconv.Itoa(int(k)),
257+
resBuilder.AppendDocument(strconv.Itoa(k),
258258
bsoncore.NewDocumentBuilder().
259259
AppendValue("insertedId", bsoncore.Value{Type: t, Data: d}).
260260
Build(),
@@ -274,7 +274,7 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
274274
}
275275
b.AppendValue("upsertedId", bsoncore.Value{Type: t, Data: d})
276276
}
277-
resBuilder.AppendDocument(strconv.Itoa(int(k)), b.Build())
277+
resBuilder.AppendDocument(strconv.Itoa(k), b.Build())
278278
}
279279
rawBuilder.AppendDocument("updateResults", resBuilder.Build())
280280

x/mongo/driver/batch_cursor.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,7 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
399399
},
400400
Database: bc.database,
401401
Deployment: bc.getOperationDeployment(),
402-
ProcessResponseFn: func(_ context.Context, response bsoncore.Document, info ResponseInfo) error {
403-
// response := info.ServerResponse
402+
ProcessResponseFn: func(_ context.Context, response bsoncore.Document, _ ResponseInfo) error {
404403
id, ok := response.Lookup("cursor", "id").Int64OK()
405404
if !ok {
406405
return fmt.Errorf("cursor.id should be an int64 but is a BSON %s", response.Lookup("cursor", "id").Type)

x/mongo/driver/batches.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ type Batches struct {
2424
offset int
2525
}
2626

27-
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
28-
if b.End() {
27+
// AppendBatchSequence appends dst with document sequence of batches as long as the limits of max count, max
28+
// document size, or total size allows. It returns the number of batches appended, the new appended slice, and
29+
// any error raised. It returns the origenal input slice if nothing can be appends within the limits.
30+
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) {
31+
if b.Size() == 0 {
2932
return 0, dst, io.EOF
3033
}
3134
l := len(dst)
@@ -34,7 +37,7 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSiz
3437
idx, dst = bsoncore.ReserveLength(dst)
3538
dst = append(dst, b.Identifier...)
3639
dst = append(dst, 0x00)
37-
size := len(dst) - l
40+
var size int
3841
var n int
3942
for i := b.offset; i < len(b.Documents); i++ {
4043
if n == maxCount {
@@ -45,7 +48,7 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSiz
4548
break
4649
}
4750
size += len(doc)
48-
if size >= totalSize {
51+
if size > maxDocSize {
4952
break
5053
}
5154
dst = append(dst, doc...)
@@ -58,13 +61,16 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSiz
5861
return n, dst, nil
5962
}
6063

61-
func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
62-
if b.End() {
64+
// AppendBatchArray appends dst with array of batches as long as the limits of max count, max document size, or
65+
// total size allows. It returns the number of batches appended, the new appended slice, and any error raised. It
66+
// returns the origenal input slice if nothing can be appends within the limits.
67+
func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) {
68+
if b.Size() == 0 {
6369
return 0, dst, io.EOF
6470
}
6571
l := len(dst)
6672
aidx, dst := bsoncore.AppendArrayElementStart(dst, b.Identifier)
67-
size := len(dst) - l
73+
var size int
6874
var n int
6975
for i := b.offset; i < len(b.Documents); i++ {
7076
if n == maxCount {
@@ -75,7 +81,7 @@ func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize i
7581
break
7682
}
7783
size += len(doc)
78-
if size >= totalSize {
84+
if size > maxDocSize {
7985
break
8086
}
8187
dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(n), doc)
@@ -92,14 +98,23 @@ func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize i
9298
return n, dst, nil
9399
}
94100

101+
// IsOrdered indicates if the batches are ordered.
95102
func (b *Batches) IsOrdered() *bool {
96103
return b.Ordered
97104
}
98105

106+
// AdvanceBatches advances the batches with the given input.
99107
func (b *Batches) AdvanceBatches(n int) {
100108
b.offset += n
109+
if b.offset > len(b.Documents) {
110+
b.offset = len(b.Documents)
111+
}
101112
}
102113

103-
func (b *Batches) End() bool {
104-
return len(b.Documents) <= b.offset
114+
// Size returns the size of batches remained.
115+
func (b *Batches) Size() int {
116+
if b.offset > len(b.Documents) {
117+
return 0
118+
}
119+
return len(b.Documents) - b.offset
105120
}

0 commit comments

Comments
 (0)