Skip to content

Commit 4f738e2

Browse files
committed
WIP
1 parent 08a06da commit 4f738e2

File tree

3 files changed

+46
-22
lines changed

3 files changed

+46
-22
lines changed

mongo/client_bulk_write.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
5858
}
5959
resMap := make([]interface{}, len(bw.models))
6060
insIDMap := make(map[int]interface{})
61+
canRetry := true
6162
for i, v := range bw.models {
6263
var doc bsoncore.Document
6364
var err error
@@ -94,6 +95,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
9495
bw.client.bsonOpts,
9596
bw.client.registry)
9697
case *ClientUpdateManyModel:
98+
canRetry = false
9799
nsIdx = getNsIndex(model.Namespace)
98100
if bw.result.UpdateResults == nil {
99101
bw.result.UpdateResults = make(map[int64]ClientUpdateResult)
@@ -144,6 +146,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
144146
bw.client.bsonOpts,
145147
bw.client.registry)
146148
case *ClientDeleteManyModel:
149+
canRetry = false
147150
nsIdx = getNsIndex(model.Namespace)
148151
if bw.result.DeleteResults == nil {
149152
bw.result.DeleteResults = make(map[int64]ClientDeleteResult)
@@ -168,24 +171,22 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
168171
Documents: docs,
169172
Ordered: bw.ordered,
170173
}
174+
retry := driver.RetryNone
175+
if bw.client.retryWrites && canRetry {
176+
retry = driver.RetryOncePerCommand
177+
}
171178
op := operation.NewCommandFn(bw.newCommand(nsList)).Batches(batches).
172179
Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.client.monitor).
173180
ServerSelector(bw.selector).ClusterClock(bw.client.clock).
174-
Database("admin").
181+
Database("admin").Type(driver.Write).RetryMode(retry).
175182
Deployment(bw.client.deployment).Crypt(bw.client.cryptFLE).
176183
ServerAPI(bw.client.serverAPI).Timeout(bw.client.timeout).
177184
Logger(bw.client.logger).Authenticator(bw.client.authenticator).Name("bulkWrite")
178-
opErr := op.Execute(ctx)
179-
var wcErrs []*WriteConcernError
180-
if opErr != nil {
181-
if errors.Is(opErr, driver.ErrUnacknowledgedWrite) {
185+
err := op.Execute(ctx)
186+
if err != nil {
187+
if errors.Is(err, driver.ErrUnacknowledgedWrite) {
182188
return nil
183189
}
184-
var writeErr driver.WriteCommandError
185-
if errors.As(opErr, &writeErr) {
186-
wcErr := convertDriverWriteConcernError(writeErr.WriteConcernError)
187-
wcErrs = append(wcErrs, wcErr)
188-
}
189190
}
190191
var res struct {
191192
Ok bool
@@ -212,34 +213,33 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
212213
bw.result.UpsertedCount = int64(res.NUpserted)
213214
errors := make(map[int64]WriteError)
214215
for i, cur := range res.Cursor.FirstBatch {
215-
switch res := resMap[i].(type) {
216+
switch r := resMap[i].(type) {
216217
case map[int64]ClientDeleteResult:
217-
if err := appendDeleteResult(cur, res, errors); err != nil {
218+
if err := appendDeleteResult(cur, r, errors); err != nil {
218219
return err
219220
}
220221
case map[int64]ClientInsertResult:
221-
if err := appendInsertResult(cur, res, errors, insIDMap); err != nil {
222+
if err := appendInsertResult(cur, r, errors, insIDMap); err != nil {
222223
return err
223224
}
224225
case map[int64]ClientUpdateResult:
225-
if err := appendUpdateResult(cur, res, errors); err != nil {
226+
if err := appendUpdateResult(cur, r, errors); err != nil {
226227
return err
227228
}
228229
}
229230
}
230-
if !res.Ok || res.NErrors > 0 || opErr != nil {
231+
if !res.Ok || res.NErrors > 0 {
231232
return ClientBulkWriteException{
232233
TopLevelError: &WriteError{
233234
Code: int(res.Code),
234235
Message: res.Errmsg,
235236
Raw: bson.Raw(rawRes),
236237
},
237-
WriteConcernErrors: wcErrs,
238-
WriteErrors: errors,
239-
PartialResult: &bw.result,
238+
WriteErrors: errors,
239+
PartialResult: &bw.result,
240240
}
241241
}
242-
return nil
242+
return err
243243
}
244244

245245
func (bw *clientBulkWrite) newCommand(nsList []string) func([]byte, description.SelectedServer) ([]byte, error) {

x/mongo/driver/operation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ func (op Operation) Execute(ctx context.Context) error {
598598
if conn != nil {
599599
// If we are dealing with a sharded cluster, then mark the failed server
600600
// as "deprioritized".
601-
if desc := conn.Description; desc != nil && op.Deployment.Kind() == description.Sharded {
601+
if op.Deployment.Kind() == description.Sharded {
602602
deprioritizedServers = []description.Server{conn.Description()}
603603
}
604604

x/mongo/driver/operation/command.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ type Command struct {
2626
name string
2727
authenticator driver.Authenticator
2828
commandFn func([]byte, description.SelectedServer) ([]byte, error)
29-
batches *driver.Batches
3029
database string
3130
deployment driver.Deployment
3231
selector description.ServerSelector
3332
writeConcern *writeconcern.WriteConcern
3433
readPreference *readpref.ReadPref
3534
clock *session.ClusterClock
35+
retry *driver.RetryMode
36+
opType driver.Type
37+
batches *driver.Batches
3638
session *session.Client
3739
monitor *event.CommandMonitor
3840
resultResponse bsoncore.Document
@@ -95,7 +97,6 @@ func (c *Command) Execute(ctx context.Context) error {
9597

9698
return driver.Operation{
9799
CommandFn: c.commandFn,
98-
Batches: c.batches,
99100
ProcessResponseFn: func(info driver.ResponseInfo) error {
100101
c.resultResponse = info.ServerResponse
101102

@@ -113,6 +114,9 @@ func (c *Command) Execute(ctx context.Context) error {
113114
},
114115
Client: c.session,
115116
Clock: c.clock,
117+
RetryMode: c.retry,
118+
Type: c.opType,
119+
Batches: c.batches,
116120
CommandMonitor: c.monitor,
117121
Database: c.database,
118122
Deployment: c.deployment,
@@ -168,6 +172,26 @@ func (c *Command) Batches(batches *driver.Batches) *Command {
168172
return c
169173
}
170174

175+
// RetryMode sets the RetryMode for this operation.
176+
func (c *Command) RetryMode(retry driver.RetryMode) *Command {
177+
if c == nil {
178+
c = new(Command)
179+
}
180+
181+
c.retry = &retry
182+
return c
183+
}
184+
185+
// Type sets the opType for this operation.
186+
func (c *Command) Type(t driver.Type) *Command {
187+
if c == nil {
188+
c = new(Command)
189+
}
190+
191+
c.opType = t
192+
return c
193+
}
194+
171195
// Database sets the database to run this operation against.
172196
func (c *Command) Database(database string) *Command {
173197
if c == nil {

0 commit comments

Comments
 (0)