Skip to content

Commit 50cf0c2

Browse files
authored
GODRIVER-2388 Improved Bulk Write API. (#1884)
1 parent 0b2f755 commit 50cf0c2

File tree

87 files changed

+16178
-2352
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+16178
-2352
lines changed

internal/driverutil/operation.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ const (
2828
ListIndexesOp = "listIndexes" // ListIndexesOp is the name for listing indexes
2929
ListDatabasesOp = "listDatabases" // ListDatabasesOp is the name for listing databases
3030
UpdateOp = "update" // UpdateOp is the name for updating
31+
BulkWriteOp = "bulkWrite" // BulkWriteOp is the name for client-level bulk write
3132
)

internal/integration/client_side_encryption_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,8 +394,8 @@ func TestClientSideEncryptionCustomCrypt(t *testing.T) {
394394
"expected 0 calls to DecryptExplicit, got %v", cc.numDecryptExplicitCalls)
395395
assert.Equal(mt, cc.numCloseCalls, 0,
396396
"expected 0 calls to Close, got %v", cc.numCloseCalls)
397-
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 2,
398-
"expected 2 calls to BypassAutoEncryption, got %v", cc.numBypassAutoEncryptionCalls)
397+
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 1,
398+
"expected 1 call to BypassAutoEncryption, got %v", cc.numBypassAutoEncryptionCalls)
399399
})
400400
}
401401

internal/integration/client_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"go.mongodb.org/mongo-driver/v2/mongo"
2929
"go.mongodb.org/mongo-driver/v2/mongo/options"
3030
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
31+
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
3132
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
3233
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
3334
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage"
@@ -720,6 +721,128 @@ func TestClient(t *testing.T) {
720721
})
721722
}
722723

