Skip to content

Commit c65dbf4

Browse files
committed
[sqlite] Sequential DB transaction support
1 parent acade00 commit c65dbf4

File tree

10 files changed

+120
-51
lines changed

10 files changed

+120
-51
lines changed

src/persisters.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ import {
66
PersisterStats,
77
} from './types/persisters.d';
88
import {arrayPush, arrayShift} from './common/array';
9+
import {mapEnsure, mapGet, mapNew, mapSet} from './common/map';
910
import {objFreeze, objIsEmpty} from './common/obj';
1011
import {Id} from './types/common.d';
1112

1213
type Action = () => Promise<any>;
1314

15+
const scheduleRunning: Map<any, 0 | 1> = mapNew();
16+
const scheduleActions: Map<any, Action[]> = mapNew();
17+
1418
export const createCustomPersister = <ListeningHandle>(
1519
store: Store,
1620
getPersisted: () => Promise<[Tables, Values] | undefined>,
@@ -21,6 +25,7 @@ export const createCustomPersister = <ListeningHandle>(
2125
addPersisterListener: (listener: PersisterListener) => ListeningHandle,
2226
delPersisterListener: (listeningHandle: ListeningHandle) => void,
2327
onIgnoredError?: (error: any) => void,
28+
scheduleId = [],
2429
): Persister => {
2530
let listenerId: Id | undefined;
2631
let loadSave = 0;
@@ -29,21 +34,29 @@ export const createCustomPersister = <ListeningHandle>(
2934
let listening = 0;
3035
let action;
3136
let listeningHandle: ListeningHandle | undefined;
32-
let running = 0;
33-
const scheduledActions: Action[] = [];
37+
38+
mapEnsure(scheduleRunning, scheduleId, () => 0);
39+
mapEnsure(scheduleActions, scheduleId, () => []);
3440

3541
const run = async (): Promise<void> => {
3642
/*! istanbul ignore else */
37-
if (!running) {
38-
running = 1;
39-
while (!isUndefined((action = arrayShift(scheduledActions)))) {
43+
if (!mapGet(scheduleRunning, scheduleId)) {
44+
mapSet(scheduleRunning, scheduleId, 1);
45+
while (
46+
!isUndefined(
47+
(action = arrayShift(
48+
mapGet(scheduleActions, scheduleId) as Action[],
49+
)),
50+
)
51+
) {
4052
try {
4153
await action();
4254
} catch (error) {
55+
/*! istanbul ignore next */
4356
onIgnoredError?.(error);
4457
}
4558
}
46-
running = 0;
59+
mapSet(scheduleRunning, scheduleId, 0);
4760
}
4861
};
4962

@@ -128,6 +141,7 @@ export const createCustomPersister = <ListeningHandle>(
128141
try {
129142
await setPersisted(store.getContent, getTransactionChanges);
130143
} catch (error) {
144+
/*! istanbul ignore next */
131145
onIgnoredError?.(error);
132146
}
133147
loadSave = 0;
@@ -155,7 +169,7 @@ export const createCustomPersister = <ListeningHandle>(
155169
},
156170

157171
schedule: async (...actions: Action[]): Promise<Persister> => {
158-
arrayPush(scheduledActions, ...actions);
172+
arrayPush(mapGet(scheduleActions, scheduleId) as Action[], ...actions);
159173
await run();
160174
return persister;
161175
},

src/persisters/persister-cr-sqlite-wasm.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ export const createCrSqliteWasmPersister = ((
2222
(removeListener: () => void): void => removeListener(),
2323
logSql,
2424
onIgnoredError,
25+
db,
2526
)) as typeof createCrSqliteWasmPersisterDecl;

src/persisters/persister-expo-sqlite.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ export const createExpoSqlitePersister = ((
2424
(subscription: Subscription) => subscription.remove(),
2525
logSql,
2626
onIgnoredError,
27+
db,
2728
)) as typeof createExpoSqlitePersisterDecl;

src/persisters/persister-sqlite-wasm.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ export const createSqliteWasmPersister = ((
2828
(): void => sqlite3.capi.sqlite3_update_hook(db, () => 0, 0),
2929
logSql,
3030
onIgnoredError,
31+
db,
3132
)) as typeof createSqliteWasmPersisterDecl;

src/persisters/persister-sqlite3.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ export const createSqlite3Persister = ((
3737
(observer: Observer): any => db.off(CHANGE, observer),
3838
logSql,
3939
onIgnoredError,
40+
db,
4041
)) as typeof createSqlite3PersisterDecl;

src/persisters/sqlite/commands.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const WHERE = 'WHERE';
4343
export const getCommandFunctions = (
4444
cmd: Cmd,
4545
managedTableNames: string[],
46+
onIgnoredError: ((error: any) => void) | undefined,
4647
): [
4748
refreshSchema: () => Promise<Schema>,
4849
loadTable: (tableName: string, rowIdColumnName: string) => Promise<Table>,
@@ -54,6 +55,7 @@ export const getCommandFunctions = (
5455
deleteEmptyTable: boolean,
5556
partial?: boolean,
5657
) => Promise<void>,
58+
transaction: <Return>(actions: () => Promise<Return>) => Promise<Return>,
5759
] => {
5860
const schemaMap: Schema = mapNew();
5961

@@ -266,7 +268,21 @@ export const getCommandFunctions = (
266268
}
267269
};
268270

269-
return [refreshSchema, loadTable, saveTable];
271+
const transaction = async <Return>(
272+
actions: () => Promise<Return>,
273+
): Promise<Return> => {
274+
let result;
275+
await cmd('BEGIN');
276+
try {
277+
result = await actions();
278+
} catch (error) {
279+
onIgnoredError?.(error);
280+
}
281+
await cmd('END');
282+
return result as Return;
283+
};
284+
285+
return [refreshSchema, loadTable, saveTable, transaction];
270286
};
271287

272288
const upsert = async (

src/persisters/sqlite/create.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ export const createSqlitePersister = <UpdateListeningHandle>(
2424
cmd: Cmd,
2525
addUpdateListener: (listener: UpdateListener) => UpdateListeningHandle,
2626
delUpdateListener: (listeningHandle: UpdateListeningHandle) => void,
27-
logSql?: (sql: string, args?: any[]) => void,
28-
onIgnoredError?: (error: any) => void,
27+
logSql: ((sql: string, args?: any[]) => void) | undefined,
28+
onIgnoredError: ((error: any) => void) | undefined,
29+
scheduleId: any,
2930
): Persister => {
3031
let dataVersion: number | null;
3132
let schemaVersion: number | null;
@@ -85,5 +86,6 @@ export const createSqlitePersister = <UpdateListeningHandle>(
8586
onIgnoredError,
8687
defaultedConfig as any,
8788
collValues(managedTableNamesSet),
89+
scheduleId,
8890
);
8991
};

src/persisters/sqlite/json.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,43 +16,45 @@ export const createJsonSqlitePersister = <ListeningHandle>(
1616
onIgnoredError: ((error: any) => void) | undefined,
1717
[storeTableName]: DefaultedJsonConfig,
1818
managedTableNames: string[],
19+
scheduleId: any,
1920
): Persister => {
20-
const [refreshSchema, loadTable, saveTable] = getCommandFunctions(
21-
cmd,
22-
managedTableNames,
23-
);
21+
const [refreshSchema, loadTable, saveTable, transaction] =
22+
getCommandFunctions(cmd, managedTableNames, onIgnoredError);
2423

25-
const getPersisted = async (): Promise<[Tables, Values]> => {
26-
await refreshSchema();
27-
return jsonParse(
28-
(await loadTable(storeTableName, DEFAULT_ROW_ID_COLUMN_NAME))[
29-
SINGLE_ROW_ID
30-
]?.[STORE_COLUMN] as string,
31-
);
32-
};
24+
const getPersisted = async (): Promise<[Tables, Values]> =>
25+
await transaction(async () => {
26+
await refreshSchema();
27+
return jsonParse(
28+
((await loadTable(storeTableName, DEFAULT_ROW_ID_COLUMN_NAME))[
29+
SINGLE_ROW_ID
30+
]?.[STORE_COLUMN] as string) ?? 'null',
31+
);
32+
});
3333

3434
const setPersisted = async (
3535
getContent: () => [Tables, Values],
3636
): Promise<void> =>
37-
persister.schedule(refreshSchema, async () => {
37+
await transaction(async () => {
38+
await refreshSchema();
3839
await saveTable(
3940
storeTableName,
4041
DEFAULT_ROW_ID_COLUMN_NAME,
4142
{
42-
[SINGLE_ROW_ID]: {[STORE_COLUMN]: jsonString(getContent())},
43+
[SINGLE_ROW_ID]: {[STORE_COLUMN]: jsonString(getContent() ?? null)},
4344
},
4445
true,
4546
true,
4647
);
4748
});
4849

49-
const persister: any = createCustomPersister(
50+
const persister: any = (createCustomPersister as any)(
5051
store,
5152
getPersisted,
5253
setPersisted,
5354
addPersisterListener,
5455
delPersisterListener,
5556
onIgnoredError,
57+
scheduleId,
5658
);
5759

5860
return persister;

src/persisters/sqlite/tabular.ts

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@ export const createTabularSqlitePersister = <ListeningHandle>(
2929
[valuesLoad, valuesSave, valuesTableName],
3030
]: DefaultedTabularConfig,
3131
managedTableNames: string[],
32+
scheduleId: any,
3233
): Persister => {
33-
const [refreshSchema, loadTable, saveTable] = getCommandFunctions(
34-
cmd,
35-
managedTableNames,
36-
);
34+
const [refreshSchema, loadTable, saveTable, transaction] =
35+
getCommandFunctions(cmd, managedTableNames, onIgnoredError);
3736

3837
const saveTables = async (
3938
tables:
@@ -109,38 +108,41 @@ export const createTabularSqlitePersister = <ListeningHandle>(
109108
]
110109
: {};
111110

112-
const getPersisted = async (): Promise<[Tables, Values] | undefined> => {
113-
await refreshSchema();
114-
const tables = await loadTables();
115-
const values = await loadValues();
116-
return !objIsEmpty(tables) || !isUndefined(values)
117-
? [tables as Tables, values as Values]
118-
: undefined;
119-
};
111+
const getPersisted = async (): Promise<[Tables, Values] | undefined> =>
112+
await transaction(async () => {
113+
await refreshSchema();
114+
const tables = await loadTables();
115+
const values = await loadValues();
116+
return !objIsEmpty(tables) || !isUndefined(values)
117+
? [tables as Tables, values as Values]
118+
: undefined;
119+
});
120120

121121
const setPersisted = async (
122122
getContent: () => [Tables, Values],
123123
getTransactionChanges?: GetTransactionChanges,
124-
): Promise<void> => {
125-
await refreshSchema();
126-
if (!isUndefined(getTransactionChanges)) {
127-
const [tableChanges, valueChanges] = getTransactionChanges();
128-
await saveTables(tableChanges, true);
129-
await saveValues(valueChanges, true);
130-
} else {
131-
const [tables, values] = getContent();
132-
await saveTables(tables);
133-
await saveValues(values);
134-
}
135-
};
124+
): Promise<void> =>
125+
await transaction(async () => {
126+
await refreshSchema();
127+
if (!isUndefined(getTransactionChanges)) {
128+
const [tableChanges, valueChanges] = getTransactionChanges();
129+
await saveTables(tableChanges, true);
130+
await saveValues(valueChanges, true);
131+
} else {
132+
const [tables, values] = getContent();
133+
await saveTables(tables);
134+
await saveValues(values);
135+
}
136+
});
136137

137-
const persister: any = createCustomPersister(
138+
const persister: any = (createCustomPersister as any)(
138139
store,
139140
getPersisted,
140141
setPersisted,
141142
addPersisterListener,
142143
delPersisterListener,
143144
onIgnoredError,
145+
scheduleId,
144146
);
145147

146148
return persister;

0 commit comments

Comments
 (0)