Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-radios-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Deprecated `rawTableNames` field in `SQLWatchOptions`. All tables specified in `tables` will now be watched, including PowerSync tables with prefixes.
22 changes: 7 additions & 15 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ export interface SQLWatchOptions {
/** The minimum interval between queries. */
throttleMs?: number;
/**
* @deprecated All tables specified in {@link tables} will be watched, including PowerSync tables with prefixes.
*
* Allows for watching any SQL table
* by not removing PowerSync table name prefixes
*/
Expand Down Expand Up @@ -889,7 +891,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

const resolvedOptions = options ?? {};
const watchedTables = new Set(resolvedOptions.tables ?? []);
const watchedTables = new Set<string>(
(resolvedOptions?.tables ?? []).flatMap((table) => [table, `ps_data__${table}`, `ps_data_local__${table}`])
);

const changedTables = new Set<string>();
const throttleMs = resolvedOptions.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS;
Expand All @@ -910,8 +914,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const dispose = this.database.registerListener({
tablesUpdated: async (update) => {
try {
const { rawTableNames } = resolvedOptions;
this.processTableUpdates(update, rawTableNames, changedTables);
this.processTableUpdates(update, changedTables);
flushTableUpdates();
} catch (error) {
onError?.(error);
Expand Down Expand Up @@ -976,24 +979,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

private processTableUpdates(
updateNotification: BatchedUpdateNotification | UpdateNotification,
rawTableNames: boolean | undefined,
changedTables: Set<string>
): void {
const tables = isBatchedUpdateNotification(updateNotification)
? updateNotification.tables
: [updateNotification.table];

const filteredTables = rawTableNames ? tables : tables.filter((t) => !!t.match(POWERSYNC_TABLE_MATCH));
if (!filteredTables.length) {
return;
}

// Remove any PowerSync table prefixes if necessary
const mappedTableNames = rawTableNames
? filteredTables
: filteredTables.map((t) => t.replace(POWERSYNC_TABLE_MATCH, ''));

for (const table of mappedTableNames) {
for (const table of tables) {
changedTables.add(table);
}
}
Expand Down
60 changes: 60 additions & 0 deletions packages/web/tests/on_change.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { AbstractPowerSyncDatabase, WatchOnChangeEvent } from '@powersync/common';
import { PowerSyncDatabase } from '@powersync/web';
import { v4 as uuid } from 'uuid';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { testSchema } from './utils/testDb';

const UPLOAD_TIMEOUT_MS = 3000;

describe('OnChange Tests', () => {
let powersync: AbstractPowerSyncDatabase;

beforeEach(async () => {
powersync = new PowerSyncDatabase({
database: { dbFilename: 'test-watch.db' },
schema: testSchema,
flags: {
enableMultiTabs: false
}
});
});

afterEach(async () => {
await powersync.disconnectAndClear();
await powersync.close();
});

async function runOnChangeTest(tablesToWatch: string[], expectedChangedTables: string[]) {
const changedTables: string[] = [];
const abortController = new AbortController();
const onChange = vi.fn((event: WatchOnChangeEvent) => {
changedTables.push(...event.changedTables);
});

powersync.onChange({ onChange }, { tables: tablesToWatch, signal: abortController.signal });
powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]);
await vi.waitFor(
() => {
expect(onChange).toHaveBeenCalled();
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

abortController.abort();
expect(changedTables).toEqual(expectedChangedTables);
}

it('basic onChange test', async () => {
await runOnChangeTest(['assets'], ['ps_data__assets']);
});

it('internal "ps_data" table onChange test', async () => {
await runOnChangeTest(['ps_data__assets'], ['ps_data__assets']);
});

it('internal "ps_oplog" table onChange test', async () => {
await runOnChangeTest(['ps_oplog'], ['ps_oplog']);
});
});
Loading