Skip to content

Commit e2623ca

Browse files
author
Divjot Arora
authored
GODRIVER-1578 Set server to Unknown for set name mismatches in direct connections (#368)
1 parent 90fdeba commit e2623ca

File tree

8 files changed

+187
-40
lines changed

8 files changed

+187
-40
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
{
2+
"description": "Direct connection to RSPrimary with wrong set name",
3+
"uri": "mongodb://a/?directConnection=true&replicaSet=rs",
4+
"phases": [
5+
{
6+
"responses": [
7+
[
8+
"a:27017",
9+
{
10+
"ok": 1,
11+
"ismaster": true,
12+
"hosts": [
13+
"a:27017",
14+
"b:27017"
15+
],
16+
"setName": "wrong",
17+
"minWireVersion": 0,
18+
"maxWireVersion": 6
19+
}
20+
]
21+
],
22+
"outcome": {
23+
"servers": {
24+
"a:27017": {
25+
"type": "Unknown"
26+
}
27+
},
28+
"topologyType": "Single",
29+
"logicalSessionTimeoutMinutes": null,
30+
"setName": "rs"
31+
}
32+
},
33+
{
34+
"responses": [
35+
[
36+
"a:27017",
37+
{
38+
"ok": 1,
39+
"ismaster": true,
40+
"hosts": [
41+
"a:27017",
42+
"b:27017"
43+
],
44+
"setName": "rs",
45+
"minWireVersion": 0,
46+
"maxWireVersion": 6
47+
}
48+
]
49+
],
50+
"outcome": {
51+
"servers": {
52+
"a:27017": {
53+
"type": "RSPrimary",
54+
"setName": "rs"
55+
}
56+
},
57+
"topologyType": "Single",
58+
"logicalSessionTimeoutMinutes": null,
59+
"setName": "rs"
60+
}
61+
}
62+
]
63+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
description: Direct connection to RSPrimary with wrong set name
2+
uri: mongodb://a/?directConnection=true&replicaSet=rs
3+
phases:
4+
- responses:
5+
- - a:27017
6+
- ok: 1
7+
ismaster: true
8+
hosts:
9+
- a:27017
10+
- b:27017
11+
setName: wrong
12+
minWireVersion: 0
13+
maxWireVersion: 6
14+
outcome:
15+
servers:
16+
a:27017:
17+
type: Unknown
18+
topologyType: Single
19+
logicalSessionTimeoutMinutes:
20+
setName: rs
21+
- responses:
22+
- - a:27017
23+
- ok: 1
24+
ismaster: true
25+
hosts:
26+
- a:27017
27+
- b:27017
28+
setName: rs
29+
minWireVersion: 0
30+
maxWireVersion: 6
31+
outcome:
32+
servers:
33+
a:27017:
34+
type: RSPrimary
35+
setName: rs
36+
topologyType: Single
37+
logicalSessionTimeoutMinutes:
38+
setName: rs

x/mongo/driver/topology/fsm.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@ func newFSM() *fsm {
2929
return new(fsm)
3030
}
3131

32-
// apply should operate on immutable TopologyDescriptions and Descriptions. This way we don't have to
33-
// lock for the entire time we're applying server description.
34-
func (f *fsm) apply(s description.Server) (description.Topology, error) {
32+
// apply takes a new server description and modifies the FSM's topology description based on it. It returns the
33+
// updated topology description as well as a server description. The returned server description is either the same
34+
// one that was passed in, or a new one in the case that it had to be changed.
35+
//
36+
// apply should operation on immutable descriptions so we don't have to lock for the entire time we're applying the
37+
// server description.
38+
func (f *fsm) apply(s description.Server) (description.Topology, description.Server, error) {
3539

3640
newServers := make([]description.Server, len(f.Servers))
3741
copy(newServers, f.Servers)
@@ -62,12 +66,12 @@ func (f *fsm) apply(s description.Server) (description.Topology, error) {
6266
}
6367

6468
if _, ok := f.findServer(s.Addr); !ok {
65-
return f.Topology, nil
69+
return f.Topology, s, nil
6670
}
6771

6872
if s.WireVersion != nil {
6973
if s.WireVersion.Max < supportedWireVersions.Min {
70-
return description.Topology{}, fmt.Errorf(
74+
return description.Topology{}, s, fmt.Errorf(
7175
"server at %s reports wire version %d, but this version of the Go driver requires "+
7276
"at least %d (MongoDB %s)",
7377
s.Addr.String(),
@@ -78,7 +82,7 @@ func (f *fsm) apply(s description.Server) (description.Topology, error) {
7882
}
7983

8084
if s.WireVersion.Min > supportedWireVersions.Max {
81-
return description.Topology{}, fmt.Errorf(
85+
return description.Topology{}, s, fmt.Errorf(
8286
"server at %s requires wire version %d, but this version of the Go driver only "+
8387
"supports up to %d",
8488
s.Addr.String(),
@@ -88,23 +92,24 @@ func (f *fsm) apply(s description.Server) (description.Topology, error) {
8892
}
8993
}
9094

95+
updatedDesc := s
9196
switch f.Kind {
9297
case description.Unknown:
93-
f.applyToUnknown(s)
98+
updatedDesc = f.applyToUnknown(s)
9499
case description.Sharded:
95-
f.applyToSharded(s)
100+
updatedDesc = f.applyToSharded(s)
96101
case description.ReplicaSetNoPrimary:
97-
f.applyToReplicaSetNoPrimary(s)
102+
updatedDesc = f.applyToReplicaSetNoPrimary(s)
98103
case description.ReplicaSetWithPrimary:
99-
f.applyToReplicaSetWithPrimary(s)
104+
updatedDesc = f.applyToReplicaSetWithPrimary(s)
100105
case description.Single:
101-
f.applyToSingle(s)
106+
updatedDesc = f.applyToSingle(s)
102107
}
103108

104-
return f.Topology, nil
109+
return f.Topology, updatedDesc, nil
105110
}
106111

107-
func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) {
112+
func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) description.Server {
108113
switch s.Kind {
109114
case description.Standalone, description.Mongos:
110115
f.removeServerByAddr(s.Addr)
@@ -115,9 +120,11 @@ func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) {
115120
case description.Unknown, description.RSGhost:
116121
f.replaceServer(s)
117122
}
123+
124+
return s
118125
}
119126

120-
func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) {
127+
func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) description.Server {
121128
switch s.Kind {
122129
case description.Standalone, description.Mongos:
123130
f.removeServerByAddr(s.Addr)
@@ -130,39 +137,53 @@ func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) {
130137
f.replaceServer(s)
131138
f.checkIfHasPrimary()
132139
}
140+
141+
return s
133142
}
134143

135-
func (f *fsm) applyToSharded(s description.Server) {
144+
func (f *fsm) applyToSharded(s description.Server) description.Server {
136145
switch s.Kind {
137146
case description.Mongos, description.Unknown:
138147
f.replaceServer(s)
139148
case description.Standalone, description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
140149
f.removeServerByAddr(s.Addr)
141150
}
151+
152+
return s
142153
}
143154

144-
func (f *fsm) applyToSingle(s description.Server) {
155+
func (f *fsm) applyToSingle(s description.Server) description.Server {
145156
switch s.Kind {
146157
case description.Unknown:
147158
f.replaceServer(s)
148159
case description.Standalone, description.Mongos:
149160
if f.SetName != "" {
150161
f.removeServerByAddr(s.Addr)
151-
return
162+
return s
152163
}
153164

154165
f.replaceServer(s)
155166
case description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
167+
// A replica set name can be provided when creating a direct connection. In this case, if the set name returned
168+
// by the isMaster response doesn't match up with the one provided during configuration, the server description
169+
// is replaced with a default Unknown description.
170+
//
171+
// We create a new server description rather than doing s.Kind = description.Unknown because the other fields,
172+
// such as RTT, need to be cleared for Unknown descriptions as well.
156173
if f.SetName != "" && f.SetName != s.SetName {
157-
f.removeServerByAddr(s.Addr)
158-
return
174+
s = description.Server{
175+
Addr: s.Addr,
176+
Kind: description.Unknown,
177+
}
159178
}
160179

161180
f.replaceServer(s)
162181
}
182+
183+
return s
163184
}
164185

165-
func (f *fsm) applyToUnknown(s description.Server) {
186+
func (f *fsm) applyToUnknown(s description.Server) description.Server {
166187
switch s.Kind {
167188
case description.Mongos:
168189
f.setKind(description.Sharded)
@@ -177,6 +198,8 @@ func (f *fsm) applyToUnknown(s description.Server) {
177198
case description.Unknown, description.RSGhost:
178199
f.replaceServer(s)
179200
}
201+
202+
return s
180203
}
181204

182205
func (f *fsm) checkIfHasPrimary() {

x/mongo/driver/topology/sdam_spec_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@ func setUpFSM(t *testing.T, uri string) *fsm {
119119
fsm.Kind = description.Single
120120
}
121121

122+
// GODRIVER-1578: The new test requires support for directConnection, which will be added in follow-up ticket
123+
// GODRIVER-1486. In this case, look for "directconnection" in the parsed URI's set of unrecognized options and
124+
// emulate direct connection behavior.
125+
if dcValues, ok := cs.UnknownOptions["directconnection"]; ok && dcValues[0] == "true" {
126+
fsm.Kind = description.Single
127+
}
128+
122129
for _, host := range cs.Hosts {
123130
fsm.Servers = append(fsm.Servers, description.Server{Addr: address.Address(host).Canonicalize()})
124131
}
@@ -133,7 +140,7 @@ func applyResponses(f *fsm, responses []response) error {
133140
return err
134141
}
135142
server := description.NewServer(address.Address(response.Host), bsoncore.Document(doc))
136-
_, err = f.apply(server)
143+
_, _, err = f.apply(server)
137144

138145
if err != nil {
139146
return err

x/mongo/driver/topology/server.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,14 @@ type Server struct {
107107
subscriptionsClosed bool
108108
}
109109

110+
// updateTopologyCallback is a callback used to create a server that should be called when the parent Topology instance
111+
// should be updated based on a new server description. The callback must return the server description that should be
112+
// stored by the server.
113+
type updateTopologyCallback func(description.Server) description.Server
114+
110115
// ConnectServer creates a new Server and then initializes it using the
111116
// Connect method.
112-
func ConnectServer(addr address.Address, updateCallback func(description.Server), opts ...ServerOption) (*Server, error) {
117+
func ConnectServer(addr address.Address, updateCallback updateTopologyCallback, opts ...ServerOption) (*Server, error) {
113118
srvr, err := NewServer(addr, opts...)
114119
if err != nil {
115120
return nil, err
@@ -166,7 +171,7 @@ func NewServer(addr address.Address, opts ...ServerOption) (*Server, error) {
166171

167172
// Connect initializes the Server by starting background monitoring goroutines.
168173
// This method must be called before a Server can be used.
169-
func (s *Server) Connect(updateCallback func(description.Server)) error {
174+
func (s *Server) Connect(updateCallback updateTopologyCallback) error {
170175
if !atomic.CompareAndSwapInt32(&s.connectionstate, disconnected, connected) {
171176
return ErrServerConnected
172177
}
@@ -191,7 +196,7 @@ func (s *Server) Disconnect(ctx context.Context) error {
191196
return ErrServerClosed
192197
}
193198

194-
s.updateTopologyCallback.Store((func(description.Server))(nil))
199+
s.updateTopologyCallback.Store((updateTopologyCallback)(nil))
195200

196201
// For every call to Connect there must be at least 1 goroutine that is
197202
// waiting on the done channel.
@@ -446,12 +451,13 @@ func (s *Server) updateDescription(desc description.Server, initial bool) {
446451
// ¯\_(ツ)_/¯
447452
_ = recover()
448453
}()
449-
s.desc.Store(desc)
450454

451-
callback, ok := s.updateTopologyCallback.Load().(func(description.Server))
455+
// Use the updateTopologyCallback to update the parent Topology and get the description that should be stored.
456+
callback, ok := s.updateTopologyCallback.Load().(updateTopologyCallback)
452457
if ok && callback != nil {
453-
callback(desc)
458+
desc = callback(desc)
454459
}
460+
s.desc.Store(desc)
455461

456462
s.subLock.Lock()
457463
for _, c := range s.subscribers {

x/mongo/driver/topology/server_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,12 @@ func TestServer(t *testing.T) {
243243
t.Run("update topology", func(t *testing.T) {
244244
var updated atomic.Value // bool
245245
updated.Store(false)
246-
s, err := ConnectServer(address.Address("localhost"), func(description.Server) { updated.Store(true) })
246+
247+
updateCallback := func(desc description.Server) description.Server {
248+
updated.Store(true)
249+
return desc
250+
}
251+
s, err := ConnectServer(address.Address("localhost"), updateCallback)
247252
require.NoError(t, err)
248253
s.updateDescription(description.Server{Addr: s.address}, false)
249254
require.True(t, updated.Load().(bool))

0 commit comments

Comments
 (0)