Skip to content

Commit 755675d

Browse files
committed
WIP
1 parent 7b71cc1 commit 755675d

File tree

14 files changed

+228
-10106
lines changed

14 files changed

+228
-10106
lines changed

mongo/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -880,8 +880,8 @@ func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
880880
if bwo.WriteConcern != nil {
881881
wc = bwo.WriteConcern
882882
}
883-
if sess.TransactionRunning() {
884-
wc = nil
883+
if sess.TransactionRunning() && wc != nil {
884+
return nil, errors.New("cannot set write concern after starting a transaction")
885885
}
886886
if !writeconcern.AckWrite(wc) {
887887
sess = nil

mongo/client_bulk_write.go

Lines changed: 79 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mongo
88

99
import (
1010
"context"
11+
"errors"
1112
"strconv"
1213

1314
"go.mongodb.org/mongo-driver/bson"
@@ -40,6 +41,9 @@ type clientBulkWrite struct {
4041
}
4142

4243
func (bw *clientBulkWrite) execute(ctx context.Context) error {
44+
if len(bw.models) == 0 {
45+
return errors.New("empty write models")
46+
}
4347
docs := make([]bsoncore.Document, len(bw.models))
4448
nsMap := make(map[string]int)
4549
var nsList []string
@@ -170,12 +174,21 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
170174
Database("admin").
171175
Deployment(bw.client.deployment).Crypt(bw.client.cryptFLE).
172176
ServerAPI(bw.client.serverAPI).Timeout(bw.client.timeout).
173-
Logger(bw.client.logger).Authenticator(bw.client.authenticator)
174-
err := op.Execute(ctx)
175-
if err != nil {
176-
return err
177+
Logger(bw.client.logger).Authenticator(bw.client.authenticator).Name("bulkWrite")
178+
opErr := op.Execute(ctx)
179+
var wcErrs []*WriteConcernError
180+
if opErr != nil {
181+
if errors.Is(opErr, driver.ErrUnacknowledgedWrite) {
182+
return nil
183+
}
184+
var writeErr driver.WriteCommandError
185+
if errors.As(opErr, &writeErr) {
186+
wcErr := convertDriverWriteConcernError(writeErr.WriteConcernError)
187+
wcErrs = append(wcErrs, wcErr)
188+
}
177189
}
178190
var res struct {
191+
Ok bool
179192
Cursor struct {
180193
FirstBatch []bson.Raw
181194
}
@@ -184,33 +197,48 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
184197
NMatched int32
185198
NModified int32
186199
NUpserted int32
200+
NErrors int32
201+
Code int32
202+
Errmsg string
187203
}
188204
rawRes := op.Result()
189-
err = bson.Unmarshal(rawRes, &res)
190-
if err != nil {
205+
if err := bson.Unmarshal(rawRes, &res); err != nil {
191206
return err
192207
}
193208
bw.result.DeletedCount = int64(res.NDeleted)
194209
bw.result.InsertedCount = int64(res.NInserted)
195210
bw.result.MatchedCount = int64(res.NMatched)
196211
bw.result.ModifiedCount = int64(res.NModified)
197212
bw.result.UpsertedCount = int64(res.NUpserted)
213+
errors := make(map[int64]WriteError)
198214
for i, cur := range res.Cursor.FirstBatch {
199215
switch res := resMap[i].(type) {
200216
case map[int64]ClientDeleteResult:
201-
if err = appendDeleteResult(cur, res); err != nil {
217+
if err := appendDeleteResult(cur, res, errors); err != nil {
202218
return err
203219
}
204220
case map[int64]ClientInsertResult:
205-
if err = appendInsertResult(cur, res, insIDMap); err != nil {
221+
if err := appendInsertResult(cur, res, errors, insIDMap); err != nil {
206222
return err
207223
}
208224
case map[int64]ClientUpdateResult:
209-
if err = appendUpdateResult(cur, res); err != nil {
225+
if err := appendUpdateResult(cur, res, errors); err != nil {
210226
return err
211227
}
212228
}
213229
}
230+
if !res.Ok || res.NErrors > 0 || opErr != nil {
231+
return ClientBulkWriteException{
232+
TopLevelError: &WriteError{
233+
Code: int(res.Code),
234+
Message: res.Errmsg,
235+
Raw: bson.Raw(rawRes),
236+
},
237+
WriteConcernErrors: wcErrs,
238+
WriteErrors: errors,
239+
PartialResult: &bw.result,
240+
}
241+
}
214242
return nil
215243
}
216244

@@ -383,45 +411,75 @@ func createClientDeleteDoc(
383411
return bsoncore.AppendDocumentEnd(doc, didx)
384412
}
385413

386-
func appendDeleteResult(cur bson.Raw, m map[int64]ClientDeleteResult) error {
414+
func appendDeleteResult(cur bson.Raw, m map[int64]ClientDeleteResult, e map[int64]WriteError) error {
387415
var res struct {
388-
Idx int32
389-
N int32
416+
Ok bool
417+
Idx int32
418+
N int32
419+
Code int32
420+
Errmsg string
390421
}
391422
if err := bson.Unmarshal(cur, &res); err != nil {
392423
return err
393424
}
394-
m[int64(res.Idx)] = ClientDeleteResult{int64(res.N)}
425+
if res.Ok {
426+
m[int64(res.Idx)] = ClientDeleteResult{int64(res.N)}
427+
} else {
428+
e[int64(res.Idx)] = WriteError{
429+
Code: int(res.Code),
430+
Message: res.Errmsg,
431+
}
432+
}
395433
return nil
396434
}
397435

398-
func appendInsertResult(cur bson.Raw, m map[int64]ClientInsertResult, insIdMap map[int]interface{}) error {
436+
func appendInsertResult(cur bson.Raw, m map[int64]ClientInsertResult, e map[int64]WriteError, insIDMap map[int]interface{}) error {
399437
var res struct {
400-
Idx int32
438+
Ok bool
439+
Idx int32
440+
Code int32
441+
Errmsg string
401442
}
402443
if err := bson.Unmarshal(cur, &res); err != nil {
403444
return err
404445
}
405-
m[int64(res.Idx)] = ClientInsertResult{insIdMap[int(res.Idx)]}
446+
if res.Ok {
447+
m[int64(res.Idx)] = ClientInsertResult{insIDMap[int(res.Idx)]}
448+
} else {
449+
e[int64(res.Idx)] = WriteError{
450+
Code: int(res.Code),
451+
Message: res.Errmsg,
452+
}
453+
}
406454
return nil
407455
}
408456

409-
func appendUpdateResult(cur bson.Raw, m map[int64]ClientUpdateResult) error {
457+
func appendUpdateResult(cur bson.Raw, m map[int64]ClientUpdateResult, e map[int64]WriteError) error {
410458
var res struct {
459+
Ok bool
411460
Idx int32
412461
N int32
413462
NModified int32
414463
Upserted struct {
415464
ID interface{} `bson:"_id"`
416465
}
466+
Code int32
467+
Errmsg string
417468
}
418469
if err := bson.Unmarshal(cur, &res); err != nil {
419470
return err
420471
}
421-
m[int64(res.Idx)] = ClientUpdateResult{
422-
MatchedCount: int64(res.N),
423-
ModifiedCount: int64(res.NModified),
424-
UpsertedID: res.Upserted.ID,
472+
if res.Ok {
473+
m[int64(res.Idx)] = ClientUpdateResult{
474+
MatchedCount: int64(res.N),
475+
ModifiedCount: int64(res.NModified),
476+
UpsertedID: res.Upserted.ID,
477+
}
478+
} else {
479+
e[int64(res.Idx)] = WriteError{
480+
Code: int(res.Code),
481+
Message: res.Errmsg,
482+
}
425483
}
426484
return nil
427485
}

mongo/errors.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -611,10 +611,10 @@ func (bwe BulkWriteException) serverError() {}
611611

612612
// ClientBulkWriteException is the error type returned by ClientBulkWrite operations.
613613
type ClientBulkWriteException struct {
614-
TopLevelError *error
614+
TopLevelError *WriteError
615615

616616
// The write concern errors that occurred.
617-
WriteConcernErrors []WriteConcernError
617+
WriteConcernErrors []*WriteConcernError
618618

619619
// The write errors that occurred during individual operation execution.
620620
WriteErrors map[int64]WriteError
@@ -626,12 +626,12 @@ type ClientBulkWriteException struct {
626626
func (bwe ClientBulkWriteException) Error() string {
627627
causes := make([]string, 0, 4)
628628
if bwe.TopLevelError != nil {
629-
causes = append(causes, "top level error: "+(*bwe.TopLevelError).Error())
629+
causes = append(causes, "top level error: "+bwe.TopLevelError.Error())
630630
}
631631
if len(bwe.WriteConcernErrors) > 0 {
632632
errs := make([]error, len(bwe.WriteConcernErrors))
633633
for i := 0; i < len(bwe.WriteConcernErrors); i++ {
634-
errs[i] = &bwe.WriteConcernErrors[i]
634+
errs[i] = bwe.WriteConcernErrors[i]
635635
}
636636
causes = append(causes, "write concern errors: "+joinBatchErrors(errs))
637637
}
@@ -643,7 +643,7 @@ func (bwe ClientBulkWriteException) Error() string {
643643
causes = append(causes, "write errors: "+joinBatchErrors(errs))
644644
}
645645
if bwe.PartialResult != nil {
646-
causes = append(causes, fmt.Sprintf("result: %v", bwe.PartialResult))
646+
causes = append(causes, fmt.Sprintf("result: %v", *bwe.PartialResult))
647647
}
648648

649649
message := "bulk write exception: "
@@ -653,9 +653,6 @@ func (bwe ClientBulkWriteException) Error() string {
653653
return "bulk write exception: " + strings.Join(causes, ", ")
654654
}
655655

656-
// serverError implements the ServerError interface.
657-
func (bwe ClientBulkWriteException) serverError() {}
658-
659656
// returnResult is used to determine if a function calling processWriteError should return
660657
// the result or return nil. Since the processWriteError function is used by many different
661658
// methods, both *One and *Many, we need a way to differentiate if the method should return

mongo/integration/unified/error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ func extractErrorDetails(err error) (errorDetails, bool) {
182182
details.raw = we.Raw
183183
}
184184
details.labels = converted.Labels
185+
case mongo.ClientBulkWriteException:
186+
details.raw = converted.TopLevelError.Raw
187+
details.codes = append(details.codes, int32(converted.TopLevelError.Code))
185188
default:
186189
return errorDetails{}, false
187190
}

mongo/integration/unified/schema_version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
var (
1818
supportedSchemaVersions = map[int]string{
19-
1: "1.17",
19+
1: "1.21",
2020
}
2121
)
2222

0 commit comments

Comments
 (0)