Skip to content

Commit febcd22

Browse files
committed
Added watch method to Kysely DB Wrapper API.
1 parent c841612 commit febcd22

File tree

3 files changed

+288
-4
lines changed

3 files changed

+288
-4
lines changed

packages/kysely-driver/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { wrapPowerSyncWithKysely } from './sqlite/db';
1+
import { wrapPowerSyncWithKysely, type PowerSyncKyselyDatabase } from './sqlite/db';
22
import {
33
type ColumnType,
44
type Insertable,
@@ -19,5 +19,6 @@ export {
1919
KyselyConfig,
2020
sql,
2121
Kysely,
22+
PowerSyncKyselyDatabase,
2223
wrapPowerSyncWithKysely
2324
};
Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { type AbstractPowerSyncDatabase } from '@powersync/common';
1+
import {
2+
CompilableQuery,
3+
compilableQueryWatch,
4+
CompilableQueryWatchHandler,
5+
SQLWatchOptions,
6+
type AbstractPowerSyncDatabase
7+
} from '@powersync/common';
28
import { Dialect, Kysely, type KyselyConfig } from 'kysely';
39
import { PowerSyncDialect } from './sqlite-dialect';
410

@@ -9,11 +15,24 @@ export type PowerSyncKyselyOptions = Omit<KyselyConfig, 'dialect'> & {
915
dialect?: Dialect;
1016
};
1117

12-
export const wrapPowerSyncWithKysely = <T>(db: AbstractPowerSyncDatabase, options?: PowerSyncKyselyOptions) => {
13-
return new Kysely<T>({
18+
export type PowerSyncKyselyDatabase<T> = Kysely<T> & {
19+
watch: <K>(query: CompilableQuery<K>, handler: CompilableQueryWatchHandler<K>, options?: SQLWatchOptions) => void;
20+
};
21+
22+
export const wrapPowerSyncWithKysely = <T>(
23+
db: AbstractPowerSyncDatabase,
24+
options?: PowerSyncKyselyOptions
25+
): PowerSyncKyselyDatabase<T> => {
26+
const kysely = new Kysely<T>({
1427
dialect: new PowerSyncDialect({
1528
db
1629
}),
1730
...options
1831
});
32+
33+
return Object.assign(kysely, {
34+
watch: <K>(query: CompilableQuery<K>, handler: CompilableQueryWatchHandler<K>, options?: SQLWatchOptions) => {
35+
compilableQueryWatch(db, query, handler, options);
36+
}
37+
});
1938
};
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
import { AbstractPowerSyncDatabase, column, Schema, Table } from '@powersync/common';
2+
import { PowerSyncDatabase } from '@powersync/web';
3+
import { sql } from 'kysely';
4+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
5+
import * as SUT from '../../src/sqlite/db';
6+
7+
vi.useRealTimers();
8+
9+
const assetsPs = new Table(
10+
{
11+
created_at: column.text,
12+
make: column.text,
13+
model: column.text,
14+
serial_number: column.text,
15+
quantity: column.integer,
16+
user_id: column.text,
17+
customer_id: column.text,
18+
description: column.text
19+
},
20+
{ indexes: { makemodel: ['make, model'] } }
21+
);
22+
23+
const customersPs = new Table({
24+
name: column.text,
25+
email: column.text
26+
});
27+
28+
const PsSchema = new Schema({ assets: assetsPs, customers: customersPs });
29+
export type Database = (typeof PsSchema)['types'];
30+
31+
/**
32+
* There seems to be an issue with Vitest browser mode's setTimeout and
33+
* fake timer functionality.
34+
* e.g. calling:
35+
* await new Promise<void>((resolve) => setTimeout(resolve, 10));
36+
* waits for 1 second instead of 10ms.
37+
* Setting this to 1 second as a work around.
38+
*/
39+
const throttleDuration = 1000;
40+
41+
describe('Watch Tests', () => {
42+
let powerSyncDb: AbstractPowerSyncDatabase;
43+
let db: SUT.PowerSyncKyselyDatabase<Database>;
44+
45+
beforeEach(async () => {
46+
powerSyncDb = new PowerSyncDatabase({
47+
database: {
48+
dbFilename: 'test.db'
49+
},
50+
schema: PsSchema
51+
});
52+
db = SUT.wrapPowerSyncWithKysely<Database>(powerSyncDb);
53+
54+
await powerSyncDb.init();
55+
});
56+
57+
afterEach(async () => {
58+
await powerSyncDb.disconnectAndClear();
59+
});
60+
61+
it('watch outside throttle limits', async () => {
62+
const abortController = new AbortController();
63+
64+
const updatesCount = 2;
65+
let receivedUpdatesCount = 0;
66+
67+
/**
68+
* Promise which resolves once we received the same amount of update
69+
* notifications as there are inserts.
70+
*/
71+
const receivedUpdates = new Promise<void>((resolve) => {
72+
const onUpdate = () => {
73+
receivedUpdatesCount++;
74+
75+
if (receivedUpdatesCount == updatesCount) {
76+
abortController.abort();
77+
resolve();
78+
}
79+
};
80+
81+
const query = db
82+
.selectFrom('assets')
83+
.innerJoin('customers', 'customers.id', 'assets.customer_id')
84+
.select(db.fn.count('assets.id').as('count'));
85+
86+
db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration });
87+
});
88+
89+
for (let updateCount = 0; updateCount < updatesCount; updateCount++) {
90+
await db
91+
.insertInto('assets')
92+
.values({
93+
id: sql`uuid()`,
94+
make: 'test',
95+
customer_id: sql`uuid()`
96+
})
97+
.execute();
98+
99+
// Wait the throttle duration, ensuring a watch update for each insert
100+
await new Promise<void>((resolve) => setTimeout(resolve, throttleDuration));
101+
}
102+
103+
await receivedUpdates;
104+
expect(receivedUpdatesCount).equals(updatesCount);
105+
});
106+
107+
it('watch inside throttle limits', async () => {
108+
const abortController = new AbortController();
109+
110+
const updatesCount = 5;
111+
let receivedUpdatesCount = 0;
112+
113+
const onUpdate = () => {
114+
receivedUpdatesCount++;
115+
};
116+
117+
const query = db
118+
.selectFrom('assets')
119+
.innerJoin('customers', 'customers.id', 'assets.customer_id')
120+
.select(db.fn.count('assets.id').as('count'));
121+
122+
db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration });
123+
124+
// Create the inserts as fast as possible
125+
for (let updateCount = 0; updateCount < updatesCount; updateCount++) {
126+
await db
127+
.insertInto('assets')
128+
.values({
129+
id: sql`uuid()`,
130+
make: 'test',
131+
customer_id: sql`uuid()`
132+
})
133+
.execute();
134+
}
135+
136+
await new Promise<void>((resolve) => setTimeout(resolve, throttleDuration * 2));
137+
abortController.abort();
138+
139+
// There should be one initial result plus one throttled result
140+
expect(receivedUpdatesCount).equals(2);
141+
});
142+
143+
it('should only watch tables inside query', async () => {
144+
const assetsAbortController = new AbortController();
145+
146+
let receivedAssetsUpdatesCount = 0;
147+
const onWatchAssets = () => {
148+
receivedAssetsUpdatesCount++;
149+
};
150+
151+
const queryAssets = db.selectFrom('assets').select(db.fn.count('assets.id').as('count'));
152+
db.watch(
153+
queryAssets,
154+
{ onResult: onWatchAssets },
155+
{
156+
signal: assetsAbortController.signal
157+
}
158+
);
159+
160+
const customersAbortController = new AbortController();
161+
162+
let receivedCustomersUpdatesCount = 0;
163+
const onWatchCustomers = () => {
164+
receivedCustomersUpdatesCount++;
165+
};
166+
167+
const queryCustomers = db.selectFrom('customers').select(db.fn.count('customers.id').as('count'));
168+
169+
db.watch(
170+
queryCustomers,
171+
{ onResult: onWatchCustomers },
172+
{
173+
signal: customersAbortController.signal
174+
}
175+
);
176+
177+
// Ensures insert doesn't form part of initial result
178+
await new Promise<void>((resolve) => setTimeout(resolve, throttleDuration));
179+
180+
await db
181+
.insertInto('assets')
182+
.values({
183+
id: sql`uuid()`,
184+
make: 'test',
185+
customer_id: sql`uuid()`
186+
})
187+
.execute();
188+
189+
await new Promise<void>((resolve) => setTimeout(resolve, throttleDuration * 2));
190+
assetsAbortController.abort();
191+
customersAbortController.abort();
192+
193+
// There should be one initial result plus one throttled result
194+
expect(receivedAssetsUpdatesCount).equals(2);
195+
196+
// Only the initial result should have yielded.
197+
expect(receivedCustomersUpdatesCount).equals(1);
198+
});
199+
200+
it('should handle watch onError', async () => {
201+
const abortController = new AbortController();
202+
const onResult = () => {}; // no-op
203+
let receivedErrorCount = 0;
204+
205+
const receivedError = new Promise<void>(async (resolve) => {
206+
const onError = () => {
207+
receivedErrorCount++;
208+
resolve();
209+
};
210+
211+
const query = db.selectFrom('assets').select([
212+
() => {
213+
const fullName = sql<string>`fakeFunction()`; // Simulate an error with invalid function
214+
return fullName.as('full_name');
215+
}
216+
]);
217+
218+
db.watch(query, { onResult, onError }, { signal: abortController.signal, throttleMs: throttleDuration });
219+
});
220+
abortController.abort();
221+
222+
await receivedError;
223+
expect(receivedErrorCount).equals(1);
224+
});
225+
226+
it('should throttle watch overflow', async () => {
227+
const overflowAbortController = new AbortController();
228+
const updatesCount = 25;
229+
230+
let receivedWithManagedOverflowCount = 0;
231+
const firstResultReceived = new Promise<void>((resolve) => {
232+
const onResultOverflow = () => {
233+
if (receivedWithManagedOverflowCount === 0) {
234+
resolve();
235+
}
236+
receivedWithManagedOverflowCount++;
237+
};
238+
239+
const query = db.selectFrom('assets').select(db.fn.count('assets.id').as('count'));
240+
db.watch(query, { onResult: onResultOverflow }, { signal: overflowAbortController.signal, throttleMs: 1 });
241+
});
242+
243+
await firstResultReceived;
244+
245+
// Perform a large number of inserts to trigger overflow
246+
for (let i = 0; i < updatesCount; i++) {
247+
db.insertInto('assets')
248+
.values({
249+
id: sql`uuid()`,
250+
make: 'test',
251+
customer_id: sql`uuid()`
252+
})
253+
.execute();
254+
}
255+
256+
await new Promise<void>((resolve) => setTimeout(resolve, 1 * throttleDuration));
257+
258+
overflowAbortController.abort();
259+
260+
// This fluctuates between 3 and 4 based on timing, but should never be 25
261+
expect(receivedWithManagedOverflowCount).greaterThan(2);
262+
expect(receivedWithManagedOverflowCount).toBeLessThanOrEqual(4);
263+
});
264+
});

0 commit comments

Comments
 (0)