Skip to content

Commit cb5e8aa

Browse files
committed
WIP
1 parent 00cc83a commit cb5e8aa

File tree

3 files changed

+304
-62
lines changed

3 files changed

+304
-62
lines changed

mongo/client_bulk_write.go

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,31 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
7373
Authenticator: bw.client.authenticator,
7474
Name: "bulkWrite",
7575
}.Execute(ctx)
76-
if err != nil && errors.Is(err, driver.ErrUnacknowledgedWrite) {
77-
return nil
76+
var exception *ClientBulkWriteException
77+
switch tt := err.(type) {
78+
case CommandError:
79+
exception = &ClientBulkWriteException{
80+
TopLevelError: &WriteError{
81+
Code: int(tt.Code),
82+
Message: tt.Message,
83+
Raw: tt.Raw,
84+
},
85+
}
86+
default:
87+
if errors.Is(err, driver.ErrUnacknowledgedWrite) {
88+
err = nil
89+
}
90+
}
91+
if len(batches.writeConcernErrors) > 0 || len(batches.writeErrors) > 0 {
92+
if exception == nil {
93+
exception = new(ClientBulkWriteException)
94+
}
95+
exception.WriteConcernErrors = batches.writeConcernErrors
96+
exception.WriteErrors = batches.writeErrors
97+
}
98+
if exception != nil {
99+
exception.PartialResult = batches.result
100+
return *exception
78101
}
79102
return err
80103
}
@@ -219,7 +242,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
219242
return 0, dst, io.EOF
220243
}
221244

222-
mb.cursorHandlers = make([]func(*cursorInfo, bson.Raw) bool, len(mb.models))
245+
mb.cursorHandlers = mb.cursorHandlers[:0]
223246
mb.newIDMap = make(map[int]interface{})
224247

225248
nsMap := make(map[string]int)
@@ -240,6 +263,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
240263
opsIdx, dst := fn.appendStart(dst, "ops")
241264
nsIdx, nsDst := fn.appendStart(nil, "nsInfo")
242265

