Skip to content

Commit f3add1f

Browse files
author
Isabella Siu
committed
GODRIVER-873 return errors from unordered BulkWrite
Change-Id: I5b4dbcf6e6ae72a509c67d96141f89f4300ed761
1 parent 3150543 commit f3add1f

File tree

4 files changed

+191
-18
lines changed

4 files changed

+191
-18
lines changed

mongo/collection.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,26 +187,16 @@ func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
187187
coll.registry,
188188
opts...,
189189
)
190-
191-
if err != nil {
192-
if conv, ok := err.(driver.BulkWriteException); ok {
193-
return &BulkWriteResult{}, BulkWriteException{
194-
WriteConcernError: convertWriteConcernError(conv.WriteConcernError),
195-
WriteErrors: convertBulkWriteErrors(conv.WriteErrors),
196-
}
197-
}
198-
199-
return &BulkWriteResult{}, replaceErrors(err)
200-
}
201-
202-
return &BulkWriteResult{
190+
result := BulkWriteResult{
203191
InsertedCount: res.InsertedCount,
204192
MatchedCount: res.MatchedCount,
205193
ModifiedCount: res.ModifiedCount,
206194
DeletedCount: res.DeletedCount,
207195
UpsertedCount: res.UpsertedCount,
208196
UpsertedIDs: res.UpsertedIDs,
209-
}, nil
197+
}
198+
199+
return &result, replaceErrors(err)
210200
}
211201

212202
// InsertOne inserts a single document into the collection.

mongo/collection_internal_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,167 @@ func TestCollection_database_accessor(t *testing.T) {
258258
require.Equal(t, coll.Database().Name(), dbName)
259259
}
260260

