Skip to content

Commit 08f9f9e

Browse files
author
Divjot Arora
authored
GODRIVER-1404 Remove topology subscription initial server selection (#221)
1 parent e470a63 commit 08f9f9e

File tree

2 files changed

+122
-30
lines changed

2 files changed

+122
-30
lines changed

x/mongo/driver/topology/topology.go

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,18 @@ type Topology struct {
9191
var _ driver.Deployment = &Topology{}
9292
var _ driver.Subscriber = &Topology{}
9393

94+
type serverSelectionState struct {
95+
selector description.ServerSelector
96+
timeoutChan <-chan time.Time
97+
}
98+
99+
func newServerSelectionState(selector description.ServerSelector, timeoutChan <-chan time.Time) serverSelectionState {
100+
return serverSelectionState{
101+
selector: selector,
102+
timeoutChan: timeoutChan,
103+
}
104+
}
105+
94106
// New creates a new topology.
95107
func New(opts ...Option) (*Topology, error) {
96108
cfg, err := newConfig(opts...)
@@ -295,16 +307,39 @@ func (t *Topology) SelectServer(ctx context.Context, ss description.ServerSelect
295307
defer ssTimeout.Stop()
296308
}
297309

298-
sub, err := t.Subscribe()
299-
if err != nil {
300-
return nil, err
301-
}
302-
defer t.Unsubscribe(sub)
303-
310+
var doneOnce bool
311+
var sub *driver.Subscription
312+
selectionState := newServerSelectionState(ss, ssTimeoutCh)
304313
for {
305-
suitable, err := t.selectServer(ctx, sub.Updates, ss, ssTimeoutCh)
306-
if err != nil {
307-
return nil, err
314+
var suitable []description.Server
315+
var selectErr error
316+
317+
if !doneOnce {
318+
// for the first pass, select a server from the current description.
319+
// this improves selection speed for up-to-date topology descriptions.
320+
suitable, selectErr = t.selectServerFromDescription(ctx, t.Description(), selectionState)
321+
doneOnce = true
322+
} else {
323+
// if the first pass didn't select a server, the previous description did not contain a suitable server, so
324+
// we subscribe to the topology and attempt to obtain a server from that subscription
325+
if sub == nil {
326+
var err error
327+
sub, err = t.Subscribe()
328+
if err != nil {
329+
return nil, err
330+
}
331+
defer t.Unsubscribe(sub)
332+
}
333+
334+
suitable, selectErr = t.selectServerFromSubscription(ctx, sub.Updates, selectionState)
335+
}
336+
if selectErr != nil {
337+
return nil, selectErr
338+
}
339+
340+
if len(suitable) == 0 {
341+
// try again if there are no servers available
342+
continue
308343
}
309344

310345
selected := suitable[rand.Intn(len(suitable))]
@@ -344,8 +379,9 @@ func (t *Topology) SelectServerLegacy(ctx context.Context, ss description.Server
344379
}
345380
defer t.Unsubscribe(sub)
346381

382+
selectionState := newServerSelectionState(ss, ssTimeoutCh)
347383
for {
348-
suitable, err := t.selectServer(ctx, sub.Updates, ss, ssTimeoutCh)
384+
suitable, err := t.selectServerFromSubscription(ctx, sub.Updates, selectionState)
349385
if err != nil {
350386
return nil, err
351387
}
@@ -390,39 +426,60 @@ func wrapServerSelectionError(err error, t *Topology) error {
390426
return fmt.Errorf("server selection error: %v, current topology: { %s }", err, t.String())
391427
}
392428

393-
// selectServer is the core piece of server selection. It handles getting
394-
// topology descriptions and running sever selection on those descriptions.
395-
func (t *Topology) selectServer(ctx context.Context, subscriptionCh <-chan description.Topology, ss description.ServerSelector, timeoutCh <-chan time.Time) ([]description.Server, error) {
429+
// selectServerFromSubscription reads topology descriptions from the subscription until one yields a suitable server. It returns
430+
// when the given context expires, server selection timeout is reached, or a description containing a selectable
431+
// server is available.
432+
func (t *Topology) selectServerFromSubscription(ctx context.Context, subscriptionCh <-chan description.Topology,
433+
selectionState serverSelectionState) ([]description.Server, error) {
434+
396435
var current description.Topology
397436
for {
398437
select {
399438
case <-ctx.Done():
400439
return nil, ctx.Err()
401-
case <-timeoutCh:
440+
case <-selectionState.timeoutChan:
402441
return nil, wrapServerSelectionError(ErrServerSelectionTimeout, t)
403442
case current = <-subscriptionCh:
404443
}
405444

406-
var allowed []description.Server
407-
for _, s := range current.Servers {
408-
if s.Kind != description.Unknown {
409-
allowed = append(allowed, s)
410-
}
411-
}
412-
413-
suitable, err := ss.SelectServer(current, allowed)
445+
suitable, err := t.selectServerFromDescription(ctx, current, selectionState)
414446
if err != nil {
415-
return nil, wrapServerSelectionError(err, t)
447+
return nil, err
416448
}
417449

418450
if len(suitable) > 0 {
419451
return suitable, nil
420452
}
421-
422453
t.RequestImmediateCheck()
423454
}
424455
}
425456

457+
// selectServerFromDescription processes the given topology description and returns a slice of suitable servers.
458+
func (t *Topology) selectServerFromDescription(ctx context.Context, desc description.Topology,
459+
selectionState serverSelectionState) ([]description.Server, error) {
460+
461+
select {
462+
case <-ctx.Done():
463+
return nil, ctx.Err()
464+
case <-selectionState.timeoutChan:
465+
return nil, wrapServerSelectionError(ErrServerSelectionTimeout, t)
466+
default:
467+
}
468+
469+
var allowed []description.Server
470+
for _, s := range desc.Servers {
471+
if s.Kind != description.Unknown {
472+
allowed = append(allowed, s)
473+
}
474+
}
475+
476+
suitable, err := selectionState.selector.SelectServer(desc, allowed)
477+
if err != nil {
478+
return nil, wrapServerSelectionError(err, t)
479+
}
480+
return suitable, nil
481+
}
482+
426483
func (t *Topology) pollSRVRecords() {
427484
defer t.pollingwg.Done()
428485

x/mongo/driver/topology/topology_test.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"testing"
1414
"time"
1515

16+
"go.mongodb.org/mongo-driver/internal/testutil/assert"
1617
"go.mongodb.org/mongo-driver/x/mongo/driver"
1718
"go.mongodb.org/mongo-driver/x/mongo/driver/address"
1819
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
@@ -72,7 +73,9 @@ func TestServerSelection(t *testing.T) {
7273
}
7374
subCh := make(chan description.Topology, 1)
7475
subCh <- desc
75-
srvs, err := topo.selectServer(context.Background(), subCh, selectFirst, nil)
76+
77+
state := newServerSelectionState(selectFirst, nil)
78+
srvs, err := topo.selectServerFromSubscription(context.Background(), subCh, state)
7679
noerr(t, err)
7780
if len(srvs) != 1 {
7881
t.Errorf("Incorrect number of descriptions returned. got %d; want %d", len(srvs), 1)
@@ -90,7 +93,8 @@ func TestServerSelection(t *testing.T) {
9093

9194
resp := make(chan []description.Server)
9295
go func() {
93-
srvs, err := topo.selectServer(context.Background(), subCh, selectFirst, nil)
96+
state := newServerSelectionState(selectFirst, nil)
97+
srvs, err := topo.selectServerFromSubscription(context.Background(), subCh, state)
9498
noerr(t, err)
9599
resp <- srvs
96100
}()
@@ -137,7 +141,8 @@ func TestServerSelection(t *testing.T) {
137141
resp := make(chan error)
138142
ctx, cancel := context.WithCancel(context.Background())
139143
go func() {
140-
_, err := topo.selectServer(ctx, subCh, selectNone, nil)
144+
state := newServerSelectionState(selectNone, nil)
145+
_, err := topo.selectServerFromSubscription(ctx, subCh, state)
141146
resp <- err
142147
}()
143148

@@ -174,7 +179,8 @@ func TestServerSelection(t *testing.T) {
174179
resp := make(chan error)
175180
timeout := make(chan time.Time)
176181
go func() {
177-
_, err := topo.selectServer(context.Background(), subCh, selectNone, timeout)
182+
state := newServerSelectionState(selectNone, timeout)
183+
_, err := topo.selectServerFromSubscription(context.Background(), subCh, state)
178184
resp <- err
179185
}()
180186

@@ -209,7 +215,8 @@ func TestServerSelection(t *testing.T) {
209215
resp := make(chan error)
210216
timeout := make(chan time.Time)
211217
go func() {
212-
_, err := topo.selectServer(context.Background(), subCh, selectError, timeout)
218+
state := newServerSelectionState(selectError, timeout)
219+
_, err := topo.selectServerFromSubscription(context.Background(), subCh, state)
213220
resp <- err
214221
}()
215222

@@ -288,7 +295,8 @@ func TestServerSelection(t *testing.T) {
288295

289296
go func() {
290297
// server selection should discover the new topology
291-
srvs, err := topo.selectServer(context.Background(), subCh, description.WriteSelector(), nil)
298+
state := newServerSelectionState(description.WriteSelector(), nil)
299+
srvs, err := topo.selectServerFromSubscription(context.Background(), subCh, state)
292300
noerr(t, err)
293301
resp <- srvs
294302
}()
@@ -307,6 +315,33 @@ func TestServerSelection(t *testing.T) {
307315
t.Errorf("Incorrect sever selected. got %s; want %s", srvs[0].Addr, desc.Servers[1].Addr)
308316
}
309317
})
318+
t.Run("no subscription in fast path", func(t *testing.T) {
319+
// assert that a subscription is not created if there is a server available
320+
topo, err := New()
321+
noerr(t, err)
322+
topo.cfg.cs.HeartbeatInterval = time.Minute
323+
atomic.StoreInt32(&topo.connectionstate, connected)
324+
325+
primaryAddr := address.Address("one")
326+
desc := description.Topology{
327+
Servers: []description.Server{
328+
{Addr: primaryAddr, Kind: description.RSPrimary},
329+
},
330+
}
331+
topo.desc.Store(desc)
332+
for _, srv := range desc.Servers {
333+
s, err := ConnectServer(srv.Addr, func(desc description.Server) { topo.apply(context.Background(), desc) })
334+
noerr(t, err)
335+
topo.servers[srv.Addr] = s
336+
}
337+
338+
// manually close subscriptions so calls to Subscribe will error
339+
topo.subscriptionsClosed = true
340+
selectedServer, err := topo.SelectServer(context.Background(), description.WriteSelector())
341+
noerr(t, err)
342+
selectedAddr := selectedServer.(*SelectedServer).address
343+
assert.Equal(t, primaryAddr, selectedAddr, "expected address %v, got %v", primaryAddr, selectedAddr)
344+
})
310345
}
311346

312347
func TestSessionTimeout(t *testing.T) {

0 commit comments

Comments
 (0)