266+
totalSize -= 1000
243267
size := (len(dst) - l) * 2
244268
var n int
245269
for i := mb.offset; i < len(mb.models); i++ {
@@ -254,7 +278,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
254278
var err error
255279
switch model := mb.models[i].model.(type) {
256280
case *ClientInsertOneModel:
257-
mb.cursorHandlers[i] = mb.appendInsertResult
281+
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendInsertResult)
258282
var id interface{}
259283
id, doc, err = (&clientInsertDoc{
260284
namespace: nsIdx,
@@ -265,7 +289,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
265289
}
266290
mb.newIDMap[i] = id
267291
case *ClientUpdateOneModel:
268-
mb.cursorHandlers[i] = mb.appendUpdateResult
292+
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
269293
doc, err = (&clientUpdateDoc{
270294
namespace: nsIdx,
271295
filter: model.Filter,
@@ -279,7 +303,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
279303
}).marshal(mb.client.bsonOpts, mb.client.registry)
280304
case *ClientUpdateManyModel:
281305
canRetry = false
282-
mb.cursorHandlers[i] = mb.appendUpdateResult
306+
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
283307
doc, err = (&clientUpdateDoc{
284308
namespace: nsIdx,
285309
filter: model.Filter,
@@ -292,7 +316,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
292316
checkDollarKey: true,
293317
}).marshal(mb.client.bsonOpts, mb.client.registry)
294318
case *ClientReplaceOneModel:
295-
mb.cursorHandlers[i] = mb.appendUpdateResult
319+
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
296320
doc, err = (&clientUpdateDoc{
297321
namespace: nsIdx,
298322
filter: model.Filter,
@@ -305,7 +329,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
305329
checkDollarKey: false,
306330
}).marshal(mb.client.bsonOpts, mb.client.registry)
307331
case *ClientDeleteOneModel:
308-
mb.cursorHandlers[i] = mb.appendDeleteResult
332+
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult)
309333
doc, err = (&clientDeleteDoc{
310334
namespace: nsIdx,
311335
filter: model.Filter,
@@ -315,22 +339,27 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
315339
}).marshal(mb.client.bsonOpts, mb.client.registry)
316340
case *ClientDeleteManyModel:
317341
canRetry = false
318-
mb.cursorHandlers[i] = mb.appendDeleteResult
342+
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult)
319343
doc, err = (&clientDeleteDoc{
320344
namespace: nsIdx,
321345
filter: model.Filter,
322346
collation: model.Collation,
323347
hint: model.Hint,
324348
multi: true,
325349
}).marshal(mb.client.bsonOpts, mb.client.registry)
350+
default:
351+
mb.cursorHandlers = append(mb.cursorHandlers, nil)
326352
}
327353
if err != nil {
328354
return 0, nil, err
329355
}
330-
length := len(doc) + len(ns)
356+
length := len(doc)
331357
if length > maxDocSize {
332358
break
333359
}
360+
if !exists {
361+
length += len(ns)
362+
}
334363
size += length
335364
if size >= totalSize {
336365
break
@@ -369,7 +398,6 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
369398
mb.writeConcernErrors = append(mb.writeConcernErrors, *wce)
370399
}
371400
}
372-
// closeImplicitSession(sess)
373401
if len(resp) == 0 {
374402
return nil
375403
}
@@ -435,8 +463,9 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
435463
if err != nil {
436464
return err
437465
}
466+
isOrdered := mb.ordered == nil || *mb.ordered
438467
fmt.Println("ProcessResponse toplevelerror", res.Ok, res.NErrors, res.Code, res.Errmsg)
439-
if writeCmdErr.WriteConcernError != nil || !ok || !res.Ok || res.NErrors > 0 {
468+
if isOrdered && (writeCmdErr.WriteConcernError != nil || !ok || !res.Ok || res.NErrors > 0) {
440469
exception := ClientBulkWriteException{
441470
WriteConcernErrors: mb.writeConcernErrors,
442471
WriteErrors: mb.writeErrors,
@@ -455,48 +484,51 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
455484
}
456485

457486
func (mb *modelBatches) appendDeleteResult(cur *cursorInfo, raw bson.Raw) bool {
487+
idx := int(cur.Idx) + mb.offset
458488
if err := cur.extractError(); err != nil {
459489
err.Raw = raw
460490
if mb.writeErrors == nil {
461491
mb.writeErrors = make(map[int]WriteError)
462492
}
463-
mb.writeErrors[int(cur.Idx)] = *err
493+
mb.writeErrors[idx] = *err
464494
return false
465495
}
466496

467497
if mb.result.DeleteResults == nil {
468498
mb.result.DeleteResults = make(map[int]ClientDeleteResult)
469499
}
470-
mb.result.DeleteResults[int(cur.Idx)] = ClientDeleteResult{int64(cur.N)}
500+
mb.result.DeleteResults[idx] = ClientDeleteResult{int64(cur.N)}
471501

472502
return true
473503
}
474504

475505
func (mb *modelBatches) appendInsertResult(cur *cursorInfo, raw bson.Raw) bool {
506+
idx := int(cur.Idx) + mb.offset
476507
if err := cur.extractError(); err != nil {
477508
err.Raw = raw
478509
if mb.writeErrors == nil {
479510
mb.writeErrors = make(map[int]WriteError)
480511
}
481-
mb.writeErrors[int(cur.Idx)] = *err
512+
mb.writeErrors[idx] = *err
482513
return false
483514
}
484515

485516
if mb.result.InsertResults == nil {
486517
mb.result.InsertResults = make(map[int]ClientInsertResult)
487518
}
488-
mb.result.InsertResults[int(cur.Idx)] = ClientInsertResult{mb.newIDMap[int(cur.Idx)]}
519+
mb.result.InsertResults[idx] = ClientInsertResult{mb.newIDMap[idx]}
489520

490521
return true
491522
}
492523

493524
func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
525+
idx := int(cur.Idx) + mb.offset
494526
if err := cur.extractError(); err != nil {
495527
err.Raw = raw
496528
if mb.writeErrors == nil {
497529
mb.writeErrors = make(map[int]WriteError)
498530
}
499-
mb.writeErrors[int(cur.Idx)] = *err
531+
mb.writeErrors[idx] = *err
500532
return false
501533
}
502534

@@ -512,7 +544,7 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
512544
if cur.Upserted != nil {
513545
result.UpsertedID = cur.Upserted.ID
514546
}
515-
mb.result.UpdateResults[int(cur.Idx)] = result
547+
mb.result.UpdateResults[idx] = result
516548

517549
return true
518550
}

0 commit comments

Comments
 (0)