Skip to content
120 changes: 66 additions & 54 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
MongoErrorLabel,
MongoExpiredSessionError,
MongoInvalidArgumentError,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerError,
MongoTransactionError,
Expand Down Expand Up @@ -725,7 +726,7 @@ export class ClientSession
timeoutMS?: number;
}
): Promise<T> {
const MAX_TIMEOUT = 120000;
const MAX_TIMEOUT = 120_000;

const timeoutMS = options?.timeoutMS ?? this.timeoutMS ?? null;
this.timeoutContext =
Expand All @@ -737,10 +738,12 @@ export class ClientSession
})
: null;

// 1. Record the current monotonic time, which will be used to enforce the 120-second timeout before later retry attempts.
const startTime = this.timeoutContext?.csotEnabled() // This is strictly to appease TS. We must narrow the context to a CSOT context before accessing `.start`.
? this.timeoutContext.start
: processTimeMS();
// 1. Compute the absolute deadline for timeout enforcement.
// 1.3 Set `TIMEOUT_MS` to be `timeoutMS` if given, otherwise MAX_TIMEOUT (120-seconds).
const csotEnabled = !!this.timeoutContext?.csotEnabled();
const deadline = this.timeoutContext?.csotEnabled()
? processTimeMS() + this.timeoutContext.remainingTimeMS
: processTimeMS() + MAX_TIMEOUT;

let committed = false;
let result: T;
Expand Down Expand Up @@ -774,17 +777,13 @@ export class ClientSession
BACKOFF_MAX_MS
);

const willExceedTransactionDeadline =
(this.timeoutContext?.csotEnabled() &&
backoffMS > this.timeoutContext.remainingTimeMS) ||
processTimeMS() + backoffMS > startTime + MAX_TIMEOUT;

if (willExceedTransactionDeadline) {
throw (
if (processTimeMS() + backoffMS >= deadline) {
throw makeTimeoutError(
lastError ??
new MongoRuntimeError(
`Transaction retry did not record an error: should never occur. Please file a bug.`
)
new MongoRuntimeError(
`Transaction retry did not record an error: should never occur. Please file a bug.`
),
csotEnabled
);
}

Expand Down Expand Up @@ -827,6 +826,8 @@ export class ClientSession
throw fnError;
}

lastError = fnError;

if (
this.transaction.state === TxnState.STARTING_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS
Expand All @@ -836,14 +837,15 @@ export class ClientSession
await this.abortTransaction();
}

if (
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext?.csotEnabled() || processTimeMS() - startTime < MAX_TIMEOUT)
) {
// 7.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of `withTransaction`
// is less than 120 seconds, jump back to step two.
lastError = fnError;
continue retryTransaction;
if (fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
if (processTimeMS() < deadline) {
// 7.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of `withTransaction`
// is less than TIMEOUT_MS, jump back to step two.
continue retryTransaction;
} else {
// 7.ii (cont.) If timeout has been exceeded, raise the transient error (or wrap in timeout for CSOT).
throw makeTimeoutError(fnError, csotEnabled);
}
}

// 7.iii If the callback's error includes a "UnknownTransactionCommitResult" label, the callback must have manually committed a transaction,
Expand All @@ -865,37 +867,32 @@ export class ClientSession
committed = true;
// 10. If commitTransaction reported an error:
} catch (commitError) {
// If CSOT is enabled, we repeatedly retry until timeoutMS expires. This is enforced by providing a
// timeoutContext to each async API, which know how to cancel themselves (i.e., the next retry will
// abort the withTransaction call).
// If CSOT is not enabled, do we still have time remaining or have we timed out?
const hasTimedOut =
!this.timeoutContext?.csotEnabled() && processTimeMS() - startTime >= MAX_TIMEOUT;

if (!hasTimedOut) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
) {
// 10.i If the `commitTransaction` error includes a "UnknownTransactionCommitResult" label and the error is not
// MaxTimeMSExpired and the elapsed time of `withTransaction` is less than 120 seconds, jump back to step eight.
continue retryCommit;
}

if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
// 10.ii If the commitTransaction error includes a "TransientTransactionError" label
// and the elapsed time of withTransaction is less than 120 seconds, jump back to step two.
lastError = commitError;

continue retryTransaction;
}
lastError = commitError;

if (processTimeMS() >= deadline) {
throw makeTimeoutError(commitError, csotEnabled);
}

/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
) {
// 10.i If the `commitTransaction` error includes a "UnknownTransactionCommitResult" label and the error is not
// MaxTimeMSExpired and the elapsed time of `withTransaction` is less than TIMEOUT_MS, jump back to step eight.
continue retryCommit;
}

if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
// 10.ii If the commitTransaction error includes a "TransientTransactionError" label
// and the elapsed time of withTransaction is less than TIMEOUT_MS, jump back to step two.
continue retryTransaction;
}

// 10.iii Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately.
Expand All @@ -912,6 +909,21 @@ export class ClientSession
}
}

function makeTimeoutError(cause: Error, csotEnabled: boolean): Error {
if (csotEnabled) {
const timeoutError = new MongoOperationTimeoutError('Timed out during withTransaction', {
cause
});
if (cause instanceof MongoError) {
for (const label of cause.errorLabels) {
timeoutError.addErrorLabel(label);
}
}
return timeoutError;
}
return cause;
}

