Skip to content

Commit bbb9495

Browse files
committed
Introduce transaction with retries API
This commit adds two new API functions: * `Session#readTransaction()` * `Session#writeTransaction()` Both take a single function as input. This function takes a single argument of type `Transaction` and returns a promise. It can be used to perform regular async operations like query execution. Introduced functions will commit/rollback transaction depending on the returned promise, so user code does not need to call `Transaction#commit()` explicitly. They also perform retries if given transaction fails with network errors (`ServiceUnavaliable` or `SessionExpired`) or with transient errors (`DeadlockDetected`, etc.). Retries are one with exponential backoff with initial delay of 1 second and total cap of 30 seconds. These API functions are useful to hide tolerable network problems and transient errors for both single and causal cluster deployments. Commit also fixes a problem in transaction error handling where `_onError` executed a rollback but did not properly wait for the returned promise to complete.
1 parent 3568b43 commit bbb9495

13 files changed

+930
-30
lines changed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
21+
22+
const DEFAULT_MAX_RETRY_TIME_MS = 30 * 1000; // 30 seconds
23+
const DEFAULT_INITIAL_RETRY_DELAY_MS = 1000; // 1 seconds
24+
const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0;
25+
const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2;
26+
27+
export default class TransactionExecutor {
28+
29+
constructor(maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor) {
30+
this._maxRetryTimeMs = maxRetryTimeMs || DEFAULT_MAX_RETRY_TIME_MS;
31+
this._initialRetryDelayMs = initialRetryDelayMs || DEFAULT_INITIAL_RETRY_DELAY_MS;
32+
this._multiplier = multiplier || DEFAULT_RETRY_DELAY_MULTIPLIER;
33+
this._jitterFactor = jitterFactor || DEFAULT_RETRY_DELAY_JITTER_FACTOR;
34+
35+
this._inFlightTimeoutIds = [];
36+
37+
this._verifyAfterConstruction();
38+
}
39+
40+
execute(transactionCreator, transactionWork) {
41+
return new Promise((resolve, reject) => {
42+
this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
43+
}).catch(error => {
44+
const retryStartTimeMs = Date.now();
45+
const retryDelayMs = this._initialRetryDelayMs;
46+
return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTimeMs, retryDelayMs);
47+
});
48+
}
49+
50+
close() {
51+
// cancel all existing timeouts to prevent further retries
52+
this._inFlightTimeoutIds.forEach(timeoutId => clearTimeout(timeoutId));
53+
this._inFlightTimeoutIds = [];
54+
}
55+
56+
_retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, retryDelayMs) {
57+
const elapsedTimeMs = Date.now() - retryStartTime;
58+
59+
if (elapsedTimeMs > this._maxRetryTimeMs || !TransactionExecutor._canRetryOn(error)) {
60+
return Promise.reject(error);
61+
}
62+
63+
return new Promise((resolve, reject) => {
64+
const nextRetryTime = this._computeDelayWithJitter(retryDelayMs);
65+
const timeoutId = setTimeout(() => {
66+
// filter out this timeoutId when time has come and function is being executed
67+
this._inFlightTimeoutIds = this._inFlightTimeoutIds.filter(id => id !== timeoutId);
68+
this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
69+
}, nextRetryTime);
70+
// add newly created timeoutId to the list of all in-flight timeouts
71+
this._inFlightTimeoutIds.push(timeoutId);
72+
}).catch(error => {
73+
const nextRetryDelayMs = retryDelayMs * this._multiplier;
74+
return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, nextRetryDelayMs);
75+
});
76+
}
77+
78+
_executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) {
79+
try {
80+
const tx = transactionCreator();
81+
const transactionWorkResult = transactionWork(tx);
82+
83+
// user defined callback is supposed to return a promise, but it might not; so to protect against an
84+
// incorrect API usage we wrap the returned value with a resolved promise; this is effectively a
85+
// validation step without type checks
86+
const resultPromise = Promise.resolve(transactionWorkResult);
87+
88+
resultPromise.then(result => {
89+
// transaction work returned resolved promise, try to commit the transaction
90+
tx.commit().then(() => {
91+
// transaction was committed, return result to the user
92+
resolve(result);
93+
}).catch(error => {
94+
// transaction failed to commit, propagate the failure
95+
reject(error);
96+
});
97+
}).catch(error => {
98+
// transaction work returned rejected promise, propagate the failure
99+
reject(error);
100+
});
101+
102+
} catch (error) {
103+
reject(error);
104+
}
105+
}
106+
107+
_computeDelayWithJitter(delayMs) {
108+
const jitter = (delayMs * this._jitterFactor);
109+
const min = delayMs - jitter;
110+
const max = delayMs + jitter;
111+
return Math.random() * (max - min) + min;
112+
}
113+
114+
static _canRetryOn(error) {
115+
return error && error.code &&
116+
(error.code === SERVICE_UNAVAILABLE ||
117+
error.code === SESSION_EXPIRED ||
118+
error.code.indexOf('TransientError') >= 0);
119+
}
120+
121+
_verifyAfterConstruction() {
122+
if (this._maxRetryTimeMs < 0) {
123+
throw newError('Max retry time should be >= 0: ' + this._maxRetryTimeMs);
124+
}
125+
if (this._initialRetryDelayMs < 0) {
126+
throw newError('Initial retry delay should >= 0: ' + this._initialRetryDelayMs);
127+
}
128+
if (this._multiplier < 1.0) {
129+
throw newError('Multiplier should be >= 1.0: ' + this._multiplier);
130+
}
131+
if (this._jitterFactor < 0 || this._jitterFactor > 1) {
132+
throw newError('Jitter factor should be in [0.0, 1.0]: ' + this._jitterFactor);
133+
}
134+
}
135+
};

