Skip to content

Commit b65b461

Browse files
GODRIVER-2516 NoWritesPerformed Operation Error Handling (#1052)
Co-authored-by: Benjamin Rewis <[email protected]>
1 parent bc8eee9 commit b65b461

File tree

4 files changed

+125
-1
lines changed

4 files changed

+125
-1
lines changed

mongo/integration/mtest/mongotest.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,19 +637,30 @@ func (t *T) createTestClient() {
637637
if clientOpts.Deployment == nil && t.clientType != Mock && clientOpts.ServerAPIOptions == nil && testContext.requireAPIVersion {
638638
clientOpts.SetServerAPIOptions(options.ServerAPI(driver.TestServerAPIVersion))
639639
}
640-
// command monitor
640+
641+
// Setup command monitor
642+
var customMonitor = clientOpts.Monitor
641643
clientOpts.SetMonitor(&event.CommandMonitor{
642644
Started: func(_ context.Context, cse *event.CommandStartedEvent) {
645+
if customMonitor != nil && customMonitor.Started != nil {
646+
customMonitor.Started(context.Background(), cse)
647+
}
643648
t.monitorLock.Lock()
644649
defer t.monitorLock.Unlock()
645650
t.started = append(t.started, cse)
646651
},
647652
Succeeded: func(_ context.Context, cse *event.CommandSucceededEvent) {
653+
if customMonitor != nil && customMonitor.Succeeded != nil {
654+
customMonitor.Succeeded(context.Background(), cse)
655+
}
648656
t.monitorLock.Lock()
649657
defer t.monitorLock.Unlock()
650658
t.succeeded = append(t.succeeded, cse)
651659
},
652660
Failed: func(_ context.Context, cfe *event.CommandFailedEvent) {
661+
if customMonitor != nil && customMonitor.Failed != nil {
662+
customMonitor.Failed(context.Background(), cfe)
663+
}
653664
t.monitorLock.Lock()
654665
defer t.monitorLock.Unlock()
655666
t.failed = append(t.failed, cfe)

mongo/integration/retryable_writes_prose_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ package integration
99
import (
1010
"bytes"
1111
"context"
12+
"fmt"
1213
"sync"
1314
"testing"
1415
"time"
1516

17+
"github.com/stretchr/testify/require"
1618
"go.mongodb.org/mongo-driver/bson"
19+
"go.mongodb.org/mongo-driver/bson/bsontype"
1720
"go.mongodb.org/mongo-driver/event"
1821
"go.mongodb.org/mongo-driver/internal/testutil/assert"
1922
"go.mongodb.org/mongo-driver/internal/testutil/monitor"
@@ -214,4 +217,72 @@ func TestRetryableWritesProse(t *testing.T) {
214217
"expected an insert event, got a(n) %v event", cmdEvt.CommandName)
215218
}
216219
})
220+
221+
mtNWPOpts := mtest.NewOptions().MinServerVersion("6.0").Topologies(mtest.ReplicaSet)
222+
mt.RunOpts(fmt.Sprintf("%s label returns original error", driver.NoWritesPerformed), mtNWPOpts,
223+
func(mt *mtest.T) {
224+
const shutdownInProgressErrorCode int32 = 91
225+
const notWritablePrimaryErrorCode int32 = 10107
226+
227+
monitor := new(event.CommandMonitor)
228+
mt.ResetClient(options.Client().SetRetryWrites(true).SetMonitor(monitor))
229+
230+
// Configure a fail point for a "ShutdownInProgress" error.
231+
mt.SetFailPoint(mtest.FailPoint{
232+
ConfigureFailPoint: "failCommand",
233+
Mode: mtest.FailPointMode{Times: 1},
234+
Data: mtest.FailPointData{
235+
WriteConcernError: &mtest.WriteConcernErrorData{
236+
Code: shutdownInProgressErrorCode,
237+
},
238+
FailCommands: []string{"insert"},
239+
},
240+
})
241+
242+
// secondFailPointConfigured is used to determine if the conditions from the
243+
// shutdownInProgressErrorCode actually configures the "NoWritablePrimary" fail command.
244+
var secondFailPointConfigured bool
245+
246+
//Set a command monitor on the client that configures a failpoint with a "NoWritesPerformed"
247+
monitor.Succeeded = func(_ context.Context, evt *event.CommandSucceededEvent) {
248+
var errorCode int32
249+
if wce := evt.Reply.Lookup("writeConcernError"); wce.Type == bsontype.EmbeddedDocument {
250+
var ok bool
251+
errorCode, ok = wce.Document().Lookup("code").Int32OK()
252+
if !ok {
253+
t.Fatalf("expected code to be an int32, got %v",
254+
wce.Document().Lookup("code").Type)
255+
return
256+
}
257+
}
258+
259+
// Do not set a fail point if event was not a writeConcernError with an error code for
260+
// "ShutdownInProgress".
261+
if errorCode != shutdownInProgressErrorCode {
262+
return
263+
}
264+
265+
mt.SetFailPoint(mtest.FailPoint{
266+
ConfigureFailPoint: "failCommand",
267+
Mode: mtest.FailPointMode{Times: 1},
268+
Data: mtest.FailPointData{
269+
ErrorCode: notWritablePrimaryErrorCode,
270+
ErrorLabels: &[]string{
271+
driver.NoWritesPerformed,
272+
driver.RetryableWriteError,
273+
},
274+
FailCommands: []string{"insert"},
275+
},
276+
})
277+
secondFailPointConfigured = true
278+
}
279+
280+
// Attempt to insert a document.
281+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
282+
283+
require.True(mt, secondFailPointConfigured)
284+
285+
// Assert that the "NotWritablePrimary" error is returned.
286+
require.True(mt, err.(mongo.WriteException).HasErrorCode(int(shutdownInProgressErrorCode)))
287+
})
217288
}

x/mongo/driver/errors.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ var (
3737
NetworkError = "NetworkError"
3838
// RetryableWriteError is an error lable for retryable write errors.
3939
RetryableWriteError = "RetryableWriteError"
40+
// NoWritesPerformed is an error label indicated that no writes were performed for an operation.
41+
NoWritesPerformed = "NoWritesPerformed"
4042
// ErrCursorNotFound is the cursor not found error for legacy find operations.
4143
ErrCursorNotFound = errors.New("cursor not found")
4244
// ErrUnacknowledgedWrite is returned from functions that have an unacknowledged
@@ -132,6 +134,18 @@ func (wce WriteCommandError) Retryable(wireVersion *description.VersionRange) bo
132134
return (*wce.WriteConcernError).Retryable()
133135
}
134136

137+
// HasErrorLabel returns true if the error contains the specified label.
138+
func (wce WriteCommandError) HasErrorLabel(label string) bool {
139+
if wce.Labels != nil {
140+
for _, l := range wce.Labels {
141+
if l == label {
142+
return true
143+
}
144+
}
145+
}
146+
return false
147+
}
148+
135149
// WriteConcernError is a write concern failure that occurred as a result of a
136150
// write operation.
137151
type WriteConcernError struct {

x/mongo/driver/operation.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ type RetryablePoolError interface {
5959
Retryable() bool
6060
}
6161

62+
// LabeledError is an error that can have error labels added to it.
63+
type LabelledError interface {
64+
HasErrorLabel(string) bool
65+
}
66+
6267
// InvalidOperationError is returned from Validate and indicates that a required field is missing
6368
// from an instance of Operation.
6469
type InvalidOperationError struct{ MissingField string }
@@ -370,6 +375,7 @@ func (op Operation) Execute(ctx context.Context) error {
370375
var res bsoncore.Document
371376
var operationErr WriteCommandError
372377
var prevErr error
378+
var prevIndefiniteErr error
373379
batching := op.Batches.Valid()
374380
retryEnabled := op.RetryMode != nil && op.RetryMode.Enabled()
375381
retrySupported := false
@@ -381,6 +387,16 @@ func (op Operation) Execute(ctx context.Context) error {
381387
resetForRetry := func(err error) {
382388
retries--
383389
prevErr = err
390+
391+
// Set the previous indefinite error to be returned in any case where a retryable write error does not have a
392+
// NoWritesPerfomed label (the definite case).
393+
switch err := err.(type) {
394+
case LabelledError:
395+
if !err.HasErrorLabel(NoWritesPerformed) && err.HasErrorLabel(RetryableWriteError) {
396+
prevIndefiniteErr = err.(error)
397+
}
398+
}
399+
384400
// If we got a connection, close it immediately to release pool resources for
385401
// subsequent retries.
386402
if conn != nil {
@@ -615,6 +631,12 @@ func (op Operation) Execute(ctx context.Context) error {
615631
continue
616632
}
617633

634+
// If the error is no longer retryable and has the NoWritesPerformed label, then we should
635+
// return the previous indefinite error.
636+
if tt.HasErrorLabel(NoWritesPerformed) {
637+
return prevIndefiniteErr
638+
}
639+
618640
// If the operation isn't being retried, process the response
619641
if op.ProcessResponseFn != nil {
620642
info := ResponseInfo{
@@ -702,6 +724,12 @@ func (op Operation) Execute(ctx context.Context) error {
702724
continue
703725
}
704726

727+
// If the error is no longer retryable and has the NoWritesPerformed label, then we should
728+
// return the previous indefinite error.
729+
if tt.HasErrorLabel(NoWritesPerformed) {
730+
return prevIndefiniteErr
731+
}
732+
705733
// If the operation isn't being retried, process the response
706734
if op.ProcessResponseFn != nil {
707735
info := ResponseInfo{

0 commit comments

Comments
 (0)