Skip to content

Commit dec06b9

Browse files
committed
WIP
1 parent 45d22f8 commit dec06b9

File tree

11 files changed

+249
-231
lines changed

11 files changed

+249
-231
lines changed

mongo/client.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,6 @@ func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error)
426426
return nil, replaceErrors(err)
427427
}
428428

429-
// Writes are not retryable on standalones, so let operation determine whether to retry
430-
sess.RetryWrite = false
431429
sess.RetryRead = c.retryReads
432430

433431
return &sessionImpl{

mongo/client_bulk_write.go

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
// bulkWrite performs a bulkwrite operation
2929
type clientBulkWrite struct {
30-
models []interface{}
30+
models []clientWriteModel
3131
errorsOnly bool
3232
ordered *bool
3333
bypassDocumentValidation *bool
@@ -46,11 +46,12 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
4646
return errors.New("empty write models")
4747
}
4848
batches := &modelBatches{
49-
session: bw.session,
50-
client: bw.client,
51-
ordered: bw.ordered,
52-
models: bw.models,
53-
result: &bw.result,
49+
session: bw.session,
50+
client: bw.client,
51+
ordered: bw.ordered,
52+
models: bw.models,
53+
result: &bw.result,
54+
retryMode: driver.RetryOnce,
5455
}
5556
err := driver.Operation{
5657
CommandFn: bw.newCommand(),
@@ -142,7 +143,7 @@ type modelBatches struct {
142143
client *Client
143144

144145
ordered *bool
145-
models []interface{}
146+
models []clientWriteModel
146147

147148
offset int
148149

@@ -222,17 +223,14 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
222223
mb.newIDMap = make(map[int]interface{})
223224

224225
nsMap := make(map[string]int)
225-
getNsIndex := func(namespace string) (int, bsoncore.Document) {
226-
idx, doc := bsoncore.AppendDocumentStart(nil)
227-
doc = bsoncore.AppendStringElement(doc, "ns", namespace)
228-
doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
229-
230-
if v, ok := nsMap[namespace]; ok {
231-
return v, doc
226+
getNsIndex := func(namespace string) (int, bool) {
227+
v, ok := nsMap[namespace]
228+
if ok {
229+
return v, ok
232230
}
233231
nsIdx := len(nsMap)
234232
nsMap[namespace] = nsIdx
235-
return nsIdx, doc
233+
return nsIdx, ok
236234
}
237235

238236
canRetry := true
@@ -249,12 +247,13 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
249247
break
250248
}
251249

252-
var nsIdx int
253-
var ns, doc bsoncore.Document
250+
ns := mb.models[i].namespace
251+
nsIdx, exists := getNsIndex(ns)
252+
253+
var doc bsoncore.Document
254254
var err error
255-
switch model := mb.models[i].(type) {
255+
switch model := mb.models[i].model.(type) {
256256
case *ClientInsertOneModel:
257-
nsIdx, ns = getNsIndex(model.Namespace)
258257
mb.cursorHandlers[i] = mb.appendInsertResult
259258
var id interface{}
260259
id, doc, err = (&clientInsertDoc{
@@ -266,7 +265,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
266265
}
267266
mb.newIDMap[i] = id
268267
case *ClientUpdateOneModel:
269-
nsIdx, ns = getNsIndex(model.Namespace)
270268
mb.cursorHandlers[i] = mb.appendUpdateResult
271269
doc, err = (&clientUpdateDoc{
272270
namespace: nsIdx,
@@ -281,7 +279,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
281279
}).marshal(mb.client.bsonOpts, mb.client.registry)
282280
case *ClientUpdateManyModel:
283281
canRetry = false
284-
nsIdx, ns = getNsIndex(model.Namespace)
285282
mb.cursorHandlers[i] = mb.appendUpdateResult
286283
doc, err = (&clientUpdateDoc{
287284
namespace: nsIdx,
@@ -295,7 +292,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
295292
checkDollarKey: true,
296293
}).marshal(mb.client.bsonOpts, mb.client.registry)
297294
case *ClientReplaceOneModel:
298-
nsIdx, ns = getNsIndex(model.Namespace)
299295
mb.cursorHandlers[i] = mb.appendUpdateResult
300296
doc, err = (&clientUpdateDoc{
301297
namespace: nsIdx,
@@ -309,7 +305,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
309305
checkDollarKey: false,
310306
}).marshal(mb.client.bsonOpts, mb.client.registry)
311307
case *ClientDeleteOneModel:
312-
nsIdx, ns = getNsIndex(model.Namespace)
313308
mb.cursorHandlers[i] = mb.appendDeleteResult
314309
doc, err = (&clientDeleteDoc{
315310
namespace: nsIdx,
@@ -320,7 +315,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
320315
}).marshal(mb.client.bsonOpts, mb.client.registry)
321316
case *ClientDeleteManyModel:
322317
canRetry = false
323-
nsIdx, ns = getNsIndex(model.Namespace)
324318
mb.cursorHandlers[i] = mb.appendDeleteResult
325319
doc, err = (&clientDeleteDoc{
326320
namespace: nsIdx,
@@ -343,7 +337,12 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
343337
}
344338

345339
dst = fn.appendDocument(dst, strconv.Itoa(n), doc)
346-
nsDst = fn.appendDocument(nsDst, strconv.Itoa(n), ns)
340+
if !exists {
341+
idx, doc := bsoncore.AppendDocumentStart(nil)
342+
doc = bsoncore.AppendStringElement(doc, "ns", ns)
343+
doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
344+
nsDst = fn.appendDocument(nsDst, strconv.Itoa(n), doc)
345+
}
347346
n++
348347
}
349348
if n == 0 {
@@ -356,7 +355,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
356355

357356
mb.retryMode = driver.RetryNone
358357
if mb.client.retryWrites && canRetry {
359-
mb.retryMode = driver.RetryOncePerCommand
358+
mb.retryMode = driver.RetryOnce
360359
}
361360
return n, dst, nil
362361
}
@@ -430,7 +429,7 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
430429
if int(cur.Idx) >= len(mb.cursorHandlers) {
431430
continue
432431
}
433-
ok = ok && mb.cursorHandlers[int(cur.Idx)](&cur, cursor.Current)
432+
ok = mb.cursorHandlers[int(cur.Idx)](&cur, cursor.Current) && ok
434433
}
435434
err = cursor.Err()
436435
if err != nil {
@@ -456,32 +455,51 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
456455
}
457456

458457
func (mb *modelBatches) appendDeleteResult(cur *cursorInfo, raw bson.Raw) bool {
458+
if err := cur.extractError(); err != nil {
459+
err.Raw = raw
460+
if mb.writeErrors == nil {
461+
mb.writeErrors = make(map[int]WriteError)
462+
}
463+
mb.writeErrors[int(cur.Idx)] = *err
464+
return false
465+
}
466+
459467
if mb.result.DeleteResults == nil {
460468
mb.result.DeleteResults = make(map[int]ClientDeleteResult)
461469
}
462470
mb.result.DeleteResults[int(cur.Idx)] = ClientDeleteResult{int64(cur.N)}
471+
472+
return true
473+
}
474+
475+
func (mb *modelBatches) appendInsertResult(cur *cursorInfo, raw bson.Raw) bool {
463476
if err := cur.extractError(); err != nil {
464477
err.Raw = raw
478+
if mb.writeErrors == nil {
479+
mb.writeErrors = make(map[int]WriteError)
480+
}
465481
mb.writeErrors[int(cur.Idx)] = *err
466482
return false
467483
}
468-
return true
469-
}
470484

471-
func (mb *modelBatches) appendInsertResult(cur *cursorInfo, raw bson.Raw) bool {
472485
if mb.result.InsertResults == nil {
473486
mb.result.InsertResults = make(map[int]ClientInsertResult)
474487
}
475488
mb.result.InsertResults[int(cur.Idx)] = ClientInsertResult{mb.newIDMap[int(cur.Idx)]}
489+
490+
return true
491+
}
492+
493+
func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
476494
if err := cur.extractError(); err != nil {
477495
err.Raw = raw
496+
if mb.writeErrors == nil {
497+
mb.writeErrors = make(map[int]WriteError)
498+
}
478499
mb.writeErrors[int(cur.Idx)] = *err
479500
return false
480501
}
481-
return true
482-
}
483502

484-
func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
485503
if mb.result.UpdateResults == nil {
486504
mb.result.UpdateResults = make(map[int]ClientUpdateResult)
487505
}
@@ -495,11 +513,7 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
495513
result.UpsertedID = cur.Upserted.ID
496514
}
497515
mb.result.UpdateResults[int(cur.Idx)] = result
498-
if err := cur.extractError(); err != nil {
499-
err.Raw = raw
500-
mb.writeErrors[int(cur.Idx)] = *err
501-
return false
502-
}
516+
503517
return true
504518
}
505519

0 commit comments

Comments
 (0)