From 1e0ca334892b46ab0e4491fca09b7fbc1b42401f Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 8 Jan 2025 09:44:16 +0200 Subject: [PATCH 1/3] Fixed single write transanaction operations in `ps_crud` not being processed. Batching update notifications per write lock. --- .changeset/metal-fans-argue.md | 5 ++ .../src/db/OPSQLiteConnection.ts | 89 ++++++++++++++----- .../src/db/OPSqliteAdapter.ts | 29 +++--- 3 files changed, 85 insertions(+), 38 deletions(-) create mode 100644 .changeset/metal-fans-argue.md diff --git a/.changeset/metal-fans-argue.md b/.changeset/metal-fans-argue.md new file mode 100644 index 000000000..330da9d30 --- /dev/null +++ b/.changeset/metal-fans-argue.md @@ -0,0 +1,5 @@ +--- +'@powersync/op-sqlite': minor +--- + +Fixed single write transanaction operations in `ps_crud` not being processed. Batching update notifications per write lock. diff --git a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts index 52f73531b..bb95dd16e 100644 --- a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts +++ b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts @@ -1,40 +1,85 @@ -import { DB, SQLBatchTuple } from '@op-engineering/op-sqlite'; -import { BaseObserver, DBAdapterListener, QueryResult, RowUpdateType } from '@powersync/common'; +import { DB, SQLBatchTuple, UpdateHookOperation } from '@op-engineering/op-sqlite'; +import { + BaseObserver, + BatchedUpdateNotification, + DBAdapterListener, + QueryResult, + RowUpdateType, + UpdateNotification +} from '@powersync/common'; export type OPSQLiteConnectionOptions = { baseDB: DB; }; +export type OPSQLiteUpdateNotification = { + table: string; + operation: UpdateHookOperation; + row?: any; + rowId: number; +}; + export class OPSQLiteConnection extends BaseObserver { protected DB: DB; + private updateBuffer: UpdateNotification[]; + constructor(protected options: OPSQLiteConnectionOptions) { super(); this.DB = options.baseDB; + this.updateBuffer = []; + + this.DB.rollbackHook(() => { + this.updateBuffer = []; + }); - // link table update commands this.DB.updateHook((update) => { - this.iterateListeners((cb) => { - let opType: RowUpdateType; - switch (update.operation) { - case 'INSERT': - opType = RowUpdateType.SQLITE_INSERT; - break; - case 'DELETE': - opType = RowUpdateType.SQLITE_DELETE; - break; - case 'UPDATE': - opType = RowUpdateType.SQLITE_UPDATE; - break; - } - cb.tablesUpdated?.({ - table: update.table, - opType, - rowId: update.rowId - }); - }); + this.addTableUpdate(update); }); } + addTableUpdate(update: OPSQLiteUpdateNotification) { + let opType: RowUpdateType; + switch (update.operation) { + case 'INSERT': + opType = RowUpdateType.SQLITE_INSERT; + break; + case 'DELETE': + opType = RowUpdateType.SQLITE_DELETE; + break; + case 'UPDATE': + opType = RowUpdateType.SQLITE_UPDATE; + break; + } + + this.updateBuffer.push({ + table: update.table, + opType, + rowId: update.rowId + }); + } + + flushUpdates() { + if (!this.updateBuffer.length) { + return; + } + + const groupedUpdates = this.updateBuffer.reduce((grouping: Record, update) => { + const { table } = update; + const updateGroup = grouping[table] || (grouping[table] = []); + updateGroup.push(update); + return grouping; + }, {}); + + const batchedUpdate: BatchedUpdateNotification = { + groupedUpdates, + rawUpdates: this.updateBuffer, + tables: Object.keys(groupedUpdates) + }; + + this.updateBuffer = []; + this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate)); + } + close() { return this.DB.close(); } diff --git a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts index 37a962ef1..4b5be5ca9 100644 --- a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts +++ b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts @@ -1,12 +1,4 @@ -import { - BaseObserver, - DBAdapter, - DBAdapterListener, - DBLockOptions, - QueryResult, - SQLOpenOptions, - Transaction -} from '@powersync/common'; +import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common'; import { ANDROID_DATABASE_PATH, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite'; import Lock from 'async-lock'; import { OPSQLiteConnection } from './OPSQLiteConnection'; @@ -194,13 +186,18 @@ export class OPSQLiteDBAdapter extends BaseObserver implement return new Promise(async (resolve, reject) => { try { - await this.locks.acquire( - LockType.WRITE, - async () => { - resolve(await fn(this.writeConnection!)); - }, - { timeout: options?.timeoutMs } - ); + await this.locks + .acquire( + LockType.WRITE, + async () => { + resolve(await fn(this.writeConnection!)); + }, + { timeout: options?.timeoutMs } + ) + .then(() => { + // flush updates once a write lock has been released + this.writeConnection!.flushUpdates(); + }); } catch (ex) { reject(ex); } From cf7595cb426a10097cebfc5561ea86dfd511931b Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 8 Jan 2025 11:05:14 +0200 Subject: [PATCH 2/3] Update metal-fans-argue.md --- .changeset/metal-fans-argue.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.changeset/metal-fans-argue.md b/.changeset/metal-fans-argue.md index 330da9d30..c20b55d27 100644 --- a/.changeset/metal-fans-argue.md +++ b/.changeset/metal-fans-argue.md @@ -2,4 +2,5 @@ '@powersync/op-sqlite': minor --- -Fixed single write transanaction operations in `ps_crud` not being processed. Batching update notifications per write lock. +Fixed single write transaction operations in `ps_crud` not being processed. Batching update notifications per write lock. +This will also affect/fix downstream features such as watched queries and reactive query hooks. From 8441600aa32727643e557a6066e271de214668c8 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 8 Jan 2025 11:07:25 +0200 Subject: [PATCH 3/3] Update metal-fans-argue.md --- .changeset/metal-fans-argue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/metal-fans-argue.md b/.changeset/metal-fans-argue.md index c20b55d27..e933212c1 100644 --- a/.changeset/metal-fans-argue.md +++ b/.changeset/metal-fans-argue.md @@ -3,4 +3,4 @@ --- Fixed single write transaction operations in `ps_crud` not being processed. Batching update notifications per write lock. -This will also affect/fix downstream features such as watched queries and reactive query hooks. +This will also fix downstream features such as watched queries and reactive query hooks in cases where the query is fired before the data was committed, and batching will improve performance specifically in cases where a lot of data changes occur.