Skip to content

Commit 7f54217

Browse files
Divjot Aroraiwysiu
authored andcommitted
Transactions bug fixes
- Ignore all command errors from AbortTransaction - Use transaction's read preference for server selection GODRIVER-1245 Change-Id: I9f62cec3fac07628586187504e33bcfe5679db64
1 parent 1e3091f commit 7f54217

File tree

5 files changed

+33
-27
lines changed

5 files changed

+33
-27
lines changed

mongo/client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,10 +498,11 @@ func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...
498498
return ListDatabasesResult{}, err
499499
}
500500

501-
selector := makePinnedSelector(sess, description.CompositeSelector([]description.ServerSelector{
501+
selector := description.CompositeSelector([]description.ServerSelector{
502502
description.ReadPrefSelector(readpref.Primary()),
503503
description.LatencySelector(c.localThreshold),
504-
}))
504+
})
505+
selector = makeReadPrefSelector(sess, selector, c.localThreshold)
505506

506507
ldo := options.MergeListDatabasesOptions(opts...)
507508
op := operation.NewListDatabases(filterDoc).

mongo/collection.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -654,11 +654,10 @@ func aggregate(a aggregateParams) (*Cursor, error) {
654654
sess = nil
655655
}
656656

657-
defaultSelector := a.readSelector
658-
if hasOutputStage {
659-
defaultSelector = a.writeSelector
657+
selector := makePinnedSelector(sess, a.writeSelector)
658+
if !hasOutputStage {
659+
selector = makeReadPrefSelector(sess, a.readSelector, a.client.localThreshold)
660660
}
661-
selector := makePinnedSelector(sess, defaultSelector)
662661

663662
ao := options.MergeAggregateOptions(a.opts...)
664663
cursorOpts := driver.CursorOptions{
@@ -756,8 +755,7 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
756755
rc = nil
757756
}
758757

759-
selector := makePinnedSelector(sess, coll.readSelector)
760-
758+
selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
761759
op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
762760
CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name).
763761
Collection(coll.name).Deployment(coll.client.topology)
@@ -832,8 +830,7 @@ func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
832830
rc = nil
833831
}
834832

835-
selector := makePinnedSelector(sess, coll.readSelector)
836-
833+
selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
837834
op := operation.NewCount().Session(sess).ClusterClock(coll.client.clock).
838835
Database(coll.db.name).Collection(coll.name).CommandMonitor(coll.client.monitor).
839836
Deployment(coll.client.topology).ReadConcern(rc).ReadPreference(coll.readPreference).
@@ -888,8 +885,7 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i
888885
rc = nil
889886
}
890887

891-
selector := makePinnedSelector(sess, coll.readSelector)
892-
888+
selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
893889
option := options.MergeDistinctOptions(opts...)
894890

895891
op := operation.NewDistinct(fieldName, bsoncore.Document(f)).
@@ -971,8 +967,7 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
971967
rc = nil
972968
}
973969

974-
selector := makePinnedSelector(sess, coll.readSelector)
975-
970+
selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
976971
op := operation.NewFind(f).
977972
Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
978973
CommandMonitor(coll.client.monitor).ServerSelector(selector).
@@ -1421,3 +1416,14 @@ func makePinnedSelector(sess *session.Client, defaultSelector description.Server
14211416
return defaultSelector.SelectServer(t, svrs)
14221417
}
14231418
}
1419+
1420+
func makeReadPrefSelector(sess *session.Client, selector description.ServerSelector, localThreshold time.Duration) description.ServerSelectorFunc {
1421+
if sess != nil && sess.TransactionRunning() {
1422+
selector = description.CompositeSelector([]description.ServerSelector{
1423+
description.ReadPrefSelector(sess.CurrentRp),
1424+
description.LatencySelector(localThreshold),
1425+
})
1426+
}
1427+
1428+
return makePinnedSelector(sess, selector)
1429+
}

mongo/database.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,11 @@ func (db *Database) ListCollections(ctx context.Context, filter interface{}, opt
273273
return nil, err
274274
}
275275

276-
selector := makePinnedSelector(sess, db.readSelector)
276+
selector := description.CompositeSelector([]description.ServerSelector{
277+
description.ReadPrefSelector(readpref.Primary()),
278+
description.LatencySelector(db.client.localThreshold),
279+
})
280+
selector = makeReadPrefSelector(sess, selector, db.client.localThreshold)
277281

278282
lco := options.MergeListCollectionsOptions(opts...)
279283
op := operation.NewListCollections(filterDoc).

mongo/index_view.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,14 @@ func (iv IndexView) List(ctx context.Context, opts ...*options.ListIndexesOption
7575
return nil, err
7676
}
7777

78-
readSelector := description.CompositeSelector([]description.ServerSelector{
78+
selector := description.CompositeSelector([]description.ServerSelector{
7979
description.ReadPrefSelector(readpref.Primary()),
8080
description.LatencySelector(iv.coll.client.localThreshold),
8181
})
82+
selector = makeReadPrefSelector(sess, selector, iv.coll.client.localThreshold)
8283
op := operation.NewListIndexes().
8384
Session(sess).CommandMonitor(iv.coll.client.monitor).
84-
ServerSelector(readSelector).ClusterClock(iv.coll.client.clock).
85+
ServerSelector(selector).ClusterClock(iv.coll.client.clock).
8586
Database(iv.coll.db.name).Collection(iv.coll.name).
8687
Deployment(iv.coll.client.topology)
8788

@@ -197,10 +198,7 @@ func (iv IndexView) CreateMany(ctx context.Context, models []IndexModel, opts ..
197198
return nil, err
198199
}
199200

200-
selector := iv.coll.writeSelector
201-
if sess != nil && sess.PinnedServer != nil {
202-
selector = sess.PinnedServer
203-
}
201+
selector := makePinnedSelector(sess, iv.coll.writeSelector)
204202

205203
option := options.MergeCreateIndexesOptions(opts...)
206204

@@ -332,10 +330,7 @@ func (iv IndexView) drop(ctx context.Context, name string, opts ...*options.Drop
332330
sess = nil
333331
}
334332

335-
selector := iv.coll.writeSelector
336-
if sess != nil && sess.PinnedServer != nil {
337-
selector = sess.PinnedServer
338-
}
333+
selector := makePinnedSelector(sess, iv.coll.writeSelector)
339334

340335
dio := options.MergeDropIndexesOptions(opts...)
341336
op := operation.NewDropIndexes(name).

mongo/session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,14 +185,14 @@ func (s *sessionImpl) AbortTransaction(ctx context.Context) error {
185185
selector := makePinnedSelector(s.clientSession, description.WriteSelector())
186186

187187
s.clientSession.Aborting = true
188-
err = operation.NewAbortTransaction().Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").
188+
_ = operation.NewAbortTransaction().Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").
189189
Deployment(s.topo).WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).
190190
Retry(driver.RetryOncePerCommand).CommandMonitor(s.client.monitor).RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).Execute(ctx)
191191

192192
s.clientSession.Aborting = false
193193
_ = s.clientSession.AbortTransaction()
194194

195-
return replaceErrors(err)
195+
return nil
196196
}
197197

198198
// CommitTransaction commits the sesson's transaction.

0 commit comments

Comments
 (0)