Skip to content

Commit d92bcdb

Browse files
committed
Add new ServerSelector for PinnedServers
GODRIVER-1130 Change-Id: If6ec2e5137eff02ee8ee53b4fe7cdd676e4a26d2
1 parent 3ae303d commit d92bcdb

File tree

4 files changed

+27
-52
lines changed

4 files changed

+27
-52
lines changed

mongo/client.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -454,13 +454,10 @@ func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...
454454
return ListDatabasesResult{}, err
455455
}
456456

457-
selector := description.CompositeSelector([]description.ServerSelector{
457+
selector := makePinnedSelector(sess, description.CompositeSelector([]description.ServerSelector{
458458
description.ReadPrefSelector(readpref.Primary()),
459459
description.LatencySelector(c.localThreshold),
460-
})
461-
if sess != nil && sess.PinnedServer != nil {
462-
selector = sess.PinnedServer
463-
}
460+
}))
464461

465462
ldo := options.MergeListDatabasesOptions(opts...)
466463
op := operation.NewListDatabases(filterDoc).

mongo/collection.go

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -267,10 +267,7 @@ func (coll *Collection) insert(ctx context.Context, documents []interface{},
267267
sess = nil
268268
}
269269

270-
selector := coll.writeSelector
271-
if sess != nil && sess.PinnedServer != nil {
272-
selector = sess.PinnedServer
273-
}
270+
selector := makePinnedSelector(sess, coll.writeSelector)
274271

275272
op := operation.NewInsert(docs...).
276273
Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
@@ -386,10 +383,7 @@ func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOn
386383
sess = nil
387384
}
388385

389-
selector := coll.writeSelector
390-
if sess != nil && sess.PinnedServer != nil {
391-
selector = sess.PinnedServer
392-
}
386+
selector := makePinnedSelector(sess, coll.writeSelector)
393387

394388
var limit int32
395389
if deleteOne {
@@ -490,10 +484,7 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter, update bson
490484
sess = nil
491485
}
492486

493-
selector := coll.writeSelector
494-
if sess != nil && sess.PinnedServer != nil {
495-
selector = sess.PinnedServer
496-
}
487+
selector := makePinnedSelector(sess, coll.writeSelector)
497488

498489
op := operation.NewUpdate(updateDoc).
499490
Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
@@ -672,13 +663,11 @@ func aggregate(a aggregateParams) (*Cursor, error) {
672663
sess = nil
673664
}
674665

675-
selector := a.readSelector
666+
defaultSelector := a.readSelector
676667
if hasDollarOut {
677-
selector = a.writeSelector
678-
}
679-
if sess != nil && sess.PinnedServer != nil {
680-
selector = sess.PinnedServer
668+
defaultSelector = a.writeSelector
681669
}
670+
selector := makePinnedSelector(sess, defaultSelector)
682671

683672
ao := options.MergeAggregateOptions(a.opts...)
684673
cursorOpts := driver.CursorOptions{
@@ -870,10 +859,7 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i
870859
rc = nil
871860
}
872861

873-
selector := coll.readSelector
874-
if sess != nil && sess.PinnedServer != nil {
875-
selector = sess.PinnedServer
876-
}
862+
selector := makePinnedSelector(sess, coll.readSelector)
877863

878864
option := options.MergeDistinctOptions(opts...)
879865

@@ -951,10 +937,7 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
951937
rc = nil
952938
}
953939

954-
selector := coll.writeSelector
955-
if sess != nil && sess.PinnedServer != nil {
956-
selector = sess.PinnedServer
957-
}
940+
selector := makePinnedSelector(sess, coll.writeSelector)
958941

959942
op := operation.NewFind(f).
960943
Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
@@ -1142,10 +1125,7 @@ func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAnd
11421125
sess = nil
11431126
}
11441127

1145-
selector := coll.writeSelector
1146-
if sess != nil && sess.PinnedServer != nil {
1147-
selector = sess.PinnedServer
1148-
}
1128+
selector := makePinnedSelector(sess, coll.writeSelector)
11491129

11501130
retry := driver.RetryNone
11511131
if coll.client.retryWrites {
@@ -1378,10 +1358,7 @@ func (coll *Collection) Drop(ctx context.Context) error {
13781358
sess = nil
13791359
}
13801360

1381-
selector := coll.writeSelector
1382-
if sess != nil && sess.PinnedServer != nil {
1383-
selector = sess.PinnedServer
1384-
}
1361+
selector := makePinnedSelector(sess, coll.writeSelector)
13851362

13861363
op := operation.NewDropCollection().
13871364
Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
@@ -1397,3 +1374,15 @@ func (coll *Collection) Drop(ctx context.Context) error {
13971374
}
13981375
return nil
13991376
}
1377+
1378+
// makePinnedSelector makes a selector for a pinned session with a pinned server. Will attempt to do server selection on
1379+
// the pinned server but if that fails it will go through a list of default selectors
1380+
func makePinnedSelector(sess *session.Client, defaultSelector description.ServerSelector) description.ServerSelectorFunc {
1381+
return func(t description.Topology, svrs []description.Server) ([]description.Server, error) {
1382+
if sess != nil && sess.PinnedServer != nil {
1383+
return sess.PinnedServer.SelectServer(t, svrs)
1384+
}
1385+
1386+
return defaultSelector.SelectServer(t, svrs)
1387+
}
1388+
}

mongo/database.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,7 @@ func (db *Database) Drop(ctx context.Context) error {
231231
sess = nil
232232
}
233233

234-
selector := db.writeSelector
235-
if sess != nil && sess.PinnedServer != nil {
236-
selector = sess.PinnedServer
237-
}
234+
selector := makePinnedSelector(sess, db.writeSelector)
238235

239236
op := operation.NewDropDatabase().
240237
Session(sess).WriteConcern(wc).CommandMonitor(db.client.monitor).
@@ -275,10 +272,7 @@ func (db *Database) ListCollections(ctx context.Context, filter interface{}, opt
275272
return nil, err
276273
}
277274

278-
selector := db.readSelector
279-
if sess != nil && sess.PinnedServer != nil {
280-
selector = sess.PinnedServer
281-
}
275+
selector := makePinnedSelector(sess, db.readSelector)
282276

283277
lco := options.MergeListCollectionsOptions(opts...)
284278
op := operation.NewListCollections(filterDoc).

mongo/session.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,7 @@ func (s *sessionImpl) CommitTransaction(ctx context.Context) error {
217217
s.clientSession.RetryingCommit = true
218218
}
219219

220-
var selector description.ServerSelectorFunc = func(t description.Topology, svrs []description.Server) ([]description.Server, error) {
221-
if s.clientSession.PinnedServer != nil {
222-
return s.clientSession.PinnedServer.SelectServer(t, svrs)
223-
}
224-
return description.WriteSelector().SelectServer(t, svrs)
225-
}
220+
selector := makePinnedSelector(s.clientSession, description.WriteSelector())
226221

227222
s.clientSession.Committing = true
228223
err = operation.NewCommitTransaction().

0 commit comments

Comments
 (0)