261+
func TestCollection_BulkWrite_writeErrors_Insert(t *testing.T) {
262+
if testing.Short() {
263+
t.Skip("skipping integration test in short mode")
264+
}
265+
266+
want := WriteError{Code: 11000}
267+
doc1 := NewInsertOneModel().SetDocument(bson.D{{"_id", "x"}})
268+
doc2 := NewInsertOneModel().SetDocument(bson.D{{"_id", "y"}})
269+
models := []WriteModel{doc1, doc1, doc2, doc2}
270+
271+
t.Run("unordered", func(t *testing.T) {
272+
coll := createTestCollection(t, nil, nil)
273+
res, err := coll.BulkWrite(context.Background(), models, options.BulkWrite().SetOrdered(false))
274+
require.Equal(t, res.InsertedCount, int64(2))
275+
276+
got, ok := err.(BulkWriteException)
277+
if !ok {
278+
t.Errorf("Did not receive correct type of error. got %T; want %T", err, WriteErrors{})
279+
t.FailNow()
280+
}
281+
if len(got.WriteErrors) != 2 {
282+
t.Errorf("Incorrect number of errors receieved. got %d; want %d", len(got.WriteErrors), 2)
283+
t.FailNow()
284+
}
285+
if got.WriteErrors[0].Code != want.Code {
286+
t.Errorf("Did not receive the correct error code. got %d; want %d", got.WriteErrors[0].Code, want.Code)
287+
}
288+
})
289+
290+
t.Run("ordered", func(t *testing.T) {
291+
coll := createTestCollection(t, nil, nil)
292+
res, err := coll.BulkWrite(context.Background(), models, options.BulkWrite())
293+
require.Equal(t, res.InsertedCount, int64(1))
294+
295+
got, ok := err.(BulkWriteException)
296+
if !ok {
297+
t.Errorf("Did not receive correct type of error. got %T; want %T", err, WriteErrors{})
298+
t.FailNow()
299+
}
300+
if len(got.WriteErrors) != 1 {
301+
t.Errorf("Incorrect number of errors receieved. got %d; want %d", len(got.WriteErrors), 1)
302+
t.FailNow()
303+
}
304+
if got.WriteErrors[0].Code != want.Code {
305+
t.Errorf("Did not receive the correct error code. got %d; want %d", got.WriteErrors[0].Code, want.Code)
306+
}
307+
})
308+
}
309+
310+
func TestCollection_BulkWrite_writeErrors_Delete(t *testing.T) {
311+
if testing.Short() {
312+
t.Skip("skipping integration test in short mode")
313+
}
314+
315+
doc := NewDeleteOneModel().SetFilter(bson.D{{"x", 1}})
316+
models := []WriteModel{doc, doc}
317+
318+
db := createTestDatabase(t, nil)
319+
collName := testutil.ColName(t)
320+
err := db.RunCommand(
321+
context.Background(),
322+
bsonx.Doc{
323+
{"create", bsonx.String(collName)},
324+
{"capped", bsonx.Boolean(true)},
325+
{"size", bsonx.Int32(64 * 1024)},
326+
},
327+
).Err()
328+
require.NoError(t, err)
329+
330+
t.Run("unordered", func(t *testing.T) {
331+
coll := db.Collection(collName)
332+
_, err = coll.BulkWrite(context.Background(), models, options.BulkWrite().SetOrdered(false))
333+
334+
got, ok := err.(BulkWriteException)
335+
if !ok {
336+
t.Errorf("Did not receive correct type of error. got %T; want %T", err, WriteErrors{})
337+
t.FailNow()
338+
}
339+
if len(got.WriteErrors) != 2 {
340+
t.Errorf("Incorrect number of errors receieved. got %d; want %d", len(got.WriteErrors), 2)
341+
t.FailNow()
342+
}
343+
if got.WriteErrors[0].Code != 20 && got.WriteErrors[0].Code != 10101 {
344+
t.Errorf("Did not receive the correct error code. got %d; want 20 or 10101", got.WriteErrors[0].Code)
345+
}
346+
})
347+
348+
t.Run("ordered", func(t *testing.T) {
349+
coll := db.Collection(collName)
350+
_, err = coll.BulkWrite(context.Background(), models, options.BulkWrite())
351+
352+
got, ok := err.(BulkWriteException)
353+
if !ok {
354+
t.Errorf("Did not receive correct type of error. got %T; want %T", err, WriteErrors{})
355+
t.FailNow()
356+
}
357+
if len(got.WriteErrors) != 1 {
358+
t.Errorf("Incorrect number of errors receieved. got %d; want %d", len(got.WriteErrors), 1)
359+
t.FailNow()
360+
}
361+
if got.WriteErrors[0].Code != 20 && got.WriteErrors[0].Code != 10101 {
362+
t.Errorf("Did not receive the correct error code. got %d; want 20 or 10101", got.WriteErrors[0].Code)
363+
}
364+
})
365+
}
366+
367+
func TestCollection_BulkWrite_writeErrors_Update(t *testing.T) {
368+
if testing.Short() {
369+
t.Skip("skipping integration test in short mode")
370+
}
371+
372+
doc1 := NewUpdateOneModel().SetFilter(bson.D{{"_id", "foo"}}).SetUpdate(bson.D{{"$set", bson.D{{"_id", 3.14159}}}})
373+
doc2 := NewUpdateOneModel().SetFilter(bson.D{{"_id", "foo"}}).SetUpdate(bson.D{{"$set", bson.D{{"x", "fa"}}}})
374+
models := []WriteModel{doc1, doc1, doc2}
375+
want := WriteError{Code: 66}
376+
377+
t.Run("unordered", func(t *testing.T) {
378+
coll := createTestCollection(t, nil, nil)
379+
_, err := coll.InsertOne(context.Background(), bsonx.Doc{{"_id", bsonx.String("foo")}})
380+
require.NoError(t, err)
381+
382+
res, err := coll.BulkWrite(context.Background(), models, options.BulkWrite().SetOrdered(false))
383+
require.Equal(t, res.ModifiedCount, int64(1))
384+
385+
got, ok := err.(BulkWriteException)
386+
if !ok {
387+
t.Errorf("Did not receive correct type of error. got %T; want %T", err, WriteErrors{})
388+
t.FailNow()
389+
}
390+
if len(got.WriteErrors) != 2 {
391+
t.Errorf("Incorrect number of errors receieved. got %d; want %d", len(got.WriteErrors), 2)
392+
t.FailNow()
393+
}
394+
if got.WriteErrors[0].Code != want.Code {
395+
t.Errorf("Did not receive the correct error code. got %d; want %d", got.WriteErrors[0].Code, want.Code)
396+
}
397+
})
398+
399+
t.Run("ordered", func(t *testing.T) {
400+
coll := createTestCollection(t, nil, nil)
401+
_, err := coll.InsertOne(context.Background(), bsonx.Doc{{"_id", bsonx.String("foo")}})
402+
require.NoError(t, err)
403+
404+
res, err := coll.BulkWrite(context.Background(), models, options.BulkWrite())
405+
require.Equal(t, res.ModifiedCount, int64(0))
406+
407+
got, ok := err.(BulkWriteException)
408+
if !ok {
409+
t.Errorf("Did not receive correct type of error. got %T; want %T", err, WriteErrors{})
410+
t.FailNow()
411+
}
412+
if len(got.WriteErrors) != 1 {
413+
t.Errorf("Incorrect number of errors receieved. got %d; want %d", len(got.WriteErrors), 1)
414+
t.FailNow()
415+
}
416+
if got.WriteErrors[0].Code != want.Code {
417+
t.Errorf("Did not receive the correct error code. got %d; want %d", got.WriteErrors[0].Code, want.Code)
418+
}
419+
})
420+
}
421+
261422
func TestCollection_InsertOne(t *testing.T) {
262423
if testing.Short() {
263424
t.Skip("skipping integration test in short mode")

mongo/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ func replaceErrors(err error) error {
4141
if ce, ok := err.(command.Error); ok {
4242
return CommandError{Code: ce.Code, Message: ce.Message, Labels: ce.Labels, Name: ce.Name}
4343
}
44+
if conv, ok := err.(driver.BulkWriteException); ok {
45+
return BulkWriteException{
46+
WriteConcernError: convertWriteConcernError(conv.WriteConcernError),
47+
WriteErrors: convertBulkWriteErrors(conv.WriteErrors),
48+
}
49+
}
50+
4451
return err
4552
}
4653

x/mongo/driver/bulk_write.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func BulkWrite(
9090
WriteErrors: make([]BulkWriteError, 0),
9191
}
9292

93+
var lastErr error
9394
var opIndex int64 // the operation index for the upsertedIDs map
9495
continueOnError := !ordered
9596
for _, batch := range batches {
@@ -109,16 +110,26 @@ func BulkWrite(
109110

110111
if !continueOnError && (err != nil || len(batchErr.WriteErrors) > 0 || batchErr.WriteConcernError != nil) {
111112
if err != nil {
112-
return result.BulkWrite{}, err
113+
return bwRes, err
113114
}
114115

115-
return result.BulkWrite{}, bwErr
116+
return bwRes, bwErr
117+
}
118+
119+
if err != nil {
120+
lastErr = err
116121
}
117122

118123
opIndex += int64(len(batch.models))
119124
}
120125

121126
bwRes.MatchedCount -= bwRes.UpsertedCount
127+
if lastErr != nil {
128+
return bwRes, lastErr
129+
}
130+
if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil {
131+
return bwRes, bwErr
132+
}
122133
return bwRes, nil
123134
}
124135

@@ -225,8 +236,9 @@ func runInsert(
225236
WriteConcern: wc,
226237
}
227238

239+
cmd.Opts = []bsonx.Elem{{"ordered", bsonx.Boolean(!continueOnError)}}
228240
if bypassDocValidation != nil {
229-
cmd.Opts = []bsonx.Elem{{"bypassDocumentValidation", bsonx.Boolean(*bypassDocValidation)}}
241+
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*bypassDocValidation)})
230242
}
231243

232244
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite || !batch.canRetry {
@@ -295,6 +307,7 @@ func runDelete(
295307
Clock: clock,
296308
WriteConcern: wc,
297309
}
310+
cmd.Opts = []bsonx.Elem{{"ordered", bsonx.Boolean(!continueOnError)}}
298311

299312
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite || !batch.canRetry {
300313
if cmd.Session != nil {
@@ -366,9 +379,11 @@ func runUpdate(
366379
Clock: clock,
367380
WriteConcern: wc,
368381
}
382+
383+
cmd.Opts = []bsonx.Elem{{"ordered", bsonx.Boolean(!continueOnError)}}
369384
if bypassDocValidation != nil {
370385
// TODO this is temporary!
371-
cmd.Opts = []bsonx.Elem{{"bypassDocumentValidation", bsonx.Boolean(*bypassDocValidation)}}
386+
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*bypassDocValidation)})
372387
//cmd.Opts = []option.UpdateOptioner{option.OptBypassDocumentValidation(bypassDocValidation)}
373388
}
374389

0 commit comments

Comments
 (0)