src/v1/session.js

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import Transaction from './transaction';
2222
import {newError} from './error';
2323
import {assertString} from './internal/util';
2424
import ConnectionHolder from './internal/connection-holder';
25-
import {READ, WRITE} from './driver';
25+
import Driver, {READ, WRITE} from './driver';
26+
import TransactionExecutor from './internal/transaction-executor';
2627

2728
/**
2829
* A Session instance is used for handling the connection and
@@ -36,7 +37,7 @@ class Session {
3637
* @constructor
3738
* @param {string} mode the default access mode for this session.
3839
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
39-
* @param {string} bookmark - the initial bookmark for this session.
40+
* @param {string} [bookmark=undefined] - the initial bookmark for this session.
4041
*/
4142
constructor(mode, connectionProvider, bookmark) {
4243
this._mode = mode;
@@ -45,6 +46,7 @@ class Session {
4546
this._open = true;
4647
this._hasTx = false;
4748
this._lastBookmark = bookmark;
49+
this._transactionExecutor = new TransactionExecutor()
4850
}
4951

5052
/**
@@ -92,21 +94,24 @@ class Session {
9294
* @returns {Transaction} - New Transaction
9395
*/
9496
beginTransaction(bookmark) {
97+
return this._beginTransaction(this._mode, bookmark);
98+
}
99+
100+
_beginTransaction(accessMode, bookmark) {
95101
if (bookmark) {
96102
assertString(bookmark, 'Bookmark');
97103
this._updateBookmark(bookmark);
98104
}
99105

100106
if (this._hasTx) {
101-
throw newError("You cannot begin a transaction on a session with an "
102-
+ "open transaction; either run from within the transaction or use a "
103-
+ "different session.")
107+
throw newError('You cannot begin a transaction on a session with an open transaction; ' +
108+
'either run from within the transaction or use a different session.');
104109
}
105110

106-
this._hasTx = true;
107-
108-
const connectionHolder = this._connectionHolderWithMode(this._mode);
111+
const mode = Driver._validateSessionMode(accessMode);
112+
const connectionHolder = this._connectionHolderWithMode(mode);
109113
connectionHolder.initializeConnection();
114+
this._hasTx = true;
110115

111116
return new Transaction(connectionHolder, () => {
112117
this._hasTx = false;
@@ -118,6 +123,21 @@ class Session {
118123
return this._lastBookmark;
119124
}
120125

126+
readTransaction(transactionWork) {
127+
return this._runTransaction(READ, transactionWork);
128+
}
129+
130+
writeTransaction(transactionWork) {
131+
return this._runTransaction(WRITE, transactionWork);
132+
}
133+
134+
_runTransaction(accessMode, transactionWork) {
135+
return this._transactionExecutor.execute(
136+
() => this._beginTransaction(accessMode, this.lastBookmark()),
137+
transactionWork
138+
);
139+
}
140+
121141
_updateBookmark(newBookmark) {
122142
if (newBookmark) {
123143
this._lastBookmark = newBookmark;
@@ -132,6 +152,7 @@ class Session {
132152
close(callback = (() => null)) {
133153
if (this._open) {
134154
this._open = false;
155+
this._transactionExecutor.close();
135156
this._readConnectionHolder.close().then(() => {
136157
this._writeConnectionHolder.close().then(() => {
137158
callback();

src/v1/transaction.js

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,21 @@ class Transaction {
104104
}
105105

106106
_onError() {
107-
// rollback explicitly if tx.run failed, rollback
108107
if (this._state == _states.ACTIVE) {
109-
this.rollback();
108+
// attempt to rollback, useful when Transaction#run() failed
109+
return this.rollback().catch(ignoredError => {
110+
// ignore all errors because it is best effort and transaction might already be rolled back
111+
}).then(() => {
112+
// after rollback attempt change this transaction's state to FAILED
113+
this._state = _states.FAILED;
114+
});
110115
} else {
111-
// else just do the cleanup
112-
this._onClose();
116+
// error happened in in-active transaction, just to the cleanup and change state to FAILED
117+
this._state = _states.FAILED;
118+
this._onClose();
119+
// no async actions needed - return resolved promise
120+
return Promise.resolve();
113121
}
114-
this._state = _states.FAILED;
115122
}
116123
}
117124

@@ -126,9 +133,10 @@ class _TransactionStreamObserver extends StreamObserver {
126133

127134
onError(error) {
128135
if (!this._hasFailed) {
129-
this._tx._onError();
130-
super.onError(error);
131-
this._hasFailed = true;
136+
this._tx._onError().then(() => {
137+
super.onError(error);
138+
this._hasFailed = true;
139+
});
132140
}
133141
}
134142

test/internal/timers-util.js

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
class SetTimeoutMock {
20+
21+
constructor() {
22+
this._clearState();
23+
}
24+
25+
install() {
26+
this._originalSetTimeout = global.setTimeout;
27+
global.setTimeout = (code, delay) => {
28+
if (!this._paused) {
29+
code();
30+
this.invocationDelays.push(delay);
31+
}
32+
return this._timeoutIdCounter++;
33+
};
34+
35+
this._originalClearTimeout = global.clearTimeout;
36+
global.clearTimeout = id => {
37+
this.clearedTimeouts.push(id);
38+
};
39+
40+
return this;
41+
}
42+
43+
pause() {
44+
this._paused = true;
45+
}
46+
47+
uninstall() {
48+
global.setTimeout = this._originalSetTimeout;
49+
global.clearTimeout = this._originalClearTimeout;
50+
this._clearState();
51+
}
52+
53+
setTimeoutOriginal(code, delay) {
54+
return this._originalSetTimeout.call(null, code, delay);
55+
}
56+
57+
_clearState() {
58+
this._originalSetTimeout = null;
59+
this._originalClearTimeout = null;
60+
this._paused = false;
61+
this._timeoutIdCounter = 0;
62+
63+
this.invocationDelays = [];
64+
this.clearedTimeouts = [];
65+
}
66+
}
67+
68+
export const setTimeoutMock = new SetTimeoutMock();
69+
70+
export function hijackNextDateNowCall(newValue) {
71+
const originalDate = global.Date;
72+
global.Date = new FakeDate(originalDate, newValue);
73+
}
74+
75+
class FakeDate {
76+
77+
constructor(originalDate, nextNowValue) {
78+
this._originalDate = originalDate;
79+
this._nextNowValue = nextNowValue;
80+
}
81+
82+
now() {
83+
global.Date = this._originalDate;
84+
return this._nextNowValue;
85+
}
86+
}

0 commit comments

Comments
 (0)