Skip to content

Commit ceab570

Browse files
committed
WIP
1 parent 982b622 commit ceab570

34 files changed

+704
-433
lines changed

mongo/client_bulk_write.go

Lines changed: 242 additions & 194 deletions
Large diffs are not rendered by default.

mongo/client_bulk_write_models.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (C) MongoDB, Inc. 2017-present.
1+
// Copyright (C) MongoDB, Inc. 2024-present.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License"); you may
44
// not use this file except in compliance with the License. You may obtain

mongo/errors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,10 +614,10 @@ type ClientBulkWriteException struct {
614614
TopLevelError *WriteError
615615

616616
// The write concern errors that occurred.
617-
WriteConcernErrors []*WriteConcernError
617+
WriteConcernErrors []WriteConcernError
618618

619619
// The write errors that occurred during individual operation execution.
620-
WriteErrors map[int64]WriteError
620+
WriteErrors map[int]WriteError
621621

622622
PartialResult *ClientBulkWriteResult
623623
}

mongo/integration/crud_prose_test.go

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,17 @@ import (
1010
"bytes"
1111
"context"
1212
"errors"
13+
"strings"
1314
"testing"
1415

1516
"go.mongodb.org/mongo-driver/bson"
17+
"go.mongodb.org/mongo-driver/event"
1618
"go.mongodb.org/mongo-driver/internal/assert"
19+
"go.mongodb.org/mongo-driver/internal/require"
1720
"go.mongodb.org/mongo-driver/mongo"
1821
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
1922
"go.mongodb.org/mongo-driver/mongo/options"
23+
"go.mongodb.org/mongo-driver/mongo/writeconcern"
2024
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
2125
)
2226

