Skip to content

Commit b03a7e4

Browse files
benjirewisBenjamin Rewis
authored andcommitted
GODRIVER-1852 Add commitTransaction error check for timeout (#569)
1 parent 07b0039 commit b03a7e4

File tree

2 files changed

+131
-2
lines changed

2 files changed

+131
-2
lines changed

mongo/session.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,10 @@ func (s *sessionImpl) WithTransaction(ctx context.Context, fn func(sessCtx Sessi
209209
CommitLoop:
210210
for {
211211
err = s.CommitTransaction(ctx)
212-
if err == nil {
213-
return res, nil
212+
// End when error is nil (transaction has been committed), or when context has timed out or been
213+
// canceled, as retrying has no chance of success.
214+
if err == nil || ctx.Err() != nil {
215+
return res, err
214216
}
215217

216218
select {
@@ -307,6 +309,11 @@ func (s *sessionImpl) CommitTransaction(ctx context.Context) error {
307309
}
308310

309311
err = op.Execute(ctx)
312+
// Return error without updating transaction state if it is a timeout, as the transaction has not
313+
// actually been committed.
314+
if IsTimeout(err) {
315+
return replaceErrors(err)
316+
}
310317
s.clientSession.Committing = false
311318
commitErr := s.clientSession.CommitTransaction()
312319

mongo/with_transactions_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,128 @@ func TestConvenientTransactions(t *testing.T) {
259259
assert.True(t, ok, "expected context for abortTransaction to contain ctxKey")
260260
assert.Equal(t, "foobar", ctxValue, "expected value for ctxKey to be 'world', got %s", ctxValue)
261261
})
262+
t.Run("commitTransaction timeout allows abortTransaction", func(t *testing.T) {
263+
// Create a special CommandMonitor that only records information about abortTransaction events.
264+
var abortStarted []*event.CommandStartedEvent
265+
var abortSucceeded []*event.CommandSucceededEvent
266+
var abortFailed []*event.CommandFailedEvent
267+
monitor := &event.CommandMonitor{
268+
Started: func(ctx context.Context, evt *event.CommandStartedEvent) {
269+
if evt.CommandName == "abortTransaction" {
270+
abortStarted = append(abortStarted, evt)
271+
}
272+
},
273+
Succeeded: func(_ context.Context, evt *event.CommandSucceededEvent) {
274+
if evt.CommandName == "abortTransaction" {
275+
abortSucceeded = append(abortSucceeded, evt)
276+
}
277+
},
278+
Failed: func(_ context.Context, evt *event.CommandFailedEvent) {
279+
if evt.CommandName == "abortTransaction" {
280+
abortFailed = append(abortFailed, evt)
281+
}
282+
},
283+
}
284+
285+
// Set up a new Client using the command monitor defined above get a handle to a collection. The collection
286+
// needs to be explicitly created on the server because implicit collection creation is not allowed in
287+
// transactions for server versions <= 4.2.
288+
client := setupConvenientTransactions(t, options.Client().SetMonitor(monitor))
289+
db := client.Database("foo")
290+
coll := db.Collection("test")
291+
defer func() {
292+
_ = coll.Drop(bgCtx)
293+
}()
294+
295+
err := db.RunCommand(bgCtx, bson.D{{"create", coll.Name()}}).Err()
296+
assert.Nil(t, err, "error creating collection on server: %v", err)
297+
298+
// Start session.
299+
session, err := client.StartSession()
300+
defer session.EndSession(bgCtx)
301+
assert.Nil(t, err, "StartSession error: %v", err)
302+
303+
_ = WithSession(bgCtx, session, func(sessionContext SessionContext) error {
304+
// Start transaction.
305+
err = session.StartTransaction()
306+
assert.Nil(t, err, "StartTransaction error: %v", err)
307+
308+
// Insert a document.
309+
_, err := coll.InsertOne(sessionContext, bson.D{{"val", 17}})
310+
assert.Nil(t, err, "InsertOne error: %v", err)
311+
312+
// Set a timeout of 0 for commitTransaction.
313+
commitTimeoutCtx, commitCancel := context.WithTimeout(sessionContext, 0)
314+
defer commitCancel()
315+
316+
// CommitTransaction results in context.DeadlineExceeded.
317+
commitErr := session.CommitTransaction(commitTimeoutCtx)
318+
assert.True(t, IsTimeout(commitErr),
319+
"expected timeout error error; got %v", commitErr)
320+
321+
// Assert session state is not Committed.
322+
clientSession := session.(XSession).ClientSession()
323+
assert.False(t, clientSession.TransactionCommitted(), "expected session state to not be Committed")
324+
325+
// AbortTransaction without error.
326+
abortErr := session.AbortTransaction(context.Background())
327+
assert.Nil(t, abortErr, "AbortTransaction error: %v", abortErr)
328+
329+
// Assert that AbortTransaction was started once and succeeded.
330+
assert.Equal(t, 1, len(abortStarted), "expected 1 abortTransaction started event, got %d", len(abortStarted))
331+
assert.Equal(t, 1, len(abortSucceeded), "expected 1 abortTransaction succeeded event, got %d",
332+
len(abortSucceeded))
333+
assert.Equal(t, 0, len(abortFailed), "expected 0 abortTransaction failed events, got %d", len(abortFailed))
334+
335+
return nil
336+
})
337+
})
338+
t.Run("commitTransaction timeout does not retry", func(t *testing.T) {
339+
withTransactionTimeout = 2 * time.Second
340+
341+
coll := db.Collection("test")
342+
// Explicitly create the collection on server because implicit collection creation is not allowed in
343+
// transactions for server versions <= 4.2.
344+
err := db.RunCommand(bgCtx, bson.D{{"create", coll.Name()}}).Err()
345+
assert.Nil(t, err, "error creating collection on server: %v", err)
346+
defer func() {
347+
_ = coll.Drop(bgCtx)
348+
}()
349+
350+
// Start session.
351+
sess, err := client.StartSession()
352+
assert.Nil(t, err, "StartSession error: %v", err)
353+
defer sess.EndSession(context.Background())
354+
355+
// Defer running killAllSessions to manually close open transaction.
356+
defer func() {
357+
err := dbAdmin.RunCommand(bgCtx, bson.D{
358+
{"killAllSessions", bson.A{}},
359+
}).Err()
360+
if err != nil {
361+
if ce, ok := err.(CommandError); !ok || ce.Code != errorInterrupted {
362+
t.Fatalf("killAllSessions error: %v", err)
363+
}
364+
}
365+
}()
366+
367+
// Create context to manually cancel in callback.
368+
cancelCtx, cancel := context.WithCancel(bgCtx)
369+
defer cancel()
370+
371+
// Insert a document within a session and manually cancel context.
372+
callback := func() {
373+
_, _ = sess.WithTransaction(cancelCtx, func(sessCtx SessionContext) (interface{}, error) {
374+
_, err := coll.InsertOne(sessCtx, bson.M{"x": 1})
375+
assert.Nil(t, err, "InsertOne error: %v", err)
376+
cancel()
377+
return nil, nil
378+
})
379+
}
380+
381+
// Assert that transaction is canceled within 500ms and not 2 seconds.
382+
assert.Soon(t, callback, 500*time.Millisecond)
383+
})
262384
}
263385

264386
func setupConvenientTransactions(t *testing.T, extraClientOpts ...*options.ClientOptions) *Client {

0 commit comments

Comments
 (0)