const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
'CannotSatisfyWriteConcern',
'UnknownReplWriteConcern',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ import { expect } from 'chai';
import { test } from 'mocha';
import * as sinon from 'sinon';

import { type ClientSession, type Collection, type MongoClient } from '../../mongodb';
import { configureFailPoint, type FailCommandFailPoint, measureDuration } from '../../tools/utils';
import { type ClientSession, type Collection, type MongoClient, MongoError } from '../../mongodb';
import {
clearFailPoint,
configureFailPoint,
type FailCommandFailPoint,
measureDuration
} from '../../tools/utils';

const failCommand: FailCommandFailPoint = {
configureFailPoint: 'failCommand',
Expand Down Expand Up @@ -85,3 +90,158 @@ describe('Retry Backoff is Enforced', function () {
}
);
});

describe('Retry Timeout is Enforced', function () {
// Drivers should test that withTransaction enforces a non-configurable timeout before retrying
// both commits and entire transactions.
//
// We stub performance.now() to simulate elapsed time exceeding the 120-second retry limit,
// as recommended by the spec: "This might be done by internally modifying the timeout value
// used by withTransaction with some private API or using a mock timer."
//
// Without CSOT, the original error is propagated directly.
// With CSOT, the error is wrapped in a MongoOperationTimeoutError.

let client: MongoClient;
let collection: Collection;
let timeOffset: number;

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('foo').collection('bar');

timeOffset = 0;
const originalNow = performance.now.bind(performance);
sinon.stub(performance, 'now').callsFake(() => originalNow() + timeOffset);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how spec suggests to test, similar implementations in:

});

afterEach(async function () {
sinon.restore();
await clearFailPoint(this.configuration);
await client?.close();
});

// Case 1: If the callback raises an error with the TransientTransactionError label and the retry
// timeout has been exceeded, withTransaction should propagate the error to its caller.
test(
'callback TransientTransactionError propagated when retry timeout exceeded',
{
requires: {
mongodb: '>=4.4',
topology: '!single'
}
},
async function () {
// 1. Configure a failpoint that fails insert with TransientTransactionError.
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: {
failCommands: ['insert'],
errorCode: 24,
errorLabels: ['TransientTransactionError']
}
});

// 2. Run withTransaction. The callback advances the clock past the 120-second retry
// limit before the insert fails, so the timeout is detected immediately.
const { result } = await measureDuration(() => {
return client.withSession(async s => {
await s.withTransaction(async session => {
timeOffset = 120_000;
await collection.insertOne({}, { session });
});
});
});

// 3. Assert that the error is the original TransientTransactionError (propagated directly
// in the legacy non-CSOT path).
expect(result).to.be.instanceOf(MongoError);
expect((result as MongoError).hasErrorLabel('TransientTransactionError')).to.be.true;
}
);

// Case 2: If committing raises an error with the UnknownTransactionCommitResult label, and the
// retry timeout has been exceeded, withTransaction should propagate the error to
// its caller.
test(
'commit UnknownTransactionCommitResult propagated when retry timeout exceeded',
{
requires: {
mongodb: '>=4.4',
topology: '!single'
}
},
async function () {
// 1. Configure a failpoint that fails commitTransaction with UnknownTransactionCommitResult.
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: {
failCommands: ['commitTransaction'],
errorCode: 64,
errorLabels: ['UnknownTransactionCommitResult']
}
});

// 2. Run withTransaction. The callback advances the clock past the 120-second retry
// limit. The insert succeeds, but the commit fails and the timeout is detected.
const { result } = await measureDuration(() => {
return client.withSession(async s => {
await s.withTransaction(async session => {
timeOffset = 120_000;
await collection.insertOne({}, { session });
});
});
});

// 3. Assert that the error is the original commit error (propagated directly
// in the legacy non-CSOT path).
expect(result).to.be.instanceOf(MongoError);
expect((result as MongoError).hasErrorLabel('UnknownTransactionCommitResult')).to.be.true;
}
);

// Case 3: If committing raises an error with the TransientTransactionError label and the retry
// timeout has been exceeded, withTransaction should propagate the error to its
// caller. This case may occur if the commit was internally retried against a new primary after a
// failover and the second primary returned a NoSuchTransaction error response.
test(
'commit TransientTransactionError propagated when retry timeout exceeded',
{
requires: {
mongodb: '>=4.4',
topology: '!single'
}
},
async function () {
// 1. Configure a failpoint that fails commitTransaction with TransientTransactionError
// (errorCode 251 = NoSuchTransaction).
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: {
failCommands: ['commitTransaction'],
errorCode: 251,
errorLabels: ['TransientTransactionError']
}
});

// 2. Run withTransaction. The callback advances the clock past the 120-second retry
// limit. The insert succeeds, but the commit fails and the timeout is detected.
const { result } = await measureDuration(() => {
return client.withSession(async s => {
await s.withTransaction(async session => {
timeOffset = 120_000;
await collection.insertOne({}, { session });
});
});
});

// 3. Assert that the error is the original commit error (propagated directly
// in the legacy non-CSOT path).
expect(result).to.be.instanceOf(MongoError);
expect((result as MongoError).hasErrorLabel('TransientTransactionError')).to.be.true;
}
);
});
Loading
Loading