Skip to content

Commit f1b81d6

Browse files
committed
cleanup
1 parent b287ccf commit f1b81d6

File tree

7 files changed

+90
-46
lines changed

7 files changed

+90
-46
lines changed

internal/driverutil/operation.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ const (
2828
ListIndexesOp = "listIndexes" // ListIndexesOp is the name for listing indexes
2929
ListDatabasesOp = "listDatabases" // ListDatabasesOp is the name for listing databases
3030
UpdateOp = "update" // UpdateOp is the name for updating
31+
BulkWriteOp = "bulkWrite" // BulkWriteOp is the name for client-level bulk write
3132
)

mongo/client.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,9 +851,10 @@ func (c *Client) createBaseCursorOptions() driver.CursorOptions {
851851
}
852852
}
853853

854-
// BulkWrite performs a client-levelbulk write operation.
854+
// BulkWrite performs a client-level bulk write operation.
855855
func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
856856
opts ...*options.ClientBulkWriteOptions) (*ClientBulkWriteResult, error) {
857+
// TODO: Remove once DRIVERS-2888 is implemented.
857858
if c.isAutoEncryptionSet {
858859
return nil, errors.New("bulkWrite does not currently support automatic encryption")
859860
}
@@ -886,6 +887,9 @@ func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
886887
wc = bwo.WriteConcern
887888
}
888889
if !writeconcern.AckWrite(wc) {
890+
if bwo.Ordered == nil || *bwo.Ordered {
891+
return nil, errors.New("cannot request unacknowledged write concern and ordered writes")
892+
}
889893
sess = nil
890894
}
891895

