Skip to content

Commit 5277acc

Browse files
committed
Remove readPref requirement with connect=direct
GODRIVER-641 Change-Id: I7d65dabdeb972c814e0e107dee8f871592618591
1 parent bc25b34 commit 5277acc

File tree

3 files changed

+126
-36
lines changed

3 files changed

+126
-36
lines changed

mongo/readpref/mode.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ type Mode uint8
1616

1717
// Mode constants
1818
const (
19+
_ Mode = iota
1920
// PrimaryMode indicates that only a primary is
2021
// considered for reading. This is the default
2122
// mode.
22-
PrimaryMode Mode = iota
23+
PrimaryMode
2324
// PrimaryPreferredMode indicates that if a primary
2425
// is available, use it; otherwise, eligible
2526
// secondaries will be considered.
@@ -51,5 +52,5 @@ func ModeFromString(mode string) (Mode, error) {
5152
case "nearest":
5253
return NearestMode, nil
5354
}
54-
return Mode(uint8(0)), fmt.Errorf("unknown read preference %v", mode)
55+
return Mode(0), fmt.Errorf("unknown read preference %v", mode)
5556
}

x/network/command/command_test.go

Lines changed: 99 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package command
88

99
import (
1010
"testing"
11+
"time"
1112

1213
"github.com/mongodb/mongo-go-driver/mongo/readpref"
1314
"github.com/mongodb/mongo-go-driver/x/network/description"
@@ -17,12 +18,105 @@ import (
1718
func noerr(t *testing.T, err error) {
1819
if err != nil {
1920
t.Helper()
20-
t.Errorf("Unepexted error: %v", err)
21+
t.Errorf("Unexpected error: %v", err)
2122
t.FailNow()
2223
}
2324
}
2425

26+
var connectDirectOpMsgTests = []struct {
27+
serverKind description.ServerKind
28+
suppliedReadPref *readpref.ReadPref
29+
expectedReadPrefMode readpref.Mode
30+
}{
31+
{description.RSSecondary, nil, readpref.PrimaryPreferredMode},
32+
{description.RSSecondary, readpref.Primary(), readpref.PrimaryPreferredMode},
33+
{description.RSSecondary, readpref.Secondary(), readpref.SecondaryMode},
34+
{description.Mongos, readpref.Primary(), readpref.Mode(0)},
35+
{description.Mongos, readpref.SecondaryPreferred(), readpref.SecondaryPreferredMode},
36+
{description.Mongos, readpref.SecondaryPreferred(readpref.WithTags("a", "2")), readpref.SecondaryPreferredMode},
37+
{description.Mongos, readpref.SecondaryPreferred(readpref.WithMaxStaleness(time.Duration(10))), readpref.SecondaryPreferredMode},
38+
{description.Mongos, readpref.Secondary(), readpref.SecondaryMode},
39+
}
40+
41+
var connectDirectOpQueryTests = []struct {
42+
serverKind description.ServerKind
43+
suppliedReadPref *readpref.ReadPref
44+
expectedReadPrefMode readpref.Mode
45+
expectedSlaveOkBit wiremessage.QueryFlag
46+
}{
47+
{description.Mongos, readpref.Primary(), readpref.Mode(0), 0},
48+
{description.Mongos, readpref.SecondaryPreferred(), readpref.Mode(0), wiremessage.SlaveOK},
49+
{description.Mongos, readpref.SecondaryPreferred(readpref.WithTags("a", "2")), readpref.SecondaryPreferredMode, wiremessage.SlaveOK},
50+
{description.Mongos, readpref.SecondaryPreferred(readpref.WithMaxStaleness(time.Duration(10))), readpref.SecondaryPreferredMode, wiremessage.SlaveOK},
51+
{description.Mongos, readpref.Secondary(), readpref.SecondaryMode, wiremessage.SlaveOK},
52+
}
53+
2554
func TestCommandEncode(t *testing.T) {
55+
for _, tt := range connectDirectOpMsgTests {
56+
t.Run("connect direct op_msg", func(t *testing.T) {
57+
cmd := &Read{ReadPref: tt.suppliedReadPref}
58+
wm, err := cmd.Encode(description.SelectedServer{
59+
Server: description.Server{
60+
Kind: tt.serverKind,
61+
WireVersion: &description.VersionRange{Max: 6},
62+
},
63+
Kind: description.Single,
64+
})
65+
noerr(t, err)
66+
67+
msg, ok := wm.(wiremessage.Msg)
68+
if !ok {
69+
t.Fatalf("Returned wiremessage is not a msg. got %T; want %T", wm, wiremessage.Msg{})
70+
}
71+
res, _ := msg.GetMainDocument()
72+
rp, err := res.LookupErr("$readPreference", "mode")
73+
if tt.expectedReadPrefMode == readpref.Mode(0) {
74+
if err == nil {
75+
t.Errorf("Did not expect $readPreference to be set, but it was. got %v", rp.StringValue())
76+
}
77+
} else {
78+
if err != nil {
79+
t.Fatalf("Expected $readPreference to be set, but it wasn't.")
80+
}
81+
if mode, _ := readpref.ModeFromString(rp.StringValue()); mode != tt.expectedReadPrefMode {
82+
t.Errorf("Expected $readPreference to be set to %v, but it wasn't. got %v", tt.expectedReadPrefMode, mode)
83+
}
84+
}
85+
})
86+
}
87+
88+
for _, tt := range connectDirectOpQueryTests {
89+
t.Run("connect direct op_query", func(t *testing.T) {
90+
cmd := &Read{ReadPref: tt.suppliedReadPref}
91+
wm, err := cmd.Encode(description.SelectedServer{
92+
Server: description.Server{Kind: tt.serverKind},
93+
Kind: description.Single,
94+
})
95+
noerr(t, err)
96+
97+
query, ok := wm.(wiremessage.Query)
98+
if !ok {
99+
t.Fatalf("Returned wiremessage is not a query. got %T; want %T", wm, wiremessage.Query{})
100+
}
101+
if query.Flags&wiremessage.SlaveOK != tt.expectedSlaveOkBit {
102+
t.Errorf("slaveOk bit did not have expected value. got %v; want %v", query.Flags, wiremessage.SlaveOK)
103+
}
104+
rp, err := query.Query.LookupErr("$readPreference", "mode")
105+
if tt.expectedReadPrefMode == readpref.Mode(0) {
106+
if err == nil {
107+
t.Errorf("Did not expect $readPreference to be set, but it was. got %v", rp.StringValue())
108+
}
109+
} else {
110+
if err != nil {
111+
t.Fatalf("Expected $readPreference to be set, but it wasn't.")
112+
}
113+
if mode, _ := readpref.ModeFromString(rp.StringValue()); mode != tt.expectedReadPrefMode {
114+
t.Errorf("Expected $readPreference to be set to %v, but it wasn't. got %v", tt.expectedReadPrefMode, mode)
115+
}
116+
}
117+
})
118+
}
119+
26120
t.Run("sets slaveOk for non-primary read preference mode", func(t *testing.T) {
27121
cmd := &Read{
28122
ReadPref: readpref.SecondaryPreferred(),
@@ -31,8 +125,7 @@ func TestCommandEncode(t *testing.T) {
31125
noerr(t, err)
32126
query, ok := wm.(wiremessage.Query)
33127
if !ok {
34-
t.Errorf("Returned wiremessage is not a query. got %T; want %T", wm, wiremessage.Query{})
35-
t.FailNow()
128+
t.Fatalf("Returned wiremessage is not a query. got %T; want %T", wm, wiremessage.Query{})
36129
}
37130
if query.Flags&wiremessage.SlaveOK != wiremessage.SlaveOK {
38131
t.Errorf("Expected the slaveOk flag to be set, but it wasn't. got %v; want %v", query.Flags, wiremessage.SlaveOK)
@@ -44,21 +137,19 @@ func TestCommandEncode(t *testing.T) {
44137
noerr(t, err)
45138
query, ok := wm.(wiremessage.Query)
46139
if !ok {
47-
t.Errorf("Returned wiremessage is not a query. got %T; want %T", wm, wiremessage.Query{})
48-
t.FailNow()
140+
t.Fatalf("Returned wiremessage is not a query. got %T; want %T", wm, wiremessage.Query{})
49141
}
50142
if query.Flags&wiremessage.SlaveOK != wiremessage.SlaveOK {
51143
t.Errorf("Expected the slaveOk flag to be set, but it wasn't. got %v; want %v", query.Flags, wiremessage.SlaveOK)
52144
}
53145
})
54-
t.Run("sets slaveOK for all read commands in direct mode", func(t *testing.T) {
146+
t.Run("sets slaveOK for all read commands to non-mongos in direct mode", func(t *testing.T) {
55147
cmd := &Read{}
56148
wm, err := cmd.Encode(description.SelectedServer{Kind: description.Single})
57149
noerr(t, err)
58150
query, ok := wm.(wiremessage.Query)
59151
if !ok {
60-
t.Errorf("Returned wiremessage is not a query. got %T; want %T", wm, wiremessage.Query{})
61-
t.FailNow()
152+
t.Fatalf("Returned wiremessage is not a query. got %T; want %T", wm, wiremessage.Query{})
62153
}
63154
if query.Flags&wiremessage.SlaveOK != wiremessage.SlaveOK {
64155
t.Errorf("Expected the slaveOk flag to be set, but it wasn't. got %v; want %v", query.Flags, wiremessage.SlaveOK)

x/network/command/read.go

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,33 @@ type Read struct {
3333
err error
3434
}
3535

36-
func (r *Read) createReadPref(kind description.ServerKind) bsonx.Doc {
37-
if r.ReadPref == nil {
36+
func (r *Read) createReadPref(serverKind description.ServerKind, topologyKind description.TopologyKind, isOpQuery bool) bsonx.Doc {
37+
doc := bsonx.Doc{}
38+
rp := r.ReadPref
39+
40+
if rp == nil {
41+
if topologyKind == description.Single && serverKind != description.Mongos {
42+
return append(doc, bsonx.Elem{"mode", bsonx.String("primaryPreferred")})
43+
}
3844
return nil
3945
}
4046

41-
doc := bsonx.Doc{}
42-
43-
switch r.ReadPref.Mode() {
47+
switch rp.Mode() {
4448
case readpref.PrimaryMode:
49+
if serverKind == description.Mongos {
50+
return nil
51+
}
52+
if topologyKind == description.Single {
53+
return append(doc, bsonx.Elem{"mode", bsonx.String("primaryPreferred")})
54+
}
4555
doc = append(doc, bsonx.Elem{"mode", bsonx.String("primary")})
4656
case readpref.PrimaryPreferredMode:
4757
doc = append(doc, bsonx.Elem{"mode", bsonx.String("primaryPreferred")})
4858
case readpref.SecondaryPreferredMode:
59+
_, ok := r.ReadPref.MaxStaleness()
60+
if serverKind == description.Mongos && isOpQuery && !ok && len(r.ReadPref.TagSets()) == 0 {
61+
return nil
62+
}
4963
doc = append(doc, bsonx.Elem{"mode", bsonx.String("secondaryPreferred")})
5064
case readpref.SecondaryMode:
5165
doc = append(doc, bsonx.Elem{"mode", bsonx.String("secondary")})
@@ -78,8 +92,8 @@ func (r *Read) createReadPref(kind description.ServerKind) bsonx.Doc {
7892
// addReadPref will add a read preference to the query document.
7993
//
8094
// NOTE: This method must always return either a valid bson.Reader or an error.
81-
func (r *Read) addReadPref(rp *readpref.ReadPref, kind description.ServerKind, query bson.Raw) (bson.Raw, error) {
82-
doc := r.createReadPref(kind)
95+
func (r *Read) addReadPref(rp *readpref.ReadPref, serverKind description.ServerKind, topologyKind description.TopologyKind, query bson.Raw) (bson.Raw, error) {
96+
doc := r.createReadPref(serverKind, topologyKind, true)
8397
if doc == nil {
8498
return query, nil
8599
}
@@ -102,7 +116,7 @@ func (r *Read) encodeOpMsg(desc description.SelectedServer, cmd bsonx.Doc) (wire
102116
Sections: make([]wiremessage.Section, 0),
103117
}
104118

105-
readPrefDoc := r.createReadPref(desc.Server.Kind)
119+
readPrefDoc := r.createReadPref(desc.Server.Kind, desc.Kind, false)
106120
fullDocRdr, err := opmsgAddGlobals(cmd, r.DB, readPrefDoc)
107121
if err != nil {
108122
return nil, err
@@ -136,31 +150,15 @@ func (r *Read) slaveOK(desc description.SelectedServer) wiremessage.QueryFlag {
136150
return 0
137151
}
138152

139-
// return true if a read preference needs to be added when encoding r as a OP_QUERY message
140-
func (r *Read) queryNeedsReadPref(kind description.ServerKind) bool {
141-
if kind != description.Mongos || r.ReadPref == nil {
142-
return false
143-
}
144-
145-
// simple Primary or SecondaryPreferred is communicated via slaveOk to Mongos.
146-
if r.ReadPref.Mode() == readpref.PrimaryMode || r.ReadPref.Mode() == readpref.SecondaryPreferredMode {
147-
if _, ok := r.ReadPref.MaxStaleness(); !ok && len(r.ReadPref.TagSets()) == 0 {
148-
return false
149-
}
150-
}
151-
152-
return true
153-
}
154-
155153
// Encode c as OP_QUERY
156154
func (r *Read) encodeOpQuery(desc description.SelectedServer, cmd bsonx.Doc) (wiremessage.WireMessage, error) {
157155
rdr, err := marshalCommand(cmd)
158156
if err != nil {
159157
return nil, err
160158
}
161159

162-
if r.queryNeedsReadPref(desc.Server.Kind) {
163-
rdr, err = r.addReadPref(r.ReadPref, desc.Server.Kind, rdr)
160+
if desc.Server.Kind == description.Mongos {
161+
rdr, err = r.addReadPref(r.ReadPref, desc.Server.Kind, desc.Kind, rdr)
164162
if err != nil {
165163
return nil, err
166164
}

0 commit comments

Comments
 (0)