@@ -408,3 +412,270 @@ func TestErrorsCodeNamePropagated(t *testing.T) {
408412
assert.Equal(mt, expectedCodeName, wce.Name, "expected code name %q, got %q", expectedCodeName, wce.Name)
409413
})
410414
}
415+
416+
func TestClientBulkWrite(t *testing.T) {
417+
mtOpts := mtest.NewOptions().MinServerVersion("8.0").AtlasDataLake(false).CreateClient(false)
418+
mt := mtest.New(t, mtOpts)
419+
420+
mt.Run("input with greater than maxWriteBatchSize", func(mt *mtest.T) {
421+
var opsCnt []int
422+
monitor := &event.CommandMonitor{
423+
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
424+
if e.CommandName == "bulkWrite" {
425+
v := e.Command.Lookup("ops")
426+
elems, err := v.Array().Elements()
427+
require.NoError(mt, err, "monitor error")
428+
opsCnt = append(opsCnt, len(elems))
429+
}
430+
},
431+
}
432+
mt.ResetClient(options.Client().SetMonitor(monitor))
433+
var hello struct {
434+
MaxWriteBatchSize int
435+
}
436+
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
437+
models := &mongo.ClientWriteModels{}
438+
for i := 0; i < hello.MaxWriteBatchSize+1; i++ {
439+
models.
440+
AppendInsertOne(&mongo.ClientInsertOneModel{
441+
Namespace: "db.coll",
442+
Document: bson.D{{"a", "b"}},
443+
})
444+
}
445+
result, err := mt.Client.BulkWrite(context.Background(), models)
446+
require.NoError(mt, err, "BulkWrite error")
447+
assert.Equal(mt, hello.MaxWriteBatchSize+1, int(result.InsertedCount), "InsertedCount expected to be %d", hello.MaxWriteBatchSize+1)
448+
require.Len(mt, opsCnt, 2, "expected 2 bulkWrite commands")
449+
assert.Equal(mt, hello.MaxWriteBatchSize, opsCnt[0], "the length of firstEvent.command.ops is %d", hello.MaxWriteBatchSize)
450+
assert.Equal(mt, 1, opsCnt[1], "the length of secondEvent.command.ops is 1")
451+
})
452+
453+
mt.Run("input with greater than maxMessageSizeBytes", func(mt *mtest.T) {
454+
var opsCnt []int
455+
monitor := &event.CommandMonitor{
456+
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
457+
if e.CommandName == "bulkWrite" {
458+
v := e.Command.Lookup("ops")
459+
elems, err := v.Array().Elements()
460+
require.NoError(mt, err, "monitor error")
461+
opsCnt = append(opsCnt, len(elems))
462+
}
463+
},
464+
}
465+
mt.ResetClient(options.Client().SetMonitor(monitor))
466+
var hello struct {
467+
MaxBsonObjectSize int
468+
MaxMessageSizeBytes int
469+
}
470+
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
471+
models := &mongo.ClientWriteModels{}
472+
numModels := hello.MaxMessageSizeBytes/hello.MaxBsonObjectSize + 1
473+
for i := 0; i < numModels; i++ {
474+
models.
475+
AppendInsertOne(&mongo.ClientInsertOneModel{
476+
Namespace: "db.coll",
477+
Document: bson.D{{"a", strings.Repeat("b", hello.MaxBsonObjectSize-500)}},
478+
})
479+
}
480+
result, err := mt.Client.BulkWrite(context.Background(), models)
481+
require.NoError(mt, err, "BulkWrite error")
482+
assert.Equal(mt, numModels, int(result.InsertedCount), "InsertedCount expected to be %d", numModels)
483+
require.Len(mt, opsCnt, 2, "expected 2 bulkWrite commands")
484+
assert.Equal(mt, numModels-1, opsCnt[0], "the length of firstEvent.command.ops is %d", numModels-1)
485+
assert.Equal(mt, 1, opsCnt[1], "the length of secondEvent.command.ops is 1")
486+
})
487+
488+
mt.Run("bulkWrite collects WriteConcernErrors across batches", func(mt *mtest.T) {
489+
mt.ResetClient(options.Client().SetRetryWrites(false))
490+
var hello struct {
491+
MaxWriteBatchSize int
492+
}
493+
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
494+
495+
mt.SetFailPoint(mtest.FailPoint{
496+
ConfigureFailPoint: "failCommand",
497+
Mode: mtest.FailPointMode{
498+
Times: 2,
499+
},
500+
Data: mtest.FailPointData{
501+
FailCommands: []string{"bulkWrite"},
502+
WriteConcernError: &mtest.WriteConcernErrorData{
503+
Code: 91,
504+
Errmsg: "Replication is being shut down",
505+
},
506+
},
507+
})
508+
509+
models := &mongo.ClientWriteModels{}
510+
for i := 0; i < hello.MaxWriteBatchSize+1; i++ {
511+
models.
512+
AppendInsertOne(&mongo.ClientInsertOneModel{
513+
Namespace: "db.coll",
514+
Document: bson.D{{"a", "b"}},
515+
})
516+
}
517+
_, err := mt.Client.BulkWrite(context.Background(), models)
518+
require.Error(mt, err)
519+
bwe, ok := err.(mongo.ClientBulkWriteException)
520+
require.True(mt, ok, "expected a BulkWriteException, got %T", err)
521+
assert.Len(mt, bwe.WriteConcernErrors, 2, "expected 2 writeConcernErrors")
522+
require.NotNil(mt, bwe.PartialResult)
523+
assert.Equal(mt, hello.MaxWriteBatchSize+1, int(bwe.PartialResult.InsertedCount),
524+
"InsertedCount expected to be %d", hello.MaxWriteBatchSize+1)
525+
})
526+
527+
mt.Run("bulkWrite handles individual WriteErrors across batches", func(mt *mtest.T) {
528+
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
529+
err := coll.Drop(context.Background())
530+
require.NoError(mt, err, "Drop error")
531+
_, err = coll.InsertOne(context.Background(), bson.D{{"_id", 1}})
532+
require.NoError(mt, err, "InsertOne error")
533+
534+
var eventCnt int
535+
monitor := &event.CommandMonitor{
536+
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
537+
if e.CommandName == "bulkWrite" {
538+
eventCnt++
539+
}
540+
},
541+
}
542+
543+
var hello struct {
544+
MaxWriteBatchSize int
545+
}
546+
err = mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello)
547+
require.NoError(mt, err, "Hello error")
548+
models := &mongo.ClientWriteModels{}
549+
for i := 0; i < hello.MaxWriteBatchSize+1; i++ {
550+
models.
551+
AppendInsertOne(&mongo.ClientInsertOneModel{
552+
Namespace: "db.coll",
553+
Document: bson.D{{"_id", 1}},
554+
})
555+
}
556+
557+
mt.Run("Unordered", func(mt *mtest.T) {
558+
eventCnt = 0
559+
mt.ResetClient(options.Client().SetMonitor(monitor))
560+
_, err := mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetOrdered(false))
561+
require.Error(mt, err)
562+
bwe, ok := err.(mongo.ClientBulkWriteException)
563+
require.True(mt, ok, "expected a BulkWriteException, got %T", err)
564+
assert.Len(mt, bwe.WriteErrors, hello.MaxWriteBatchSize+1, "expected %d writeErrors, got %d", hello.MaxWriteBatchSize+1, len(bwe.WriteErrors))
565+
require.Equal(mt, 2, eventCnt, "expected 2 bulkWrite commands, got %d", eventCnt)
566+
})
567+
mt.Run("Ordered", func(mt *mtest.T) {
568+
eventCnt = 0
569+
mt.ResetClient(options.Client().SetMonitor(monitor))
570+
_, err := mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetOrdered(true))
571+
require.Error(mt, err)
572+
bwe, ok := err.(mongo.ClientBulkWriteException)
573+
require.True(mt, ok, "expected a BulkWriteException, got %T", err)
574+
assert.Len(mt, bwe.WriteErrors, 1, "expected %d writeErrors, got %d", 1, len(bwe.WriteErrors))
575+
require.Equal(mt, 1, eventCnt, "expected 1 bulkWrite commands, got %d", eventCnt)
576+
})
577+
})
578+
579+
mt.Run("bulkWrite handles a cursor requiring a getMore", func(mt *mtest.T) {
580+
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
581+
coll.Drop(context.Background())
582+
583+
var getMoreCalled bool
584+
monitor := &event.CommandMonitor{
585+
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
586+
if e.CommandName == "getMore" {
587+
getMoreCalled = true
588+
}
589+
},
590+
}
591+
mt.ResetClient(options.Client().SetMonitor(monitor))
592+
var hello struct {
593+
MaxBsonObjectSize int
594+
}
595+
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
596+
upsert := true
597+
models := (&mongo.ClientWriteModels{}).
598+
AppendUpdateOne(&mongo.ClientUpdateOneModel{
599+
Namespace: "db.coll",
600+
Filter: bson.D{{"_id", strings.Repeat("a", hello.MaxBsonObjectSize/2)}},
601+
Update: bson.D{{"$set", bson.D{{"x", 1}}}},
602+
Upsert: &upsert,
603+
}).
604+
AppendUpdateOne(&mongo.ClientUpdateOneModel{
605+
Namespace: "db.coll",
606+
Filter: bson.D{{"_id", strings.Repeat("b", hello.MaxBsonObjectSize/2)}},
607+
Update: bson.D{{"$set", bson.D{{"x", 1}}}},
608+
Upsert: &upsert,
609+
})
610+
result, err := mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetVerboseResults(true))
611+
require.NoError(mt, err, "BulkWrite error")
612+
assert.Equal(mt, 2, int(result.UpsertedCount), "InsertedCount expected to be %d, got %d", 2, result.UpsertedCount)
613+
assert.Len(mt, result.UpdateResults, 2, "expected %d UpdateResults, got %d", 2, len(result.UpdateResults))
614+
assert.True(mt, getMoreCalled, "the getMore was not called")
615+
})
616+
617+
mt.Run("bulkWrite handles a cursor requiring a getMore within a transaction", func(mt *mtest.T) {
618+
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
619+
coll.Drop(context.Background())
620+
621+
var getMoreCalled bool
622+
monitor := &event.CommandMonitor{
623+
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
624+
if e.CommandName == "getMore" {
625+
getMoreCalled = true
626+
}
627+
},
628+
}
629+
mt.ResetClient(options.Client().SetMonitor(monitor))
630+
var hello struct {
631+
MaxBsonObjectSize int
632+
}
633+
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
634+
session, err := mt.Client.StartSession()
635+
require.NoError(mt, err, "StartSession error")
636+
defer session.EndSession(context.Background())
637+
upsert := true
638+
models := (&mongo.ClientWriteModels{}).
639+
AppendUpdateOne(&mongo.ClientUpdateOneModel{
640+
Namespace: "db.coll",
641+
Filter: bson.D{{"_id", strings.Repeat("a", hello.MaxBsonObjectSize/2)}},
642+
Update: bson.D{{"$set", bson.D{{"x", 1}}}},
643+
Upsert: &upsert,
644+
}).
645+
AppendUpdateOne(&mongo.ClientUpdateOneModel{
646+
Namespace: "db.coll",
647+
Filter: bson.D{{"_id", strings.Repeat("b", hello.MaxBsonObjectSize/2)}},
648+
Update: bson.D{{"$set", bson.D{{"x", 1}}}},
649+
Upsert: &upsert,
650+
})
651+
result, err := session.WithTransaction(context.TODO(), func(ctx mongo.SessionContext) (interface{}, error) {
652+
return mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetVerboseResults(true))
653+
})
654+
require.NoError(mt, err, "BulkWrite error")
655+
cbwResult, ok := result.(*mongo.ClientBulkWriteResult)
656+
require.True(mt, ok, "expected a ClientBulkWriteResult")
657+
assert.Equal(mt, 2, int(cbwResult.UpsertedCount), "InsertedCount expected to be %d, got %d", 2, cbwResult.UpsertedCount)
658+
assert.Len(mt, cbwResult.UpdateResults, 2, "expected %d UpdateResults, got %d", 2, len(cbwResult.UpdateResults))
659+
assert.True(mt, getMoreCalled, "the getMore was not called")
660+
})
661+
662+
mt.Run("bulkWrite handles a getMore error", func(mt *mtest.T) {
663+
})
664+
665+
mt.Run("bulkWrite returns error for unacknowledged too-large insert", func(mt *mtest.T) {
666+
mt.ResetClient(options.Client())
667+
var hello struct {
668+
MaxBsonObjectSize int
669+
}
670+
require.NoError(mt, mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello), "Hello error")
671+
models := (&mongo.ClientWriteModels{}).
672+
AppendInsertOne(&mongo.ClientInsertOneModel{
673+
Namespace: "db.coll",
674+
Document: bson.D{{"a", strings.Repeat("b", hello.MaxBsonObjectSize)}},
675+
})
676+
result, err := mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetWriteConcern(writeconcern.Unacknowledged()))
677+
require.NoError(mt, err, "BulkWrite error")
678+
assert.Equal(mt, 2, int(result.UpsertedCount), "InsertedCount expected to be %d, got %d", 2, result.UpsertedCount)
679+
assert.Len(mt, result.UpdateResults, 2, "expected %d UpdateResults, got %d", 2, len(result.UpdateResults))
680+
})
681+
}

