Skip to content

Commit 6828d53

Browse files
wip
1 parent 43f24fd commit 6828d53

File tree

9 files changed

+314
-0
lines changed

9 files changed

+314
-0
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
"prettier": "^3.2.5",
4141
"prettier-plugin-embed": "^0.4.15",
4242
"prettier-plugin-sql": "^0.18.1",
43+
"source-map-support": "^0.5.21",
4344
"typescript": "^5.7.2",
4445
"vitest": "^3.2.4"
4546
}

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ import {
3535
type PowerSyncConnectionOptions,
3636
type RequiredAdditionalConnectionOptions
3737
} from './sync/stream/AbstractStreamingSyncImplementation.js';
38+
import { TriggerManager } from './triggers/TriggerManager.js';
39+
import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js';
40+
import { WhenReadyTriggerManager } from './triggers/WhenReadyTriggerManager.js';
3841
import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
3942
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
4043
import { WatchedQueryComparator } from './watched/processors/comparators.js';
@@ -198,6 +201,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
198201

199202
protected runExclusiveMutex: Mutex;
200203

204+
protected triggerManager: WhenReadyTriggerManager;
205+
206+
get triggers(): TriggerManager {
207+
return this.triggerManager;
208+
}
209+
201210
constructor(options: PowerSyncDatabaseOptionsWithDBAdapter);
202211
constructor(options: PowerSyncDatabaseOptionsWithOpenFactory);
203212
constructor(options: PowerSyncDatabaseOptionsWithSettings);
@@ -229,6 +238,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
229238
this.ready = false;
230239
this.sdkVersion = '';
231240
this.runExclusiveMutex = new Mutex();
241+
232242
// Start async init
233243
this.connectionManager = new ConnectionManager({
234244
createSyncImplementation: async (connector, options) => {
@@ -255,7 +265,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
255265
},
256266
logger: this.logger
257267
});
268+
258269
this._isReadyPromise = this.initialize();
270+
271+
this.triggerManager = new WhenReadyTriggerManager({
272+
manager: new TriggerManagerImpl({
273+
db: this._database
274+
}),
275+
readyPromise: this.waitForReady()
276+
});
259277
}
260278

