Skip to content

Commit 1125911

Browse files
committed
Using iterateAsyncListeners for schema change event.
Moved optionalSync test schema to separate file.
1 parent 73daf49 commit 1125911

File tree

5 files changed

+161
-76
lines changed

5 files changed

+161
-76
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ export interface WatchOnChangeHandler {
103103

104104
export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
105105
initialized: () => void;
106-
schemaChanged: (schema: Schema) => void;
106+
schemaChanged: (schema: Schema) => Promise<void>;
107107
}
108108

109109
export interface PowerSyncCloseOptions {
@@ -359,8 +359,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
359359
this.options.logger?.warn('Schema validation failed. Unexpected behaviour could occur', ex);
360360
}
361361
this._schema = schema;
362+
362363
await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
363-
this.iterateListeners((cb) => cb.schemaChanged?.(schema));
364+
await this.iterateAsyncListeners(async (cb) => cb.schemaChanged?.(schema));
364365
}
365366

366367
/**
@@ -761,7 +762,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
761762
const watchQuery = async (abortSignal: AbortSignal) => {
762763
try {
763764
const resolvedTables = await this.resolveTables(sql, parameters, options);
764-
765765
// Fetch initial data
766766
const result = await this.executeReadOnly(sql, parameters);
767767
onResult(result);
@@ -790,11 +790,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
790790
}
791791
};
792792

793-
const triggerWatchedQuery = () => {
793+
const triggerWatchedQuery = async () => {
794794
const abortController = new AbortController();
795-
796795
let disposeSchemaListener: (() => void) | null = null;
797-
798796
const stopWatching = () => {
799797
abortController.abort('Abort triggered');
800798
disposeSchemaListener?.();
@@ -804,16 +802,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
804802
};
805803

806804
options?.signal?.addEventListener('abort', stopWatching);
807-
808805
disposeSchemaListener = this.registerListener({
809-
schemaChanged: () => {
806+
schemaChanged: async () => {
810807
stopWatching();
811808
// Re trigger the watched query (recursively)
812-
triggerWatchedQuery();
809+
await triggerWatchedQuery();
813810
}
814811
});
815812

816-
return watchQuery(abortController.signal);
813+
await watchQuery(abortController.signal);
817814
};
818815

819816
triggerWatchedQuery();

packages/web/tests/offline.test.ts

Lines changed: 4 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,12 @@
11
import { AbstractPowerSyncDatabase, column, Schema, Table } from '@powersync/common';
22
import { PowerSyncDatabase } from '@powersync/web';
33
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
4+
import { makeOptionalSyncSchema } from './utils/optionalSyncTestSchema';
45

56
const assetId = '2290de4f-0488-4e50-abed-f8e8eb1d0b42';
67
const userId = '3390de4f-0488-4e50-abed-f8e8eb1d0b42';
78
const customerId = '4490de4f-0488-4e50-abed-f8e8eb1d0b42';
89

9-
const assetsDef = {
10-
name: 'assets',
11-
columns: {
12-
created_at: column.text,
13-
make: column.text,
14-
model: column.text,
15-
serial_number: column.text,
16-
quantity: column.integer,
17-
user_id: column.text,
18-
customer_id: column.text,
19-
description: column.text
20-
},
21-
options: { indexes: { makemodel: ['make, model'] } }
22-
};
23-
24-
const customersDef = {
25-
name: 'customers',
26-
columns: {
27-
name: column.text,
28-
email: column.text
29-
},
30-
options: {}
31-
};
32-
33-
function makeSchema(synced: boolean) {
34-
const syncedName = (table: string): string => {
35-
if (synced) {
36-
return table;
37-
} else {
38-
return `inactive_synced_${table}`;
39-
}
40-
};
41-
42-
const localName = (table: string): string => {
43-
if (synced) {
44-
return `inactive_local_${table}`;
45-
} else {
46-
return table;
47-
}
48-
};
49-
return new Schema({
50-
assets: new Table(assetsDef.columns, { ...assetsDef.options, viewName: syncedName(assetsDef.name) }),
51-
local_assets: new Table(assetsDef.columns, {
52-
...assetsDef.options,
53-
localOnly: true,
54-
viewName: localName(assetsDef.name)
55-
}),
56-
customers: new Table(customersDef.columns, { ...customersDef.options, viewName: syncedName(customersDef.name) }),
57-
local_customers: new Table(customersDef.columns, {
58-
...customersDef.options,
59-
localOnly: true,
60-
viewName: localName(customersDef.name)
61-
})
62-
});
63-
}
64-
6510
describe('Schema Tests', () => {
6611
let db: AbstractPowerSyncDatabase;
6712

@@ -73,7 +18,7 @@ describe('Schema Tests', () => {
7318
* consistent
7419
*/
7520
database: { dbFilename: 'test.db' },
76-
schema: makeSchema(false),
21+
schema: makeOptionalSyncSchema(false),
7722
flags: {
7823
enableMultiTabs: false
7924
}
@@ -98,7 +43,7 @@ describe('Schema Tests', () => {
9843
expect(await db.getAll('SELECT data FROM ps_crud ORDER BY id')).toEqual([]);
9944

10045
// Now switch to the "online" schema
101-
await db.updateSchema(makeSchema(true));
46+
await db.updateSchema(makeOptionalSyncSchema(true));
10247

10348
// Note that updateSchema cannot be called inside a transaction, and there
10449
// is a possibility of crash between updating the schema, and when the data
@@ -148,7 +93,7 @@ describe('Schema Tests', () => {
14893
const customerWatchTables = await getSourceTables(db, 'SELECT * FROM customers');
14994
expect(customerWatchTables.includes('ps_data_local__local_customers')).toBeTruthy();
15095

151-
await db.updateSchema(makeSchema(true));
96+
await db.updateSchema(makeOptionalSyncSchema(true));
15297

15398
const onlineCustomerWatchTables = await getSourceTables(db, 'SELECT * FROM customers');
15499
expect(onlineCustomerWatchTables.includes('ps_data__customers')).toBeTruthy();
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { column, Schema, Table } from '@powersync/common';
2+
3+
const assetsDef = {
4+
name: 'assets',
5+
columns: {
6+
created_at: column.text,
7+
make: column.text,
8+
model: column.text,
9+
serial_number: column.text,
10+
quantity: column.integer,
11+
user_id: column.text,
12+
customer_id: column.text,
13+
description: column.text
14+
},
15+
options: {}
16+
};
17+
18+
const customersDef = {
19+
name: 'customers',
20+
columns: {
21+
name: column.text,
22+
email: column.text
23+
},
24+
options: {}
25+
};
26+
27+
export function makeOptionalSyncSchema(synced: boolean) {
28+
const syncedName = (table: string): string => {
29+
if (synced) {
30+
return table;
31+
} else {
32+
return `inactive_synced_${table}`;
33+
}
34+
};
35+
36+
const localName = (table: string): string => {
37+
if (synced) {
38+
return `inactive_local_${table}`;
39+
} else {
40+
return table;
41+
}
42+
};
43+
44+
return new Schema({
45+
assets: new Table(assetsDef.columns, { ...assetsDef.options, viewName: syncedName(assetsDef.name) }),
46+
local_assets: new Table(assetsDef.columns, {
47+
...assetsDef.options,
48+
localOnly: true,
49+
viewName: localName(assetsDef.name)
50+
}),
51+
customers: new Table(customersDef.columns, { ...customersDef.options, viewName: syncedName(customersDef.name) }),
52+
local_customers: new Table(customersDef.columns, {
53+
...customersDef.options,
54+
localOnly: true,
55+
viewName: localName(customersDef.name)
56+
})
57+
});
58+
}

packages/web/tests/watch.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
22
import { v4 as uuid } from 'uuid';
3-
import { AbstractPowerSyncDatabase } from '@powersync/common';
3+
import { AbstractPowerSyncDatabase, QueryResult } from '@powersync/common';
44
import { PowerSyncDatabase } from '@powersync/web';
55
import { testSchema } from './utils/testDb';
6+
import { makeOptionalSyncSchema } from './utils/optionalSyncTestSchema';
67
vi.useRealTimers();
78

89
/**
@@ -26,6 +27,7 @@ describe('Watch Tests', () => {
2627
enableMultiTabs: false
2728
}
2829
});
30+
await powersync.init();
2931
});
3032

3133
afterEach(async () => {
@@ -83,6 +85,7 @@ describe('Watch Tests', () => {
8385
const receivedUpdates = new Promise<void>((resolve) => {
8486
const onUpdate = () => {
8587
receivedUpdatesCount++;
88+
8689
if (receivedUpdatesCount == updatesCount) {
8790
abortController.abort();
8891
resolve();
@@ -286,8 +289,6 @@ describe('Watch Tests', () => {
286289
});
287290

288291
it('should throttle watch callback overflow', async () => {
289-
const abortController = new AbortController();
290-
291292
const updatesCount = 25;
292293

293294
let receivedWithManagedOverflowCount = 0;
@@ -300,7 +301,7 @@ describe('Watch Tests', () => {
300301
'SELECT count() AS count FROM assets',
301302
[],
302303
{ onResult: onResultOverflow },
303-
{ signal: overflowAbortController.signal, throttleMs: 1 }
304+
{ signal: overflowAbortController.signal, throttleMs: 100 }
304305
);
305306

306307
// Allows us to count the number of updates received without the initial trigger
@@ -313,10 +314,9 @@ describe('Watch Tests', () => {
313314

314315
await new Promise<void>((resolve) => setTimeout(resolve, 1 * throttleDuration));
315316

316-
abortController.abort();
317317
overflowAbortController.abort();
318318

319-
// Initial onResult plus two left after overflow was throttled for onChange triggers
320-
expect(receivedWithManagedOverflowCount).toBe(3);
319+
// Initial onResult plus 1 left after overflow was throttled for onChange triggers
320+
expect(receivedWithManagedOverflowCount).toBe(2);
321321
});
322322
});
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
2+
import { v4 as uuid } from 'uuid';
3+
import { AbstractPowerSyncDatabase, QueryResult } from '@powersync/common';
4+
import { PowerSyncDatabase } from '@powersync/web';
5+
import { testSchema } from './utils/testDb';
6+
import { makeOptionalSyncSchema } from './utils/optionalSyncTestSchema';
7+
vi.useRealTimers();
8+
9+
/**
10+
* There seems to be an issue with Vitest browser mode's setTimeout and
11+
* fake timer functionality.
12+
* e.g. calling:
13+
* await new Promise<void>((resolve) => setTimeout(resolve, 10));
14+
* waits for 1 second instead of 10ms.
15+
* Setting this to 1 second as a work around.
16+
*/
17+
const throttleDuration = 1000;
18+
19+
describe('Watch With Schema Change Tests', () => {
20+
let powersync: AbstractPowerSyncDatabase;
21+
22+
beforeEach(async () => {
23+
powersync = new PowerSyncDatabase({
24+
database: { dbFilename: 'test-watch-optional-sync.db' },
25+
schema: makeOptionalSyncSchema(false),
26+
flags: {
27+
enableMultiTabs: false
28+
}
29+
});
30+
await powersync.init();
31+
});
32+
33+
afterEach(async () => {
34+
await powersync.disconnectAndClear();
35+
await powersync.close();
36+
});
37+
38+
it('should trigger onResult after schema change', async () => {
39+
const userId = uuid();
40+
const abortController = new AbortController();
41+
42+
let lastResult = 0;
43+
44+
const onUpdate = (results: QueryResult) => {
45+
lastResult = results.rows?._array[0].count; // `count` from sql output
46+
};
47+
48+
powersync.watch(
49+
'SELECT count() AS count FROM assets',
50+
[],
51+
{ onResult: onUpdate },
52+
{ signal: abortController.signal, throttleMs: throttleDuration }
53+
);
54+
55+
// Insert 3 records
56+
for (let i = 0; i < 3; i++) {
57+
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', userId]);
58+
}
59+
await new Promise<void>((resolve) => setTimeout(resolve, throttleDuration * 2));
60+
61+
expect(lastResult).toBe(3);
62+
63+
await powersync.updateSchema(makeOptionalSyncSchema(true));
64+
65+
await powersync.writeTransaction(async (tx) => {
66+
// Copy local data to the "online" views.
67+
68+
await tx.execute(
69+
'INSERT INTO assets(id, description, customer_id, user_id) SELECT id, description, customer_id, ? FROM inactive_local_assets',
70+
[userId]
71+
);
72+
73+
// Delete the "local-only" data.
74+
await tx.execute('DELETE FROM inactive_local_customers');
75+
await tx.execute('DELETE FROM inactive_local_assets');
76+
});
77+
78+
for (let i = 0; i < 3; i++) {
79+
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]);
80+
}
81+
await new Promise<void>((resolve) => setTimeout(resolve, throttleDuration * 2));
82+
83+
expect(lastResult).toBe(6);
84+
});
85+
});

0 commit comments

Comments
 (0)