Skip to content

Commit b7b20d0

Browse files
committed
Add actionable error for retryable writes on unsupported storage engine
GODRIVER-1219 Change-Id: I530785bce7116c30eec96f4c728130d5f9da330c
1 parent 94c698b commit b7b20d0

File tree

4 files changed

+223
-6
lines changed

4 files changed

+223
-6
lines changed

mongo/client_internal_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"os"
1414
"path"
1515
"reflect"
16+
"strconv"
1617
"testing"
1718
"time"
1819

@@ -27,7 +28,13 @@ import (
2728
"go.mongodb.org/mongo-driver/mongo/writeconcern"
2829
"go.mongodb.org/mongo-driver/tag"
2930
"go.mongodb.org/mongo-driver/x/bsonx"
31+
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
32+
"go.mongodb.org/mongo-driver/x/mongo/driver"
33+
"go.mongodb.org/mongo-driver/x/mongo/driver/address"
3034
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
35+
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
36+
"go.mongodb.org/mongo-driver/x/mongo/driver/drivertest"
37+
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
3138
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
3239
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
3340
)
@@ -283,6 +290,161 @@ func TestClient_ReplaceTopologyError(t *testing.T) {
283290

284291
}
285292

293+
type retryableSSD struct {
294+
C driver.Connection
295+
}
296+
297+
var _ driver.Deployment = retryableSSD{}
298+
var _ driver.Server = retryableSSD{}
299+
300+
func (rssd retryableSSD) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) {
301+
return rssd, nil
302+
}
303+
304+
func (rssd retryableSSD) Kind() description.TopologyKind {
305+
return description.Single
306+
}
307+
308+
func (rssd retryableSSD) Connection(context.Context) (driver.Connection, error) {
309+
return rssd.C, nil
310+
}
311+
312+
func (rssd retryableSSD) SupportsRetryWrites() bool {
313+
return true
314+
}
315+
316+
func TestRetryWritesError20Wrapped(t *testing.T) {
317+
serverVersion, err := getServerVersion(createTestDatabase(t, nil))
318+
require.NoError(t, err)
319+
320+
if compareVersions(t, serverVersion, "3.6") < 0 {
321+
t.Skip()
322+
}
323+
324+
idx, writeError := bsoncore.AppendDocumentStart(nil)
325+
writeError = bsoncore.AppendInt32Element(writeError, "ok", 1)
326+
elemIdx, elem := bsoncore.AppendDocumentStart(nil)
327+
elem = bsoncore.AppendInt32Element(elem, "index", 0)
328+
elem = bsoncore.AppendStringElement(elem, "errmsg", "Transaction numbers")
329+
elem = bsoncore.AppendInt32Element(elem, "code", 20)
330+
elem, _ = bsoncore.AppendDocumentEnd(elem, elemIdx)
331+
writeErrorsIdx, writeErrors := bsoncore.AppendArrayStart(nil)
332+
writeErrors = bsoncore.AppendDocumentElement(writeErrors, strconv.Itoa(0), elem)
333+
writeErrors, _ = bsoncore.AppendArrayEnd(writeErrors, writeErrorsIdx)
334+
writeError = bsoncore.AppendArrayElement(writeError, "writeErrors", writeErrors)
335+
writeError, _ = bsoncore.AppendDocumentEnd(writeError, idx)
336+
337+
idx, writeErrorNot20 := bsoncore.AppendDocumentStart(nil)
338+
writeErrorNot20 = bsoncore.AppendInt32Element(writeErrorNot20, "ok", 1)
339+
elemIdx, elem = bsoncore.AppendDocumentStart(nil)
340+
elem = bsoncore.AppendInt32Element(elem, "index", 0)
341+
elem = bsoncore.AppendStringElement(elem, "errmsg", "Transaction numbers")
342+
elem = bsoncore.AppendInt32Element(elem, "code", 19)
343+
elem, _ = bsoncore.AppendDocumentEnd(elem, elemIdx)
344+
writeErrorsIdx, writeErrors = bsoncore.AppendArrayStart(nil)
345+
writeErrors = bsoncore.AppendDocumentElement(writeErrors, strconv.Itoa(0), elem)
346+
writeErrors, _ = bsoncore.AppendArrayEnd(writeErrors, writeErrorsIdx)
347+
writeErrorNot20 = bsoncore.AppendArrayElement(writeErrorNot20, "writeErrors", writeErrors)
348+
writeErrorNot20, _ = bsoncore.AppendDocumentEnd(writeErrorNot20, idx)
349+
350+
idx, writeErrorOnly20 := bsoncore.AppendDocumentStart(nil)
351+
writeErrorOnly20 = bsoncore.AppendInt32Element(writeErrorOnly20, "ok", 1)
352+
elemIdx, elem = bsoncore.AppendDocumentStart(nil)
353+
elem = bsoncore.AppendInt32Element(elem, "index", 0)
354+
elem = bsoncore.AppendStringElement(elem, "errmsg", "something other than transaction numbers")
355+
elem = bsoncore.AppendInt32Element(elem, "code", 20)
356+
elem, _ = bsoncore.AppendDocumentEnd(elem, elemIdx)
357+
writeErrorsIdx, writeErrors = bsoncore.AppendArrayStart(nil)
358+
writeErrors = bsoncore.AppendDocumentElement(writeErrors, strconv.Itoa(0), elem)
359+
writeErrors, _ = bsoncore.AppendArrayEnd(writeErrors, writeErrorsIdx)
360+
writeErrorOnly20 = bsoncore.AppendArrayElement(writeErrorOnly20, "writeErrors", writeErrors)
361+
writeErrorOnly20, _ = bsoncore.AppendDocumentEnd(writeErrorOnly20, idx)
362+
363+
idx, notOk := bsoncore.AppendDocumentStart(nil)
364+
notOk = bsoncore.AppendInt64Element(notOk, "ok", 0)
365+
notOk = bsoncore.AppendStringElement(notOk, "errmsg", "Transaction numbers")
366+
notOk = bsoncore.AppendInt32Element(notOk, "code", 20)
367+
notOk, _ = bsoncore.AppendDocumentEnd(notOk, idx)
368+
369+
idx, not20notOK := bsoncore.AppendDocumentStart(nil)
370+
not20notOK = bsoncore.AppendInt64Element(not20notOK, "ok", 0)
371+
not20notOK = bsoncore.AppendStringElement(not20notOK, "errmsg", "Transaction numbers")
372+
not20notOK = bsoncore.AppendInt32Element(not20notOK, "code", 19)
373+
not20notOK, _ = bsoncore.AppendDocumentEnd(not20notOK, idx)
374+
375+
idx, only20NotOK := bsoncore.AppendDocumentStart(nil)
376+
only20NotOK = bsoncore.AppendInt64Element(only20NotOK, "ok", 0)
377+
only20NotOK = bsoncore.AppendStringElement(only20NotOK, "errmsg", "something other than transaction numbers")
378+
only20NotOK = bsoncore.AppendInt32Element(only20NotOK, "code", 20)
379+
only20NotOK, _ = bsoncore.AppendDocumentEnd(only20NotOK, idx)
380+
381+
tests := []struct {
382+
name string
383+
wireMessage []byte // bsoncore byte slice
384+
shouldError bool
385+
expectedErrorMessage string
386+
}{
387+
{"writeError", writeError, true, driver.ErrUnsupportedStorageEngine.Error()},
388+
{"writeError with only err code 20 and wrong err message", writeErrorOnly20, true, "write command error: [{write errors: [{something other than transaction numbers}]}, {<nil>}]"},
389+
{"writeError with only err code 19 and right err message", writeErrorNot20, true, "write command error: [{write errors: [{Transaction numbers}]}, {<nil>}]"},
390+
{"NotOkError", notOk, true, driver.ErrUnsupportedStorageEngine.Error()},
391+
{"NotOkError with err code 20 and wrong err message", only20NotOK, true, "something other than transaction numbers"},
392+
{"NotOkError with err code 19 and right err message", not20notOK, true, "Transaction numbers"},
393+
}
394+
395+
for _, test := range tests {
396+
t.Run(test.name, func(t *testing.T) {
397+
conn := &drivertest.ChannelConn{
398+
Written: make(chan []byte, 1),
399+
Desc: description.Server{
400+
CanonicalAddr: address.Address("localhost:27017"),
401+
MaxDocumentSize: 16777216,
402+
MaxMessageSize: 48000000,
403+
MaxBatchCount: 100000,
404+
SessionTimeoutMinutes: 30,
405+
Kind: description.RSPrimary,
406+
WireVersion: &description.VersionRange{
407+
Max: 8,
408+
},
409+
},
410+
ReadResp: make(chan []byte, 1),
411+
}
412+
413+
conn.ReadResp <- drivertest.MakeReply(test.wireMessage)
414+
415+
deployment := retryableSSD{C: conn}
416+
417+
client := createTestClient(t)
418+
coll := client.Database("test").Collection("test")
419+
420+
sess, err := client.StartSession()
421+
defer sess.EndSession(context.Background())
422+
noerr(t, err)
423+
424+
idx, writeError = bsoncore.AppendDocumentStart(nil)
425+
writeError = bsoncore.AppendStringElement(writeError, "_id", "1")
426+
writeError, _ = bsoncore.AppendDocumentEnd(writeError, idx)
427+
428+
op := operation.NewInsert(writeError).CommandMonitor(coll.client.monitor).ClusterClock(coll.client.clock).
429+
Database(coll.db.name).Collection(coll.name).
430+
Deployment(coll.client.topology).Deployment(deployment).Retry(driver.RetryOnce).Session(sess.(*sessionImpl).clientSession)
431+
432+
err = op.Execute(context.Background())
433+
if test.shouldError {
434+
if err == nil || err.Error() != test.expectedErrorMessage {
435+
t.Fatalf("unexpected error occured, wanted: %v got: %v", test.expectedErrorMessage, err)
436+
}
437+
return
438+
}
439+
440+
if err != nil {
441+
t.Fatalf("did not expect an error, instead recieved: %v", err)
442+
}
443+
444+
})
445+
}
446+
}
447+
286448
func TestClient_ListDatabases_noFilter(t *testing.T) {
287449
t.Parallel()
288450

mongo/retryable_writes_test.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,10 @@ import (
1212
"io/ioutil"
1313
"os"
1414
"path"
15-
"testing"
16-
1715
"strings"
18-
19-
"time"
20-
2116
"sync"
17+
"testing"
18+
"time"
2219

2320
"github.com/stretchr/testify/require"
2421
"go.mongodb.org/mongo-driver/bson"
@@ -29,6 +26,7 @@ import (
2926
"go.mongodb.org/mongo-driver/mongo/readpref"
3027
"go.mongodb.org/mongo-driver/mongo/writeconcern"
3128
"go.mongodb.org/mongo-driver/x/bsonx"
29+
"go.mongodb.org/mongo-driver/x/mongo/driver"
3230
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
3331
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
3432
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
@@ -147,6 +145,38 @@ func TestTxnNumberIncluded(t *testing.T) {
147145
}
148146
}
149147

148+
func TestRetryableWritesErrorOnMMAPV1(t *testing.T) {
149+
name := "test"
150+
version, err := getServerVersion(createTestDatabase(t, &name))
151+
require.NoError(t, err)
152+
153+
if shouldSkipRetryTest(t, version) {
154+
t.Skip("only run on 3.6.x and not on standalone")
155+
}
156+
157+
client := createTestClient(t)
158+
require.NoError(t, err)
159+
db := client.Database("test")
160+
defer func() { _ = db.Drop(context.Background()) }()
161+
coll := client.Database("test").Collection("test")
162+
defer func() { _ = coll.Drop(context.Background()) }()
163+
164+
res := db.RunCommand(context.Background(), bson.D{
165+
{"serverStatus", 1},
166+
})
167+
noerr(t, res.Err())
168+
169+
storageEngine, ok := res.rdr.Lookup("storageEngine", "name").StringValueOK()
170+
if !ok || storageEngine != "mmapv1" {
171+
t.Skip("only run on mmapv1")
172+
}
173+
174+
_, err = coll.InsertOne(context.Background(), bson.D{
175+
{"_id", 1},
176+
})
177+
require.Equal(t, driver.ErrUnsupportedStorageEngine, err)
178+
}
179+
150180
// test case for all RetryableWritesSpec tests
151181
func TestRetryableWritesSpec(t *testing.T) {
152182
for _, file := range testhelpers.FindJSONFilesInDir(t, retryWritesDir) {

x/mongo/driver/errors.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ var (
2828
// ErrUnacknowledgedWrite is returned from functions that have an unacknowledged
2929
// write concern.
3030
ErrUnacknowledgedWrite = errors.New("unacknowledged write")
31+
// ErrUnsupportedStorageEngine is returned when a retryable write is attempted against a server
32+
// that uses a storage engine that does not support retryable writes
33+
ErrUnsupportedStorageEngine = errors.New("this MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string")
3134
)
3235

3336
// QueryFailureError is an error representing a command failure as a document.
@@ -66,6 +69,17 @@ type WriteCommandError struct {
6669
WriteErrors WriteErrors
6770
}
6871

72+
// UnsupportedStorageEngine returns whether or not the WriteCommandError comes from a retryable write being attempted
73+
// against a server that has a storage engine where they are not supported
74+
func (wce WriteCommandError) UnsupportedStorageEngine() bool {
75+
for _, writeError := range wce.WriteErrors {
76+
if writeError.Code == 20 && strings.HasPrefix(strings.ToLower(writeError.Message), "transaction numbers") {
77+
return true
78+
}
79+
}
80+
return false
81+
}
82+
6983
func (wce WriteCommandError) Error() string {
7084
var buf bytes.Buffer
7185
fmt.Fprint(&buf, "write command error: [")
@@ -167,6 +181,11 @@ type Error struct {
167181
Name string
168182
}
169183

184+
// UnsupportedStorageEngine returns whether e came as a result of an unsupported storage engine
185+
func (e Error) UnsupportedStorageEngine() bool {
186+
return e.Code == 20 && strings.HasPrefix(strings.ToLower(e.Message), "transaction numbers")
187+
}
188+
170189
// Error implements the error interface.
171190
func (e Error) Error() string {
172191
if e.Name != "" {

x/mongo/driver/operation.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,9 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
361361
}
362362
switch tt := err.(type) {
363363
case WriteCommandError:
364+
if e := err.(WriteCommandError); retryable && op.Type == Write && e.UnsupportedStorageEngine() {
365+
return ErrUnsupportedStorageEngine
366+
}
364367
if retryable && tt.Retryable() && retries != 0 {
365368
retries--
366369
original, err = err, nil
@@ -407,6 +410,9 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
407410
if tt.HasErrorLabel(TransientTransactionError) || tt.HasErrorLabel(UnknownTransactionCommitResult) {
408411
op.Client.ClearPinnedServer()
409412
}
413+
if e := err.(Error); retryable && op.Type == Write && e.UnsupportedStorageEngine() {
414+
return ErrUnsupportedStorageEngine
415+
}
410416
if retryable && tt.Retryable() && retries != 0 {
411417
retries--
412418
original, err = err, nil
@@ -830,7 +836,7 @@ func (op Operation) addSession(dst []byte, desc description.SelectedServer) ([]b
830836
dst = bsoncore.AppendDocumentElement(dst, "lsid", lsid)
831837

832838
var addedTxnNumber bool
833-
if op.Type == Write && client != nil && client.RetryWrite {
839+
if op.Type == Write && client.RetryWrite {
834840
addedTxnNumber = true
835841
dst = bsoncore.AppendInt64Element(dst, "txnNumber", op.Client.TxnNumber)
836842
}

0 commit comments

Comments
 (0)