Skip to content

Commit 933eecf

Browse files
author
Lars-Erik Roald
committed
atomic deletes
1 parent b31507e commit 933eecf

File tree

8 files changed

+302
-121
lines changed

8 files changed

+302
-121
lines changed

src/patchTable.js

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
/* eslint-disable require-atomic-updates */
22
let applyPatch = require('./applyPatch');
33
let fromCompareObject = require('./fromCompareObject');
4-
let validateDeleteConflict = require('./validateDeleteConflict');
54
let validateDeleteAllowed = require('./validateDeleteAllowed');
65
let clearCache = require('./table/clearCache');
76
const getSessionSingleton = require('./table/getSessionSingleton');
@@ -232,11 +231,12 @@ async function patchTableCore(context, table, patches, { strategy = undefined, d
232231
async function remove({ path, op, oldValue, options }, table, row) {
233232
let property = path[0];
234233
path = path.slice(1);
235-
row = row || await fetchFromDb({context, table, key: toKey(property)});
234+
if (!row)
235+
row = await getOrCreateRow({ table, strategy: {}, property });
236236
if (path.length === 0) {
237237
await validateDeleteAllowed({ row, options, table });
238-
if (await validateDeleteConflict({ context, row, oldValue, options, table }))
239-
await row.deleteCascade();
238+
applyDeleteConcurrencyState(row, oldValue, options, table);
239+
await row.deleteCascade();
240240
}
241241
property = path[0];
242242
if (isColumn(property, table)) {
@@ -367,6 +367,32 @@ async function patchTableCore(context, table, patches, { strategy = undefined, d
367367
return table[name] && table[name]._relation.columns;
368368
}
369369

370+
function applyDeleteConcurrencyState(row, oldValue, options, table) {
371+
const state = { columns: {} };
372+
if (oldValue && oldValue === Object(oldValue)) {
373+
for (let p in oldValue) {
374+
if (!isColumn(p, table))
375+
continue;
376+
const columnOptions = inferOptions(options, p);
377+
const concurrency = columnOptions.concurrency || 'optimistic';
378+
if (concurrency === 'overwrite')
379+
continue;
380+
state.columns[p] = { oldValue: fromCompareObject(oldValue[p]), concurrency };
381+
}
382+
}
383+
if (Object.keys(state.columns).length === 0) {
384+
const concurrency = options.concurrency || 'optimistic';
385+
if (concurrency !== 'overwrite') {
386+
for (let i = 0; i < table._primaryColumns.length; i++) {
387+
const pkName = table._primaryColumns[i].alias;
388+
state.columns[pkName] = { oldValue: row[pkName], concurrency };
389+
}
390+
}
391+
}
392+
if (Object.keys(state.columns).length > 0)
393+
row._concurrencyState = state;
394+
}
395+
370396
function inferOptions(defaults, property) {
371397
const parent = {};
372398
if ('readonly' in defaults)

src/table/commands/delete/newSingleCommand.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ var extractFilter = require('../../query/extractFilter');
44
var newSingleCommandCore = require('./singleCommand/newSingleCommandCore');
55
var createAlias = require('./createAlias');
66

7-
function _new(context, table, filter, relations) {
7+
function _new(context, table, filter, relations, concurrencyState) {
88
var alias = createAlias(table, relations.length);
99
filter = extractFilter(filter);
1010
filter = newSubFilter(context, relations, filter);
1111
var discriminator = newDiscriminatorSql(context, table, alias);
1212
if (discriminator !== '')
1313
filter = filter.and(context, discriminator);
14-
return newSingleCommandCore(context, table, filter, alias);
14+
return newSingleCommandCore(context, table, filter, alias, concurrencyState);
1515
}
1616

17-
module.exports = _new;
17+
module.exports = _new;
Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,128 @@
11
var getSessionSingleton = require('../../../getSessionSingleton');
2+
var newParameterized = require('../../../query/newParameterized');
23

3-
function newSingleCommandCore(context, table, filter, alias) {
4+
function newSingleCommandCore(context, table, filter, alias, concurrencyState) {
45
var c = {};
6+
var quote = getSessionSingleton(context, 'quote');
7+
var engine = getSessionSingleton(context, 'engine');
8+
var concurrency = buildConcurrencyChecks(concurrencyState);
9+
var parameters = filter.parameters ? filter.parameters.slice() : [];
10+
if (concurrency && concurrency.parameters.length > 0)
11+
parameters = parameters.concat(concurrency.parameters);
512

613
c.sql = function() {
714
var whereSql = filter.sql();
15+
if (concurrency && concurrency.sql) {
16+
if (whereSql)
17+
whereSql += ' AND ' + concurrency.sql;
18+
else
19+
whereSql = concurrency.sql;
20+
}
821
if (whereSql)
922
whereSql = ' where ' + whereSql;
1023
var deleteFromSql = getSessionSingleton(context, 'deleteFromSql');
1124
return deleteFromSql(table, alias, whereSql);
1225
};
1326

14-
c.parameters = filter.parameters;
27+
c.parameters = parameters;
1528

1629
return c;
30+
31+
function buildConcurrencyChecks(state) {
32+
const columnsState = state && state.columns;
33+
if (!columnsState)
34+
return;
35+
const parts = [];
36+
const params = [];
37+
for (let alias in columnsState) {
38+
const columnState = columnsState[alias];
39+
if (!columnState || columnState.concurrency === 'overwrite')
40+
continue;
41+
const column = table[alias];
42+
if (!column)
43+
continue;
44+
const encoded = (engine === 'mysql' && column.tsType === 'JSONColumn')
45+
? encodeJsonValue(columnState.oldValue, column)
46+
: column.encode(context, columnState.oldValue);
47+
const comparison = buildNullSafeComparison(column, encoded);
48+
if (comparison.sql)
49+
parts.push(comparison.sql());
50+
if (comparison.parameters.length > 0)
51+
params.push(...comparison.parameters);
52+
}
53+
if (parts.length === 0)
54+
return;
55+
return { sql: parts.join(' AND '), parameters: params };
56+
}
57+
58+
function buildNullSafeComparison(column, encoded) {
59+
const columnSql = quote(column._dbName);
60+
if (engine === 'pg') {
61+
return newParameterized(columnSql + ' IS NOT DISTINCT FROM ' + encoded.sql(), encoded.parameters);
62+
}
63+
if (engine === 'mysql') {
64+
return newParameterized(columnSql + ' <=> ' + encoded.sql(), encoded.parameters);
65+
}
66+
if (engine === 'sqlite') {
67+
return newParameterized(columnSql + ' IS ' + encoded.sql(), encoded.parameters);
68+
}
69+
if (engine === 'sap' && column.tsType === 'JSONColumn') {
70+
if (encoded.sql() === 'null')
71+
return newParameterized(columnSql + ' IS NULL');
72+
const casted = newParameterized('CONVERT(VARCHAR(16384), ' + encoded.sql() + ')', encoded.parameters);
73+
return newParameterized('CONVERT(VARCHAR(16384), ' + columnSql + ')=' + casted.sql(), casted.parameters);
74+
}
75+
if (engine === 'oracle' && column.tsType === 'JSONColumn') {
76+
if (encoded.sql() === 'null')
77+
return newParameterized(columnSql + ' IS NULL');
78+
const jsonValue = newParameterized('JSON(' + encoded.sql() + ')', encoded.parameters);
79+
return newParameterized('JSON_EQUAL(' + columnSql + ', ' + jsonValue.sql() + ')', jsonValue.parameters);
80+
}
81+
if (encoded.sql() === 'null')
82+
return newParameterized(columnSql + ' IS NULL');
83+
return newParameterized(columnSql + '=' + encoded.sql(), encoded.parameters);
84+
}
85+
86+
function encodeJsonValue(value, column) {
87+
if (engine === 'oracle') {
88+
if (value === null || value === undefined)
89+
return newParameterized('null');
90+
if (isJsonObject(value))
91+
return column.encode(context, value);
92+
if (typeof value === 'boolean' || typeof value === 'number')
93+
return newParameterized('?', [String(value)]);
94+
return newParameterized('?', [value]);
95+
}
96+
if (engine === 'pg') {
97+
const jsonValue = JSON.stringify(value === undefined ? null : value);
98+
return newParameterized('?::jsonb', [jsonValue]);
99+
}
100+
if (engine === 'mysql') {
101+
const jsonValue = JSON.stringify(value === undefined ? null : value);
102+
return newParameterized('CAST(? AS JSON)', [jsonValue]);
103+
}
104+
if (engine === 'sqlite') {
105+
if (isJsonObject(value)) {
106+
const jsonValue = JSON.stringify(value);
107+
return newParameterized('?', [jsonValue]);
108+
}
109+
if (value === null || value === undefined)
110+
return newParameterized('null');
111+
return newParameterized('?', [value]);
112+
}
113+
if (engine === 'mssql' || engine === 'mssqlNative') {
114+
if (isJsonObject(value))
115+
return newParameterized('JSON_QUERY(?)', [JSON.stringify(value)]);
116+
if (value === null || value === undefined)
117+
return newParameterized('null');
118+
return newParameterized('?', [String(value)]);
119+
}
120+
return column.encode(context, value);
121+
}
122+
123+
function isJsonObject(value) {
124+
return value && typeof value === 'object';
125+
}
17126
}
18127

19-
module.exports = newSingleCommandCore;
128+
module.exports = newSingleCommandCore;

src/table/commands/newDeleteCommand.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
var newSingleCommand = require('./delete/newSingleCommand');
22

3-
function newCommand(context, queries, table, filter, strategy, relations) {
4-
var singleCommand = newSingleCommand(context, table, filter, relations);
3+
function newCommand(context, queries, table, filter, strategy, relations, concurrencyState) {
4+
var singleCommand = newSingleCommand(context, table, filter, relations, concurrencyState);
55
for (var name in strategy) {
66
if (!(strategy[name] === null || strategy[name]))
77
continue;
@@ -15,4 +15,4 @@ function newCommand(context, queries, table, filter, strategy, relations) {
1515
return queries;
1616
}
1717

18-
module.exports = newCommand;
18+
module.exports = newCommand;

src/table/resultToRows/delete.js

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,25 @@ function _delete(context, row, strategy, table) {
1414
args.push(row[primary.alias]);
1515
});
1616
var filter = newPrimaryKeyFilter.apply(null, args);
17-
var cmds = newDeleteCommand(context, [], table, filter, strategy, relations);
17+
var concurrencyState = row._concurrencyState;
18+
delete row._concurrencyState;
19+
var cmds = newDeleteCommand(context, [], table, filter, strategy, relations, concurrencyState);
1820
cmds.forEach(function(cmd) {
1921
pushCommand(context, cmd);
2022
});
2123
var cmd = cmds[0];
24+
var concurrencySummary = summarizeConcurrency(concurrencyState);
25+
if (concurrencySummary.hasConcurrency) {
26+
var deleteCmd = cmds[cmds.length - 1];
27+
deleteCmd.onResult = function(result) {
28+
var rowCount = extractRowCount(result);
29+
if (rowCount === undefined)
30+
return;
31+
if (rowCount === 0 && concurrencySummary.hasOptimistic) {
32+
throw new Error('The row was changed by another user.');
33+
}
34+
};
35+
}
2236
if (table._emitChanged.callbacks.length > 0) {
2337
cmd.disallowCompress = true;
2438
var dto = createDto(table, row);
@@ -28,4 +42,35 @@ function _delete(context, row, strategy, table) {
2842

2943
}
3044

31-
module.exports = _delete;
45+
function summarizeConcurrency(concurrencyState) {
46+
const summary = { hasConcurrency: false, hasOptimistic: false };
47+
if (!concurrencyState || !concurrencyState.columns)
48+
return summary;
49+
for (let name in concurrencyState.columns) {
50+
const state = concurrencyState.columns[name];
51+
if (!state)
52+
continue;
53+
const strategy = state.concurrency || 'optimistic';
54+
if (strategy === 'overwrite')
55+
continue;
56+
summary.hasConcurrency = true;
57+
if (strategy === 'optimistic')
58+
summary.hasOptimistic = true;
59+
}
60+
return summary;
61+
}
62+
63+
function extractRowCount(result) {
64+
if (Array.isArray(result) && typeof result[0].rowsAffected === 'number')
65+
return result[0].rowsAffected;
66+
if (!result || typeof result !== 'object')
67+
return;
68+
if (typeof result.rowCount === 'number')
69+
return result.rowCount;
70+
if (typeof result.affectedRows === 'number')
71+
return result.affectedRows;
72+
if (typeof result.rowsAffected === 'number')
73+
return result.rowsAffected;
74+
}
75+
76+
module.exports = _delete;

src/validateDeleteAllowed.js

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ async function validateDeleteAllowed({ row, options, table }) {
55
e.status = 405;
66
throw e;
77
}
8-
for (let p in options)
9-
if (isColumn(p, table))
10-
return;
11-
else if (isManyRelation(p, table)) {
8+
if (!hasReadonlyTrue(options))
9+
return;
10+
for (let p in options) {
11+
if (isManyRelation(p, table)) {
1212
const childTable = table[p]._relation.childTable;
1313
const childOptions = inferOptions(options, p);
14+
if (!hasReadonlyTrue(childOptions))
15+
continue;
1416
const children = await row[p];
1517
for (let i = 0; i < children.length; i++) {
1618
const childRow = children[i];
@@ -19,15 +21,15 @@ async function validateDeleteAllowed({ row, options, table }) {
1921
}
2022
else if (isOneRelation(p, table)) {
2123
const childOptions = inferOptions(options, p);
24+
if (!hasReadonlyTrue(childOptions))
25+
continue;
2226
const childTable = table[p]._relation.childTable;
2327
let childRow = await row[p];
2428
await validateDeleteAllowed({ row: childRow, options: childOptions, table: childTable });
2529
}
30+
}
2631
}
2732

28-
function isColumn(name, table) {
29-
return table[name] && table[name].equal;
30-
}
3133

3234
function isManyRelation(name, table) {
3335
return table[name] && table[name]._relation.isMany;
@@ -46,4 +48,19 @@ function inferOptions(defaults, property) {
4648
return {...parent, ...(defaults[property] || {})};
4749
}
4850

49-
module.exports = validateDeleteAllowed;
51+
function hasReadonlyTrue(options) {
52+
if (!options || options !== Object(options))
53+
return false;
54+
if (options.readonly === true)
55+
return true;
56+
for (let p in options) {
57+
const value = options[p];
58+
if (!value || value !== Object(value))
59+
continue;
60+
if (hasReadonlyTrue(value))
61+
return true;
62+
}
63+
return false;
64+
}
65+
66+
module.exports = validateDeleteAllowed;

0 commit comments

Comments
 (0)