@@ -908,6 +912,8 @@ func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
908912
}
909913
if bwo.VerboseResults == nil || !(*bwo.VerboseResults) {
910914
op.errorsOnly = true
915+
} else if !writeconcern.AckWrite(wc) {
916+
return nil, errors.New("cannot request unacknowledged write concern and verbose results")
911917
}
912918
if err = op.execute(ctx); err != nil {
913919
return nil, replaceErrors(err)

mongo/client_bulk_write.go

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.mongodb.org/mongo-driver/bson"
1616
"go.mongodb.org/mongo-driver/bson/bsoncodec"
1717
"go.mongodb.org/mongo-driver/bson/primitive"
18+
"go.mongodb.org/mongo-driver/internal/driverutil"
1819
"go.mongodb.org/mongo-driver/mongo/description"
1920
"go.mongodb.org/mongo-driver/mongo/options"
2021
"go.mongodb.org/mongo-driver/mongo/writeconcern"
@@ -24,6 +25,10 @@ import (
2425
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
2526
)
2627

28+
const (
29+
database = "admin"
30+
)
31+
2732
// bulkWrite performs a bulkwrite operation
2833
type clientBulkWrite struct {
2934
models []clientWriteModel
@@ -42,12 +47,17 @@ type clientBulkWrite struct {
4247

4348
func (bw *clientBulkWrite) execute(ctx context.Context) error {
4449
if len(bw.models) == 0 {
45-
return errors.New("empty write models")
50+
return ErrEmptySlice
51+
}
52+
for _, m := range bw.models {
53+
if m.model == nil {
54+
return ErrNilDocument
55+
}
4656
}
4757
batches := &modelBatches{
4858
session: bw.session,
4959
client: bw.client,
50-
ordered: bw.ordered,
60+
ordered: bw.ordered == nil || *bw.ordered,
5161
models: bw.models,
5262
result: &bw.result,
5363
retryMode: driver.RetryOnce,
@@ -61,7 +71,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
6171
Type: driver.Write,
6272
Batches: batches,
6373
CommandMonitor: bw.client.monitor,
64-
Database: "admin",
74+
Database: database,
6575
Deployment: bw.client.deployment,
6676
Selector: bw.selector,
6777
WriteConcern: bw.writeConcern,
@@ -70,7 +80,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
7080
Timeout: bw.client.timeout,
7181
Logger: bw.client.logger,
7282
Authenticator: bw.client.authenticator,
73-
Name: "bulkWrite",
83+
Name: driverutil.BulkWriteOp,
7484
}.Execute(ctx)
7585
var exception *ClientBulkWriteException
7686
switch tt := err.(type) {
@@ -96,7 +106,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
96106
}
97107
if exception != nil {
98108
var hasSuccess bool
99-
if bw.ordered == nil || *bw.ordered {
109+
if batches.ordered {
100110
_, ok := batches.writeErrors[0]
101111
hasSuccess = !ok
102112
} else {
@@ -125,9 +135,7 @@ func (bw *clientBulkWrite) newCommand() func([]byte, description.SelectedServer)
125135
}
126136
dst = bsoncore.AppendValueElement(dst, "comment", comment)
127137
}
128-
if bw.ordered != nil {
129-
dst = bsoncore.AppendBooleanElement(dst, "ordered", *bw.ordered)
130-
}
138+
dst = bsoncore.AppendBooleanElement(dst, "ordered", bw.ordered == nil || *bw.ordered)
131139
if bw.let != nil {
132140
let, err := marshal(bw.let, bw.client.bsonOpts, bw.client.registry)
133141
if err != nil {
@@ -173,7 +181,7 @@ type modelBatches struct {
173181
session *session.Client
174182
client *Client
175183

176-
ordered *bool
184+
ordered bool
177185
models []clientWriteModel
178186

179187
offset int
@@ -188,7 +196,7 @@ type modelBatches struct {
188196
}
189197

190198
func (mb *modelBatches) IsOrdered() *bool {
191-
return mb.ordered
199+
return &mb.ordered
192200
}
193201

194202
func (mb *modelBatches) AdvanceBatches(n int) {
@@ -265,14 +273,15 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
265273
}
266274

267275
canRetry := true
276+
checkSize := true
268277

269278
l := len(dst)
270279

271280
opsIdx, dst := fn.appendStart(dst, "ops")
272281
nsIdx, nsDst := fn.appendStart(nil, "nsInfo")
273282

274283
totalSize -= 1000
275-
size := (len(dst) - l) * 2
284+
size := len(dst) + len(nsDst)
276285
var n int
277286
for i := mb.offset; i < len(mb.models); i++ {
278287
if n == maxCount {
@@ -286,11 +295,13 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
286295
var err error
287296
switch model := mb.models[i].model.(type) {
288297
case *ClientInsertOneModel:
298+
checkSize = false
289299
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendInsertResult)
290300
var id interface{}
291301
id, doc, err = (&clientInsertDoc{
292302
namespace: nsIdx,
293303
document: model.Document,
304+
sizeLimit: maxDocSize,
294305
}).marshal(mb.client.bsonOpts, mb.client.registry)
295306
if err != nil {
296307
break
@@ -324,6 +335,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
324335
checkDollarKey: true,
325336
}).marshal(mb.client.bsonOpts, mb.client.registry)
326337
case *ClientReplaceOneModel:
338+
checkSize = false
327339
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
328340
doc, err = (&clientUpdateDoc{
329341
namespace: nsIdx,
@@ -335,6 +347,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
335347
upsert: model.Upsert,
336348
multi: false,
337349
checkDollarKey: false,
350+
sizeLimit: maxDocSize,
338351
}).marshal(mb.client.bsonOpts, mb.client.registry)
339352
case *ClientDeleteOneModel:
340353
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult)
@@ -362,8 +375,8 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
362375
return 0, nil, err
363376
}
364377
length := len(doc)
365-
if length > maxDocSize {
366-
break
378+
if maxDocSize > 0 && length > maxDocSize+16*1024 {
379+
return 0, nil, driver.ErrDocumentTooLarge
367380
}
368381
if !exists {
369382
length += len(ns)
@@ -389,6 +402,9 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
389402
dst = fn.updateLength(dst, opsIdx, int32(len(dst[opsIdx:])))
390403
nsDst = fn.updateLength(nsDst, nsIdx, int32(len(nsDst[nsIdx:])))
391404
dst = append(dst, nsDst...)
405+
if checkSize && maxDocSize > 0 && len(dst)-l > maxDocSize+16*1024 {
406+
return 0, nil, driver.ErrDocumentTooLarge
407+
}
392408

393409
mb.retryMode = driver.RetryNone
394410
if mb.client.retryWrites && canRetry {
@@ -424,6 +440,19 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
424440
if err != nil {
425441
return err
426442
}
443+
if !res.Ok {
444+
return ClientBulkWriteException{
445+
TopLevelError: &WriteError{
446+
Code: int(res.Code),
447+
Message: res.Errmsg,
448+
Raw: bson.Raw(resp),
449+
},
450+
WriteConcernErrors: mb.writeConcernErrors,
451+
WriteErrors: mb.writeErrors,
452+
PartialResult: mb.result,
453+
}
454+
}
455+
427456
mb.result.DeletedCount += int64(res.NDeleted)
428457
mb.result.InsertedCount += int64(res.NInserted)
429458
mb.result.MatchedCount += int64(res.NMatched)
@@ -470,21 +499,12 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
470499
if err != nil {
471500
return err
472501
}
473-
isOrdered := mb.ordered == nil || *mb.ordered
474-
if isOrdered && (writeCmdErr.WriteConcernError != nil || !ok || !res.Ok || res.NErrors > 0) {
475-
exception := ClientBulkWriteException{
502+
if mb.ordered && (writeCmdErr.WriteConcernError != nil || !ok || !res.Ok || res.NErrors > 0) {
503+
return ClientBulkWriteException{
476504
WriteConcernErrors: mb.writeConcernErrors,
477505
WriteErrors: mb.writeErrors,
478506
PartialResult: mb.result,
479507
}
480-
if !res.Ok {
481-
exception.TopLevelError = &WriteError{
482-
Code: int(res.Code),
483-
Message: res.Errmsg,
484-
Raw: bson.Raw(resp),
485-
}
486-
}
487-
return exception
488508
}
489509
return nil
490510
}
@@ -558,6 +578,8 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
558578
type clientInsertDoc struct {
559579
namespace int
560580
document interface{}
581+
582+
sizeLimit int
561583
}
562584

563585
func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bsoncodec.Registry) (interface{}, bsoncore.Document, error) {
@@ -568,6 +590,9 @@ func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bsonc
568590
if err != nil {
569591
return nil, nil, err
570592
}
593+
if d.sizeLimit > 0 && len(f) > d.sizeLimit {
594+
return nil, nil, driver.ErrDocumentTooLarge
595+
}
571596
var id interface{}
572597
f, id, err = ensureID(f, primitive.NilObjectID, bsonOpts, registry)
573598
if err != nil {
@@ -588,6 +613,8 @@ type clientUpdateDoc struct {
588613
upsert *bool
589614
multi bool
590615
checkDollarKey bool
616+
617+
sizeLimit int
591618
}
592619

593620
func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bsoncodec.Registry) (bsoncore.Document, error) {
@@ -605,6 +632,9 @@ func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bsonc
605632
if err != nil {
606633
return nil, err
607634
}
635+
if d.sizeLimit > 0 && len(u.Data) > d.sizeLimit {
636+
return nil, driver.ErrDocumentTooLarge
637+
}
608638
doc = bsoncore.AppendValueElement(doc, "updateMods", u)
609639
doc = bsoncore.AppendBooleanElement(doc, "multi", d.multi)
610640

mongo/client_bulk_write_models.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
type ClientWriteModels struct {
1717
models []clientWriteModel
1818
}
19-
2019
type clientWriteModel struct {
2120
namespace string
2221
model interface{}
@@ -106,7 +105,7 @@ func (m *ClientWriteModels) AppendDeleteMany(database, collection string, models
106105
return m
107106
}
108107

109-
// ClientInsertOneModel is used to insert a single document in a BulkWrite operation.
108+
// ClientInsertOneModel is used to insert a single document in a client-level BulkWrite operation.
110109
type ClientInsertOneModel struct {
111110
Document interface{}
112111
}
@@ -166,7 +165,7 @@ func (uom *ClientUpdateOneModel) SetCollation(collation *options.Collation) *Cli
166165
}
167166

168167
// SetUpsert specifies whether or not a new document should be inserted if no document matching the filter is found. If
169-
// an upsert is performed, the _id of the upserted document can be retrieved from the UpsertedIDs field of the
168+
// an upsert is performed, the _id of the upserted document can be retrieved from the UpdateResults field of the
170169
// ClientBulkWriteResult.
171170
func (uom *ClientUpdateOneModel) SetUpsert(upsert bool) *ClientUpdateOneModel {
172171
uom.Upsert = &upsert
@@ -219,7 +218,7 @@ func (umm *ClientUpdateManyModel) SetCollation(collation *options.Collation) *Cl
219218
}
220219

221220
// SetUpsert specifies whether or not a new document should be inserted if no document matching the filter is found. If
222-
// an upsert is performed, the _id of the upserted document can be retrieved from the UpsertedIDs field of the
221+
// an upsert is performed, the _id of the upserted document can be retrieved from the UpdateResults field of the
223222
// ClientBulkWriteResult.
224223
func (umm *ClientUpdateManyModel) SetUpsert(upsert bool) *ClientUpdateManyModel {
225224
umm.Upsert = &upsert
@@ -265,7 +264,7 @@ func (rom *ClientReplaceOneModel) SetCollation(collation *options.Collation) *Cl
265264
}
266265

267266
// SetUpsert specifies whether or not the replacement document should be inserted if no document matching the filter is
268-
// found. If an upsert is performed, the _id of the upserted document can be retrieved from the UpsertedIDs field of the
267+
// found. If an upsert is performed, the _id of the upserted document can be retrieved from the UpdateResults field of the
269268
// BulkWriteResult.
270269
func (rom *ClientReplaceOneModel) SetUpsert(upsert bool) *ClientReplaceOneModel {
271270
rom.Upsert = &upsert

mongo/errors.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,14 +611,20 @@ func (bwe BulkWriteException) serverError() {}
611611

612612
// ClientBulkWriteException is the error type returned by ClientBulkWrite operations.
613613
type ClientBulkWriteException struct {
614+
// A top-level error that occurred when attempting to communicate with the server
615+
// or execute the bulk write. This value may not be populated if the exception was
616+
// thrown due to errors occurring on individual writes.
614617
TopLevelError *WriteError
615618

616619
// The write concern errors that occurred.
617620
WriteConcernErrors []WriteConcernError
618621

619622
// The write errors that occurred during individual operation execution.
623+
// This map will contain at most one entry if the bulk write was ordered.
620624
WriteErrors map[int]WriteError
621625

626+
// The results of any successful operations that were performed before the error
627+
// was encountered.
622628
PartialResult *ClientBulkWriteResult
623629
}
624630

mongo/options/clientbulkwriteoptions.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,19 @@ import (
1212

1313
// ClientBulkWriteOptions represents options that can be used to configure a client-level BulkWrite operation.
1414
type ClientBulkWriteOptions struct {
15-
// If true, writes executed as part of the operation will opt out of document-level validation on the server. This
16-
// option is valid for MongoDB versions >= 3.2 and is ignored for previous server versions. The default value is
17-
// false. See https://www.mongodb.com/docs/manual/core/schema-validation/ for more information about document
18-
// validation.
15+
// If true, writes executed as part of the operation will opt out of document-level validation on the server. The
16+
// default value is false. See https://www.mongodb.com/docs/manual/core/schema-validation/ for more information
17+
// about document validation.
1918
BypassDocumentValidation *bool
2019

2120
// A string or document that will be included in server logs, profiling logs, and currentOp queries to help trace
22-
// the operation. The default value is nil, which means that no comment will be included in the logs.
21+
// the operation. The default value is nil, which means that no comment will be included in the logs.
2322
Comment interface{}
2423

2524
// If true, no writes will be executed after one fails. The default value is true.
2625
Ordered *bool
2726

28-
// Specifies parameters for all update and delete commands in the BulkWrite. This option is only valid for MongoDB
29-
// versions >= 5.0. Older servers will report an error for using this option. This must be a document mapping
27+
// Specifies parameters for all update and delete commands in the BulkWrite. This must be a document mapping
3028
// parameter names to values. Values must be constant or closed expressions that do not reference document fields.
3129
// Parameters can then be accessed as variables in an aggregate expression context (e.g. "$$var").
3230
Let interface{}
@@ -63,8 +61,7 @@ func (b *ClientBulkWriteOptions) SetBypassDocumentValidation(bypass bool) *Clien
6361
return b
6462
}
6563

66-
// SetLet sets the value for the Let field. Let specifies parameters for all update and delete commands in the BulkWrite.
67-
// This option is only valid for MongoDB versions >= 5.0. Older servers will report an error for using this option.
64+
// SetLet sets the value for the Let field. Let specifies parameters for all update and delete commands in the ClientBulkWrite.
6865
// This must be a document mapping parameter names to values. Values must be constant or closed expressions that do not
6966
// reference document fields. Parameters can then be accessed as variables in an aggregate expression context (e.g. "$$var").
7067
func (b *ClientBulkWriteOptions) SetLet(let interface{}) *ClientBulkWriteOptions {
@@ -84,8 +81,8 @@ func (b *ClientBulkWriteOptions) SetVerboseResults(verboseResults bool) *ClientB
8481
return b
8582
}
8683

87-
// MergeClientBulkWriteOptions combines the given BulkWriteOptions instances into a single BulkWriteOptions in a last-one-wins
88-
// fashion.
84+
// MergeClientBulkWriteOptions combines the given ClientBulkWriteOptions instances into a single
85+
// ClientBulkWriteOptions in a last-one-wins fashion.
8986
//
9087
// Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
9188
// single options struct instead.

0 commit comments

Comments
 (0)