Skip to content

Commit 1e0ca33

Browse files
committed
Fixed single write transanaction operations in ps_crud not being processed. Batching update notifications per write lock.
1 parent 3f8f748 commit 1e0ca33

File tree

3 files changed

+85
-38
lines changed

3 files changed

+85
-38
lines changed

.changeset/metal-fans-argue.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/op-sqlite': minor
3+
---
4+
5+
Fixed single write transanaction operations in `ps_crud` not being processed. Batching update notifications per write lock.

packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,85 @@
1-
import { DB, SQLBatchTuple } from '@op-engineering/op-sqlite';
2-
import { BaseObserver, DBAdapterListener, QueryResult, RowUpdateType } from '@powersync/common';
1+
import { DB, SQLBatchTuple, UpdateHookOperation } from '@op-engineering/op-sqlite';
2+
import {
3+
BaseObserver,
4+
BatchedUpdateNotification,
5+
DBAdapterListener,
6+
QueryResult,
7+
RowUpdateType,
8+
UpdateNotification
9+
} from '@powersync/common';
310

411
export type OPSQLiteConnectionOptions = {
512
baseDB: DB;
613
};
714

15+
export type OPSQLiteUpdateNotification = {
16+
table: string;
17+
operation: UpdateHookOperation;
18+
row?: any;
19+
rowId: number;
20+
};
21+
822
export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
923
protected DB: DB;
24+
private updateBuffer: UpdateNotification[];
25+
1026
constructor(protected options: OPSQLiteConnectionOptions) {
1127
super();
1228
this.DB = options.baseDB;
29+
this.updateBuffer = [];
30+
31+
this.DB.rollbackHook(() => {
32+
this.updateBuffer = [];
33+
});
1334

14-
// link table update commands
1535
this.DB.updateHook((update) => {
16-
this.iterateListeners((cb) => {
17-
let opType: RowUpdateType;
18-
switch (update.operation) {
19-
case 'INSERT':
20-
opType = RowUpdateType.SQLITE_INSERT;
21-
break;
22-
case 'DELETE':
23-
opType = RowUpdateType.SQLITE_DELETE;
24-
break;
25-
case 'UPDATE':
26-
opType = RowUpdateType.SQLITE_UPDATE;
27-
break;
28-
}
29-
cb.tablesUpdated?.({
30-
table: update.table,
31-
opType,
32-
rowId: update.rowId
33-
});
34-
});
36+
this.addTableUpdate(update);
3537
});
3638
}
3739

40+
addTableUpdate(update: OPSQLiteUpdateNotification) {
41+
let opType: RowUpdateType;
42+
switch (update.operation) {
43+
case 'INSERT':
44+
opType = RowUpdateType.SQLITE_INSERT;
45+
break;
46+
case 'DELETE':
47+
opType = RowUpdateType.SQLITE_DELETE;
48+
break;
49+
case 'UPDATE':
50+
opType = RowUpdateType.SQLITE_UPDATE;
51+
break;
52+
}
53+
54+
this.updateBuffer.push({
55+
table: update.table,
56+
opType,
57+
rowId: update.rowId
58+
});
59+
}
60+
61+
flushUpdates() {
62+
if (!this.updateBuffer.length) {
63+
return;
64+
}
65+
66+
const groupedUpdates = this.updateBuffer.reduce((grouping: Record<string, UpdateNotification[]>, update) => {
67+
const { table } = update;
68+
const updateGroup = grouping[table] || (grouping[table] = []);
69+
updateGroup.push(update);
70+
return grouping;
71+
}, {});
72+
73+
const batchedUpdate: BatchedUpdateNotification = {
74+
groupedUpdates,
75+
rawUpdates: this.updateBuffer,
76+
tables: Object.keys(groupedUpdates)
77+
};
78+
79+
this.updateBuffer = [];
80+
this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate));
81+
}
82+
3883
close() {
3984
return this.DB.close();
4085
}

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,4 @@
1-
import {
2-
BaseObserver,
3-
DBAdapter,
4-
DBAdapterListener,
5-
DBLockOptions,
6-
QueryResult,
7-
SQLOpenOptions,
8-
Transaction
9-
} from '@powersync/common';
1+
import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common';
102
import { ANDROID_DATABASE_PATH, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite';
113
import Lock from 'async-lock';
124
import { OPSQLiteConnection } from './OPSQLiteConnection';
@@ -194,13 +186,18 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
194186

195187
return new Promise(async (resolve, reject) => {
196188
try {
197-
await this.locks.acquire(
198-
LockType.WRITE,
199-
async () => {
200-
resolve(await fn(this.writeConnection!));
201-
},
202-
{ timeout: options?.timeoutMs }
203-
);
189+
await this.locks
190+
.acquire(
191+
LockType.WRITE,
192+
async () => {
193+
resolve(await fn(this.writeConnection!));
194+
},
195+
{ timeout: options?.timeoutMs }
196+
)
197+
.then(() => {
198+
// flush updates once a write lock has been released
199+
this.writeConnection!.flushUpdates();
200+
});
204201
} catch (ex) {
205202
reject(ex);
206203
}

0 commit comments

Comments
 (0)