261279
/**
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
export enum DiffTriggerOperation {
2+
INSERT = 'INSERT',
3+
UPDATE = 'UPDATE',
4+
DELETE = 'DELETE'
5+
}
6+
7+
export type TriggerResultUpdate = {
8+
operation: 'UPDATE';
9+
id: string;
10+
change: string;
11+
};
12+
13+
export type TriggerResultInsert = {
14+
operation: 'INSERT';
15+
id: string;
16+
change: string;
17+
};
18+
19+
export type TriggerResultDelete = {
20+
id: string;
21+
operation: 'DELETE';
22+
};
23+
24+
export type TriggerDiffResult = TriggerResultUpdate | TriggerResultInsert | TriggerResultDelete;
25+
26+
export type CreateDiffTriggerOptions = {
27+
/**
28+
* Source table to trigger and track changes from.
29+
*/
30+
source: string;
31+
32+
/**
33+
* Operations to track changes for.
34+
*/
35+
operations: DiffTriggerOperation[];
36+
37+
/**
38+
* Destination table to track changes to.
39+
* This table is created internally.
40+
*/
41+
destination: string;
42+
43+
/**
44+
* Columns to track and report changes for.
45+
* Defaults to all columns in the source table.
46+
*/
47+
columns?: string[];
48+
49+
/**
50+
* Optional condition to filter when the trigger should fire.
51+
* This is useful for only triggering on specific conditions.
52+
* For example, you can use it to only trigger on certain values in the NEW row.
53+
* Note that for PowerSync the data is stored in a JSON column named `data`.
54+
* @example `NEW.data.status = 'active' AND length(NEW.data.name) > 3`
55+
*/
56+
when?: string;
57+
};
58+
59+
export type TriggerRemoveCallback = () => Promise<void>;
60+
61+
export interface TriggerManager {
62+
/**
63+
* Creates a temporary trigger which tracks changes to a source table
64+
* and writes changes to a destination table.
65+
* The temporary destination table is created internally and will be dropped when the trigger is removed.
66+
* @returns A callback to remove the trigger and drop the destination table.
67+
*/
68+
createDiffTrigger(options: CreateDiffTriggerOptions): Promise<TriggerRemoveCallback>;
69+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { DBAdapter, LockContext } from '../../db/DBAdapter.js';
2+
import { CreateDiffTriggerOptions, DiffTriggerOperation, TriggerManager } from './TriggerManager.js';
3+
4+
export type TriggerManagerImplOptions = {
5+
db: DBAdapter;
6+
};
7+
8+
export class TriggerManagerImpl implements TriggerManager {
9+
constructor(protected options: TriggerManagerImplOptions) {}
10+
11+
protected get db() {
12+
return this.options.db;
13+
}
14+
15+
protected async removeTriggers(tx: LockContext, triggerIds: string[]) {
16+
for (const triggerId of triggerIds) {
17+
await tx.execute(/* sql */ `DROP TRIGGER IF EXISTS ${triggerId}; `);
18+
}
19+
}
20+
21+
async createDiffTrigger(options: CreateDiffTriggerOptions) {
22+
const { source, destination, columns, operations, when } = options;
23+
24+
if (operations.length == 0) {
25+
throw new Error('At least one operation must be specified for the trigger.');
26+
}
27+
28+
if (columns && columns.length == 0) {
29+
throw new Error('At least one column must be specified for the trigger.');
30+
}
31+
32+
const whenCondition = when ? `WHEN ${when}` : '';
33+
34+
const { id: uuid } = await this.db.get<{ id: string }>(/* sql */ `
35+
SELECT
36+
uuid () as id
37+
`);
38+
39+
const id = uuid.replace(/-/g, '_'); // Replace dashes with underscores for SQLite compatibility
40+
41+
const triggerIds: string[] = [];
42+
43+
const jsonFragment = (source: 'NEW' | 'OLD' = 'NEW') =>
44+
columns
45+
? `json_object(${columns.map((col) => `'${col}', json_extract(${source}.data, '$.${col}')`).join(', ')})`
46+
: `${source}.data`;
47+
48+
/**
49+
* Declare the cleanup function early since if any of the init steps fail,
50+
* we need to ensure we can cleanup the created resources.
51+
* We unfortunately cannot rely on transaction rollback.
52+
*/
53+
const cleanup = async () =>
54+
this.db.writeLock(async (tx) => {
55+
await this.removeTriggers(tx, triggerIds);
56+
await tx.execute(/* sql */ `DROP TABLE IF EXISTS ${destination};`);
57+
});
58+
59+
try {
60+
await this.db.writeLock(async (tx) => {
61+
await tx.execute(/* sql */ `CREATE TEMP TABLE ${destination} (id TEXT, operation TEXT, change TEXT);`);
62+
63+
if (operations.includes(DiffTriggerOperation.INSERT)) {
64+
const insertTriggerId = `ps_temp_trigger_insert_${id}`;
65+
triggerIds.push(insertTriggerId);
66+
67+
await tx.execute(/* sql */ `
68+
CREATE TEMP TRIGGER ${insertTriggerId} AFTER INSERT ON ${source} ${whenCondition} BEGIN
69+
INSERT INTO
70+
${destination} (id, operation, change)
71+
VALUES
72+
(
73+
NEW.id,
74+
'INSERT',
75+
${jsonFragment('NEW')}
76+
);
77+
78+
END;
79+
`);
80+
}
81+
82+
if (operations.includes(DiffTriggerOperation.UPDATE)) {
83+
const updateTriggerId = `ps_temp_trigger_update_${id}`;
84+
triggerIds.push(updateTriggerId);
85+
86+
await tx.execute(/* sql */ `
87+
CREATE TEMP TRIGGER ${updateTriggerId} AFTER
88+
UPDATE ON ${source} ${whenCondition} BEGIN
89+
INSERT INTO
90+
${destination} (id, operation, change)
91+
VALUES
92+
(
93+
NEW.id,
94+
'UPDATE',
95+
--- Reports both the new and old values in JSON format
96+
json_object (
97+
'new',
98+
${jsonFragment('NEW')},
99+
'old',
100+
${jsonFragment('OLD')}
101+
)
102+
);
103+
104+
END;
105+
`);
106+
}
107+
108+
if (operations.includes(DiffTriggerOperation.DELETE)) {
109+
const deleteTriggerId = `ps_temp_trigger_delete_${id}`;
110+
triggerIds.push(deleteTriggerId);
111+
112+
// Create delete trigger for basic JSON
113+
await tx.execute(/* sql */ `
114+
CREATE TEMP TRIGGER ${deleteTriggerId} AFTER DELETE ON ${source} ${whenCondition} BEGIN
115+
INSERT INTO
116+
${destination} (id, operation)
117+
VALUES
118+
(NEW.id, 'DELETE');
119+
120+
END;
121+
`);
122+
}
123+
});
124+
} catch (error) {
125+
await cleanup();
126+
throw error;
127+
}
128+
129+
return cleanup;
130+
}
131+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { CreateDiffTriggerOptions, TriggerManager, TriggerRemoveCallback } from './TriggerManager.js';
2+
import { TriggerManagerImpl } from './TriggerManagerImpl.js';
3+
4+
export interface WhenReadyTriggerManagerOptions {
5+
manager: TriggerManagerImpl;
6+
readyPromise: Promise<void>;
7+
}
8+
9+
export class WhenReadyTriggerManager implements TriggerManager {
10+
protected readyPromise: Promise<void>;
11+
manager: TriggerManager;
12+
13+
constructor(options: WhenReadyTriggerManagerOptions) {
14+
this.manager = options.manager;
15+
// Wait for the DB to be ready, then initialize the trigger manager.
16+
this.readyPromise = options.readyPromise;
17+
}
18+
19+
async createDiffTrigger(options: CreateDiffTriggerOptions): Promise<TriggerRemoveCallback> {
20+
await this.readyPromise;
21+
return this.manager.createDiffTrigger(options);
22+
}
23+
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export * from './db/schema/Table.js';
3232
export * from './db/schema/TableV2.js';
3333

3434
export * from './client/Query.js';
35+
export * from './client/triggers/TriggerManager.js';
3536
export * from './client/watched/GetAllQuery.js';
3637
export * from './client/watched/processors/AbstractQueryProcessor.js';
3738
export * from './client/watched/processors/comparators.js';
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { DiffTriggerOperation, TriggerDiffResult } from '@powersync/common';
2+
// import 'source-map-support/register';
3+
import { vi } from 'vitest';
4+
import { databaseTest } from './utils';
5+
6+
databaseTest('triggers', { timeout: 100_000 }, async ({ database }) => {
7+
const tempTable = 'remote_lists';
8+
9+
await database.triggers.createDiffTrigger({
10+
source: 'ps_data__lists',
11+
destination: tempTable,
12+
columns: ['name'],
13+
operations: [DiffTriggerOperation.INSERT, DiffTriggerOperation.UPDATE]
14+
});
15+
16+
const results = [] as TriggerDiffResult[];
17+
18+
database.onChange(
19+
{
20+
onChange: async (change) => {
21+
// This callback async processed
22+
await database.writeLock(async (tx) => {
23+
// API exposes a context to run things here.
24+
// using execute seems to be important on Node.js
25+
// the temp table is not present if using getAll
26+
const changes = await tx.execute(/* sql */ `
27+
SELECT
28+
*
29+
FROM
30+
${tempTable}
31+
`);
32+
33+
results.push(...(changes.rows?._array || []));
34+
35+
await tx.execute(/* sql */ ` DELETE FROM ${tempTable}; `);
36+
});
37+
}
38+
},
39+
{ tables: [tempTable] }
40+
);
41+
42+
await database.execute('INSERT INTO lists (id, name) VALUES (uuid(), ?);', ['test list']);
43+
44+
await database.execute(`UPDATE lists SET name = 'wooo'`);
45+
46+
vi.waitFor(
47+
() => {
48+
expect(results.length).toEqual(2);
49+
expect(results[0].operation).toEqual('INSERT');
50+
expect(results[1].operation).toEqual('UPDATE');
51+
},
52+
{ timeout: 1000 }
53+
);
54+
});

packages/node/tests/tsconfig.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"references": [
3+
{
4+
"path": "../../common"
5+
},
6+
{
7+
"path": "../"
8+
}
9+
]
10+
}

pnpm-lock.yaml

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)