Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ require (
go.mongodb.org/mongo-driver v1.3.5
go.uber.org/multierr v1.5.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/tools v0.1.6 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)
19 changes: 19 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.mongodb.org/mongo-driver v1.3.5 h1:S0ZOruh4YGHjD7JoN7mIsTrNjnQbOjrmgrx6l6pZN7I=
go.mongodb.org/mongo-driver v1.3.5/go.mod h1:Ual6Gkco7ZGQw8wE1t4tLnvBsf6yVSM60qW6TgOeJ5c=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
Expand All @@ -111,26 +112,38 @@ golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkpre
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190329151228-23e29df326fe/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
Expand All @@ -141,7 +154,13 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.6 h1:SIasE1FVIQOWz2GEAHFOmoW7xchJcqlucjSULTL0Ag4=
golang.org/x/tools v0.1.6/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
8 changes: 5 additions & 3 deletions mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func (m *Mongo) RoundTrip(msg *Message) (_ *Message, err error) {

// FIXME this assumes that cursorIDs are unique on the cluster, but two servers can have the same cursorID reference different cursors
requestCursorID, _ := msg.Op.CursorID()
server, err := m.selectServer(requestCursorID)
rpref, _ := msg.Op.ReadPref()

server, err := m.selectServer(requestCursorID, rpref)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -237,7 +239,7 @@ func (m *Mongo) RoundTrip(msg *Message) (_ *Message, err error) {
}, nil
}

func (m *Mongo) selectServer(requestCursorID int64) (server driver.Server, err error) {
func (m *Mongo) selectServer(requestCursorID int64, rpref *readpref.ReadPref) (server driver.Server, err error) {
defer func(start time.Time) {
_ = m.statsd.Timing("server_selection", time.Since(start), []string{fmt.Sprintf("success:%v", err == nil)}, 1)
}(time.Now())
Expand All @@ -250,7 +252,7 @@ func (m *Mongo) selectServer(requestCursorID int64) (server driver.Server, err e
}

selector := description.CompositeSelector([]description.ServerSelector{
description.ReadPrefSelector(readpref.Primary()), // ignored by sharded clusters
description.ReadPrefSelector(rpref), // ignored by sharded clusters
description.LatencySelector(15 * time.Millisecond), // default localThreshold for the client
})
return m.topology.SelectServer(m.roundTripCtx, selector)
Expand Down
50 changes: 48 additions & 2 deletions mongo/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/x/mongo/driver"

"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
)

Expand All @@ -24,6 +24,7 @@ type Operation interface {
RequestID() int32
Error() error
Unacknowledged() bool
ReadPref() (rpref *readpref.ReadPref, ok bool)
}

// see https://github.com/mongodb/mongo-go-driver/blob/v1.3.4/x/mongo/driver/operation.go#L1165-L1230
Expand Down Expand Up @@ -102,6 +103,10 @@ func (o *opUnknown) Unacknowledged() bool {
return false
}

func (o *opUnknown) ReadPref() (rp *readpref.ReadPref, ok bool) {
return readpref.Primary(), false
}

// https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#wire-op-query
type opQuery struct {
reqID int32
Expand Down Expand Up @@ -201,6 +206,10 @@ func (q *opQuery) Unacknowledged() bool {
return false
}

func (q *opQuery) ReadPref() (rp *readpref.ReadPref, ok bool) {
return readpref.Primary(), false
}

// https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#op-msg
type opMsg struct {
reqID int32
Expand All @@ -213,6 +222,7 @@ type opMsgSection interface {
cursorID() (cursorID int64, ok bool)
isIsMaster() bool
append(buffer []byte) []byte
ReadPref() (rpref *readpref.ReadPref, ok bool)
}

type opMsgSectionSingle struct {
Expand All @@ -226,6 +236,21 @@ func (o *opMsgSectionSingle) cursorID() (cursorID int64, ok bool) {
return o.msg.Lookup("cursor", "id").Int64OK()
}

func (o *opMsgSectionSingle) ReadPref() (rpref *readpref.ReadPref, ok bool) {
if prefDoc, ok := o.msg.Lookup("$readPreference").DocumentOK(); ok {
if prefStr, ok := prefDoc.Lookup("mode").StringValueOK(); ok {
// Note: only the mode is unpacked currently
mode, err := readpref.ModeFromString(prefStr)
if err == nil {
rpref, _ := readpref.New(mode)
return rpref, true
}
}
}

return readpref.Primary(), false
}

func (o *opMsgSectionSingle) isIsMaster() bool {
if db, ok := o.msg.Lookup("$db").StringValueOK(); ok && db == "admin" {
ismaster, _ := o.msg.Lookup("ismaster").Int32OK()
Expand Down Expand Up @@ -271,6 +296,10 @@ func (o *opMsgSectionSequence) append(buffer []byte) []byte {
return buffer
}

func (o *opMsgSectionSequence) ReadPref() (rpref *readpref.ReadPref, ok bool) {
return readpref.Primary(), false
}

// see https://github.com/mongodb/mongo-go-driver/blob/v1.3.4/x/mongo/driver/operation.go#L1191-L1220
func decodeMsg(reqID int32, wm []byte) (*opMsg, error) {
var ok bool
Expand Down Expand Up @@ -381,6 +410,15 @@ func (m *opMsg) Unacknowledged() bool {
return m.flags&wiremessage.MoreToCome == wiremessage.MoreToCome
}

func (m *opMsg) ReadPref() (rp *readpref.ReadPref, ok bool) {
for _, section := range m.sections {
if rpref, ok := section.ReadPref(); ok {
return rpref, ok
}
}
return readpref.Primary(), false
}

// https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#op-reply
type opReply struct {
reqID int32
Expand Down Expand Up @@ -468,6 +506,10 @@ func (r *opReply) Unacknowledged() bool {
return false
}

func (r *opReply) ReadPref() (rp *readpref.ReadPref, ok bool) {
return readpref.Primary(), false
}

// https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#op-get-more
type opGetMore struct {
reqID int32
Expand Down Expand Up @@ -544,6 +586,10 @@ func (g *opGetMore) Unacknowledged() bool {
return false
}

func (g *opGetMore) ReadPref() (rp *readpref.ReadPref, ok bool) {
return readpref.Primary(), false
}

func appendi32(dst []byte, i32 int32) []byte {
return append(dst, byte(i32), byte(i32>>8), byte(i32>>16), byte(i32>>24))
}
Expand Down