|
10 | 10 | * ******************************************************************************* |
11 | 11 | * |
12 | 12 | */ |
13 | | - |
14 | | -const db = require('./../sequelize/models') |
15 | | -const retry = require('retry-as-promised') |
16 | | -const sequelize = db.sequelize |
| 13 | +const cq = require('concurrent-queue') |
17 | 14 | const Transaction = require('sequelize/lib/transaction') |
| 15 | + |
18 | 16 | const { isTest } = require('../helpers/app-helper') |
19 | 17 |
|
| 18 | +const transactionsQueue = cq() |
| 19 | + .limit({ concurrency: 1 }) |
| 20 | + .process((task, cb) => { |
| 21 | + task.transaction |
| 22 | + .apply(task.that, task.args) |
| 23 | + .then((res) => cb(null, res)) |
| 24 | + .catch((err) => cb(err, null)) |
| 25 | + }) |
| 26 | + |
20 | 27 | function transaction(f) { |
21 | | - return async function(...fArgs) { |
| 28 | + const fakeTransactionObject = { fakeTransaction: true } |
| 29 | + return function(...fArgs) { |
22 | 30 | if (isTest()) { |
23 | | - return await f.apply(this, fArgs) |
| 31 | + return f.apply(this, fArgs) |
24 | 32 | } |
25 | 33 |
|
26 | | - // TODO [when transactions concurrency issue fixed]: Remove 'fArgs[fArgs.length - 1].fakeTransaction' |
27 | | - if (fArgs.length > 0 && fArgs[fArgs.length - 1] |
28 | | - && (fArgs[fArgs.length - 1] instanceof Transaction || fArgs[fArgs.length - 1].fakeTransaction)) { |
29 | | - return await f.apply(this, fArgs) |
| 34 | + if (fArgs.length > 0 && fArgs[fArgs.length - 1] instanceof Transaction) { |
| 35 | + fArgs[fArgs.length - 1] = fakeTransactionObject |
| 36 | + return f.apply(this, fArgs) |
30 | 37 | } else { |
31 | | - // return f.apply(this, fArgs) |
32 | | - return sequelize.transaction(async (t) => { |
33 | | - fArgs.push(t) |
34 | | - return await f.apply(this, fArgs) |
35 | | - }) |
| 38 | + fArgs.push(fakeTransactionObject) |
| 39 | + return f.apply(this, fArgs) |
36 | 40 | } |
37 | 41 | } |
38 | 42 | } |
39 | 43 |
|
40 | | -function generateTransaction(f) { |
41 | | - return function(...args) { |
42 | | - return retry(() => { |
43 | | - const t = transaction(f) |
44 | | - return t.apply(this, args) |
45 | | - }, { |
46 | | - max: 5, |
47 | | - match: [ |
48 | | - sequelize.ConnectionError, |
49 | | - 'SQLITE_BUSY', |
50 | | - ], |
51 | | - }) |
| 44 | +function queueTransaction(resolve, reject, transaction, that, retries, ...args) { |
| 45 | + const task = { |
| 46 | + transaction, |
| 47 | + that, |
| 48 | + retries, |
| 49 | + args, |
52 | 50 | } |
53 | | -} |
54 | 51 |
|
55 | | -function fakeTransaction(f) { |
56 | | - const fakeTransactionObject = { fakeTransaction: true } |
57 | | - return async function(...fArgs) { |
58 | | - if (isTest()) { |
59 | | - return await f.apply(this, fArgs) |
| 52 | + transactionsQueue(task, (error, success) => { |
| 53 | + if (error === null) { |
| 54 | + return resolve(success) |
60 | 55 | } |
61 | 56 |
|
62 | | - if (fArgs.length > 0 && fArgs[fArgs.length - 1] instanceof Transaction) { |
63 | | - fArgs[fArgs.length - 1] = fakeTransactionObject |
64 | | - return await f.apply(this, fArgs) |
65 | | - } else { |
66 | | - fArgs.push(fakeTransactionObject) |
67 | | - return await f.apply(this, fArgs) |
| 57 | + if (retries < 1 || (error.message || '').indexOf('SQLITE_BUSY') === -1) { |
| 58 | + return reject(error) |
68 | 59 | } |
69 | | - } |
| 60 | + |
| 61 | + queueTransaction(resolve, reject, transaction, that, retries - 1, ...args) |
| 62 | + }) |
70 | 63 | } |
71 | 64 |
|
72 | | -// TODO [when transactions concurrency issue fixed]: Remove |
73 | | -function generateFakeTransaction(f) { |
| 65 | +function applyTransaction(resolve, reject, transaction, that, ...args) { |
| 66 | + transaction.apply(that, args) |
| 67 | + .then(resolve) |
| 68 | + .catch((error) => { |
| 69 | + if ((error.message || '').indexOf('SQLITE_BUSY') === -1) { |
| 70 | + return reject(error) |
| 71 | + } |
| 72 | + |
| 73 | + queueTransaction(resolve, reject, transaction, this, 5, ...args) |
| 74 | + }) |
| 75 | +} |
| 76 | + |
| 77 | +function generateTransaction(f) { |
74 | 78 | return function(...args) { |
75 | | - return retry(() => { |
76 | | - const t = fakeTransaction(f) |
77 | | - return t.apply(this, args) |
78 | | - }, { |
79 | | - max: 5, |
80 | | - match: [ |
81 | | - sequelize.ConnectionError, |
82 | | - 'SQLITE_BUSY', |
83 | | - ], |
| 79 | + const t = transaction(f) |
| 80 | + |
| 81 | + return new Promise((resolve, reject) => { |
| 82 | + applyTransaction(resolve, reject, t, this, ...args) |
84 | 83 | }) |
85 | 84 | } |
86 | 85 | } |
87 | 86 |
|
88 | 87 | module.exports = { |
89 | 88 | generateTransaction: generateTransaction, |
90 | | - generateFakeTransaction: generateFakeTransaction, |
91 | 89 | } |
0 commit comments