mongo/options/clientbulkwriteoptions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (C) MongoDB, Inc. 2017-present.
1+
// Copyright (C) MongoDB, Inc. 2024-present.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License"); you may
44
// not use this file except in compliance with the License. You may obtain

mongo/results.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ type ClientBulkWriteResult struct {
3232
UpsertedCount int64
3333

3434
// A map of operation index to the _id of each inserted document.
35-
InsertResults map[int64]ClientInsertResult
35+
InsertResults map[int]ClientInsertResult
3636

3737
// A map of operation index to the _id of each updated document.
38-
UpdateResults map[int64]ClientUpdateResult
38+
UpdateResults map[int]ClientUpdateResult
3939

4040
// A map of operation index to the _id of each deleted document.
41-
DeleteResults map[int64]ClientDeleteResult
41+
DeleteResults map[int]ClientDeleteResult
4242
}
4343

4444
// ClientInsertResult is the result type returned by a client-level bulk write of InsertOne operation.

x/mongo/driver/batch_cursor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
393393
},
394394
Database: bc.database,
395395
Deployment: bc.getOperationDeployment(),
396-
ProcessResponseFn: func(info ResponseInfo) error {
396+
ProcessResponseFn: func(_ context.Context, info ResponseInfo) error {
397397
response := info.ServerResponse
398398
id, ok := response.Lookup("cursor", "id").Int64OK()
399399
if !ok {

x/mongo/driver/batches.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ type Batches struct {
2525
Ordered *bool
2626
}
2727

28-
// Valid returns true if Batches contains both an identifier and the length of Documents is greater
29-
// than zero.
30-
func (b *Batches) Valid() bool { return b != nil && b.Identifier != "" && len(b.Documents) > 0 }
31-
3228
// ClearBatch clears the Current batch. This must be called before AdvanceBatch will advance to the
3329
// next batch.
3430
func (b *Batches) ClearBatch() { b.Current = b.Current[:0] }

x/mongo/driver/batches_test.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,6 @@ import (
1515
)
1616

1717
func TestBatches(t *testing.T) {
18-
t.Run("Valid", func(t *testing.T) {
19-
testCases := []struct {
20-
name string
21-
batches *Batches
22-
want bool
23-
}{
24-
{"nil", nil, false},
25-
{"missing identifier", &Batches{}, false},
26-
{"no documents", &Batches{Identifier: "documents"}, false},
27-
{"valid", &Batches{Identifier: "documents", Documents: make([]bsoncore.Document, 5)}, true},
28-
}
29-
30-
for _, tc := range testCases {
31-
t.Run(tc.name, func(t *testing.T) {
32-
want := tc.want
33-
got := tc.batches.Valid()
34-
if got != want {
35-
t.Errorf("Did not get expected result from Valid. got %t; want %t", got, want)
36-
}
37-
})
38-
}
39-
})
4018
t.Run("ClearBatch", func(t *testing.T) {
4119
batches := &Batches{Identifier: "documents", Current: make([]bsoncore.Document, 2, 10)}
4220
if len(batches.Current) != 2 {

0 commit comments

Comments
 (0)