Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/metal-fans-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/op-sqlite': minor
---

Fixed single write transanaction operations in `ps_crud` not being processed. Batching update notifications per write lock.
89 changes: 67 additions & 22 deletions packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,85 @@
import { DB, SQLBatchTuple } from '@op-engineering/op-sqlite';
import { BaseObserver, DBAdapterListener, QueryResult, RowUpdateType } from '@powersync/common';
import { DB, SQLBatchTuple, UpdateHookOperation } from '@op-engineering/op-sqlite';
import {
BaseObserver,
BatchedUpdateNotification,
DBAdapterListener,
QueryResult,
RowUpdateType,
UpdateNotification
} from '@powersync/common';

export type OPSQLiteConnectionOptions = {
baseDB: DB;
};

export type OPSQLiteUpdateNotification = {
table: string;
operation: UpdateHookOperation;
row?: any;
rowId: number;
};

export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
protected DB: DB;
private updateBuffer: UpdateNotification[];

constructor(protected options: OPSQLiteConnectionOptions) {
super();
this.DB = options.baseDB;
this.updateBuffer = [];

this.DB.rollbackHook(() => {
this.updateBuffer = [];
});

// link table update commands
this.DB.updateHook((update) => {
this.iterateListeners((cb) => {
let opType: RowUpdateType;
switch (update.operation) {
case 'INSERT':
opType = RowUpdateType.SQLITE_INSERT;
break;
case 'DELETE':
opType = RowUpdateType.SQLITE_DELETE;
break;
case 'UPDATE':
opType = RowUpdateType.SQLITE_UPDATE;
break;
}
cb.tablesUpdated?.({
table: update.table,
opType,
rowId: update.rowId
});
});
this.addTableUpdate(update);
});
}

addTableUpdate(update: OPSQLiteUpdateNotification) {
let opType: RowUpdateType;
switch (update.operation) {
case 'INSERT':
opType = RowUpdateType.SQLITE_INSERT;
break;
case 'DELETE':
opType = RowUpdateType.SQLITE_DELETE;
break;
case 'UPDATE':
opType = RowUpdateType.SQLITE_UPDATE;
break;
}

this.updateBuffer.push({
table: update.table,
opType,
rowId: update.rowId
});
}

flushUpdates() {
if (!this.updateBuffer.length) {
return;
}

const groupedUpdates = this.updateBuffer.reduce((grouping: Record<string, UpdateNotification[]>, update) => {
const { table } = update;
const updateGroup = grouping[table] || (grouping[table] = []);
updateGroup.push(update);
return grouping;
}, {});

const batchedUpdate: BatchedUpdateNotification = {
groupedUpdates,
rawUpdates: this.updateBuffer,
tables: Object.keys(groupedUpdates)
};

this.updateBuffer = [];
this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate));
}

close() {
return this.DB.close();
}
Expand Down
29 changes: 13 additions & 16 deletions packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
import {
BaseObserver,
DBAdapter,
DBAdapterListener,
DBLockOptions,
QueryResult,
SQLOpenOptions,
Transaction
} from '@powersync/common';
import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common';
import { ANDROID_DATABASE_PATH, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite';
import Lock from 'async-lock';
import { OPSQLiteConnection } from './OPSQLiteConnection';
Expand Down Expand Up @@ -194,13 +186,18 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement

return new Promise(async (resolve, reject) => {
try {
await this.locks.acquire(
LockType.WRITE,
async () => {
resolve(await fn(this.writeConnection!));
},
{ timeout: options?.timeoutMs }
);
await this.locks
.acquire(
LockType.WRITE,
async () => {
resolve(await fn(this.writeConnection!));
},
{ timeout: options?.timeoutMs }
)
.then(() => {
// flush updates once a write lock has been released
this.writeConnection!.flushUpdates();
});
} catch (ex) {
reject(ex);
}
Expand Down
Loading