724+
func TestClient_BulkWrite(t *testing.T) {
725+
mt := mtest.New(t, noClientOpts)
726+
727+
mtBulkWriteOpts := mtest.NewOptions().MinServerVersion("8.0").AtlasDataLake(false).ClientType(mtest.Pinned)
728+
mt.RunOpts("bulk write with nil filter", mtBulkWriteOpts, func(mt *mtest.T) {
729+
mt.Parallel()
730+
731+
testCases := []struct {
732+
name string
733+
writes []mongo.ClientBulkWrite
734+
errorString string
735+
}{
736+
{
737+
name: "DeleteOne",
738+
writes: []mongo.ClientBulkWrite{{
739+
Database: "foo",
740+
Collection: "bar",
741+
Model: mongo.NewClientDeleteOneModel(),
742+
}},
743+
errorString: "delete filter cannot be nil",
744+
},
745+
{
746+
name: "DeleteMany",
747+
writes: []mongo.ClientBulkWrite{{
748+
Database: "foo",
749+
Collection: "bar",
750+
Model: mongo.NewClientDeleteManyModel(),
751+
}},
752+
errorString: "delete filter cannot be nil",
753+
},
754+
{
755+
name: "UpdateOne",
756+
writes: []mongo.ClientBulkWrite{{
757+
Database: "foo",
758+
Collection: "bar",
759+
Model: mongo.NewClientUpdateOneModel(),
760+
}},
761+
errorString: "update filter cannot be nil",
762+
},
763+
{
764+
name: "UpdateMany",
765+
writes: []mongo.ClientBulkWrite{{
766+
Database: "foo",
767+
Collection: "bar",
768+
Model: mongo.NewClientUpdateManyModel(),
769+
}},
770+
errorString: "update filter cannot be nil",
771+
},
772+
}
773+
for _, tc := range testCases {
774+
tc := tc
775+
776+
mt.Run(tc.name, func(mt *mtest.T) {
777+
mt.Parallel()
778+
779+
_, err := mt.Client.BulkWrite(context.Background(), tc.writes)
780+
require.EqualError(mt, err, tc.errorString)
781+
})
782+
}
783+
})
784+
mt.RunOpts("bulk write with write concern", mtBulkWriteOpts, func(mt *mtest.T) {
785+
mt.Parallel()
786+
787+
testCases := []struct {
788+
name string
789+
opts *options.ClientBulkWriteOptionsBuilder
790+
want bool
791+
}{
792+
{
793+
name: "unacknowledged",
794+
opts: options.ClientBulkWrite().SetWriteConcern(writeconcern.Unacknowledged()).SetOrdered(false),
795+
want: false,
796+
},
797+
{
798+
name: "acknowledged",
799+
want: true,
800+
},
801+
}
802+
for _, tc := range testCases {
803+
tc := tc
804+
805+
mt.Run(tc.name, func(mt *mtest.T) {
806+
mt.Parallel()
807+
808+
insertOneModel := mongo.NewClientInsertOneModel().SetDocument(bson.D{{"x", 1}})
809+
writes := []mongo.ClientBulkWrite{{
810+
Database: "foo",
811+
Collection: "bar",
812+
Model: insertOneModel,
813+
}}
814+
res, err := mt.Client.BulkWrite(context.Background(), writes, tc.opts)
815+
require.NoError(mt, err, "BulkWrite error: %v", err)
816+
require.NotNil(mt, res, "expected a ClientBulkWriteResult")
817+
assert.Equal(mt, res.Acknowledged, tc.want, "expected Acknowledged: %v, got: %v", tc.want, res.Acknowledged)
818+
})
819+
}
820+
})
821+
var bulkWrites int
822+
cmdMonitor := &event.CommandMonitor{
823+
Started: func(_ context.Context, evt *event.CommandStartedEvent) {
824+
if evt.CommandName == "bulkWrite" {
825+
bulkWrites++
826+
}
827+
},
828+
}
829+
clientOpts := options.Client().SetMonitor(cmdMonitor)
830+
mt.RunOpts("bulk write with large messages", mtBulkWriteOpts.ClientOptions(clientOpts), func(mt *mtest.T) {
831+
mt.Parallel()
832+
833+
document := bson.D{{"largeField", strings.Repeat("a", 16777216-100)}} // Adjust size to account for BSON overhead
834+
writes := []mongo.ClientBulkWrite{
835+
{"db", "x", mongo.NewClientInsertOneModel().SetDocument(document)},
836+
{"db", "x", mongo.NewClientInsertOneModel().SetDocument(document)},
837+
{"db", "x", mongo.NewClientInsertOneModel().SetDocument(document)},
838+
}
839+
840+
_, err := mt.Client.BulkWrite(context.Background(), writes)
841+
require.NoError(t, err)
842+
assert.Equal(t, 2, bulkWrites, "expected %d bulkWrites, got %d", 2, bulkWrites)
843+
})
844+
}
845+
723846
func TestClient_BSONOptions(t *testing.T) {
724847
t.Parallel()
725848

internal/integration/collection_test.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestCollection(t *testing.T) {
167167
mt.Run("large document batches", func(mt *mtest.T) {
168168
mt.Parallel()
169169

170-
docs := []interface{}{create16MBDocument(mt), create16MBDocument(mt)}
170+
docs := []interface{}{create16MBDocument(mt), create16MBDocument(mt), create16MBDocument(mt)}
171171
_, err := mt.Coll.InsertMany(context.Background(), docs)
172172
assert.Nil(mt, err, "InsertMany error: %v", err)
173173
evt := mt.GetStartedEvent()
@@ -1715,6 +1715,46 @@ func TestCollection(t *testing.T) {
17151715
})
17161716
}
17171717
})
1718+
mt.Run("error on nil filter", func(mt *mtest.T) {
1719+
mt.Parallel()
1720+
1721+
testCases := []struct {
1722+
name string
1723+
model mongo.WriteModel
1724+
errorString string
1725+
}{
1726+
{
1727+
name: "DeleteOne",
1728+
model: mongo.NewDeleteOneModel(),
1729+
errorString: "delete filter cannot be nil",
1730+
},
1731+
{
1732+
name: "DeleteMany",
1733+
model: mongo.NewDeleteManyModel(),
1734+
errorString: "delete filter cannot be nil",
1735+
},
1736+
{
1737+
name: "UpdateOne",
1738+
model: mongo.NewUpdateOneModel().SetUpdate(bson.D{{"$set", bson.D{{"x", 1}}}}),
1739+
errorString: "update filter cannot be nil",
1740+
},
1741+
{
1742+
name: "UpdateMany",
1743+
model: mongo.NewUpdateManyModel().SetUpdate(bson.D{{"$set", bson.D{{"x", 1}}}}),
1744+
errorString: "update filter cannot be nil",
1745+
},
1746+
}
1747+
for _, tc := range testCases {
1748+
tc := tc
1749+
1750+
mt.Run(tc.name, func(mt *mtest.T) {
1751+
mt.Parallel()
1752+
1753+
_, err := mt.Coll.BulkWrite(context.Background(), []mongo.WriteModel{tc.model})
1754+
assert.EqualError(mt, err, tc.errorString)
1755+
})
1756+
}
1757+
})
17181758
mt.Run("correct model in errors", func(mt *mtest.T) {
17191759
models := []mongo.WriteModel{
17201760
mongo.NewUpdateOneModel().SetFilter(bson.M{}).SetUpdate(bson.M{

0 commit comments

Comments
 (0)