Skip to content

Commit 6cadbc2

Browse files
committed
WIP support
1 parent 1037e8a commit 6cadbc2

File tree

4 files changed

+683
-45
lines changed

4 files changed

+683
-45
lines changed

demos/example-node/output.txt

Lines changed: 458 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import {
2+
AbstractPowerSyncDatabase,
3+
BaseObserver,
4+
CrudEntry,
5+
PowerSyncBackendConnector,
6+
UpdateType,
7+
type PowerSyncCredentials
8+
} from '@powersync/node';
9+
10+
import { Session, SupabaseClient, createClient } from '@supabase/supabase-js';
11+
12+
export type SupabaseConfig = {
13+
supabaseUrl: string;
14+
supabaseAnonKey: string;
15+
powersyncUrl: string;
16+
};
17+
18+
/// Postgres Response codes that we cannot recover from by retrying.
19+
const FATAL_RESPONSE_CODES = [
20+
// Class 22 — Data Exception
21+
// Examples include data type mismatch.
22+
new RegExp('^22...$'),
23+
// Class 23 — Integrity Constraint Violation.
24+
// Examples include NOT NULL, FOREIGN KEY and UNIQUE violations.
25+
new RegExp('^23...$'),
26+
// INSUFFICIENT PRIVILEGE - typically a row-level security violation
27+
new RegExp('^42501$')
28+
];
29+
30+
export type SupabaseConnectorListener = {
31+
initialized: () => void;
32+
sessionStarted: (session: Session) => void;
33+
};
34+
35+
export class SupabaseConnector extends BaseObserver<SupabaseConnectorListener> implements PowerSyncBackendConnector {
36+
readonly client: SupabaseClient;
37+
readonly config: SupabaseConfig;
38+
39+
ready: boolean;
40+
41+
currentSession: Session | null;
42+
43+
constructor() {
44+
super();
45+
this.config = {
46+
supabaseUrl: process.env.SUPABASE_URL!,
47+
powersyncUrl: process.env.POWERSYNC_URL!,
48+
supabaseAnonKey: process.env.SUPABASE_ANON_KEY!
49+
};
50+
51+
console.log('YOINK' + JSON.stringify(this.config));
52+
this.client = createClient(this.config.supabaseUrl, this.config.supabaseAnonKey, {
53+
auth: {
54+
persistSession: true
55+
}
56+
});
57+
this.currentSession = null;
58+
this.ready = false;
59+
}
60+
61+
async init() {
62+
if (this.ready) {
63+
return;
64+
}
65+
66+
const sessionResponse = await this.client.auth.getSession();
67+
this.updateSession(sessionResponse.data.session);
68+
69+
this.ready = true;
70+
this.iterateListeners((cb) => cb.initialized?.());
71+
}
72+
73+
async login() {
74+
const {
75+
data: { session },
76+
error
77+
} = await this.client.auth.signInWithPassword({
78+
79+
password: '[email protected]'
80+
});
81+
82+
if (error) {
83+
throw error;
84+
}
85+
86+
this.updateSession(session);
87+
}
88+
89+
async fetchCredentials() {
90+
const {
91+
data: { session },
92+
error
93+
} = await this.client.auth.getSession();
94+
95+
if (!session || error) {
96+
throw new Error(`Could not fetch Supabase credentials: ${error}`);
97+
}
98+
99+
console.debug('session expires at', session.expires_at);
100+
101+
return {
102+
endpoint: this.config.powersyncUrl,
103+
token: session.access_token ?? ''
104+
} satisfies PowerSyncCredentials;
105+
}
106+
107+
async uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
108+
const transaction = await database.getNextCrudTransaction();
109+
110+
if (!transaction) {
111+
return;
112+
}
113+
114+
let lastOp: CrudEntry | null = null;
115+
try {
116+
// Note: If transactional consistency is important, use database functions
117+
// or edge functions to process the entire transaction in a single call.
118+
for (const op of transaction.crud) {
119+
lastOp = op;
120+
const table = this.client.from(op.table);
121+
let result: any;
122+
switch (op.op) {
123+
case UpdateType.PUT:
124+
const record = { ...op.opData, id: op.id };
125+
result = await table.upsert(record);
126+
break;
127+
case UpdateType.PATCH:
128+
result = await table.update(op.opData).eq('id', op.id);
129+
break;
130+
case UpdateType.DELETE:
131+
result = await table.delete().eq('id', op.id);
132+
break;
133+
}
134+
135+
if (result.error) {
136+
console.error(result.error);
137+
result.error.message = `Could not update Supabase. Received error: ${result.error.message}`;
138+
throw result.error;
139+
}
140+
}
141+
142+
await transaction.complete();
143+
} catch (ex: any) {
144+
console.debug(ex);
145+
if (typeof ex.code == 'string' && FATAL_RESPONSE_CODES.some((regex) => regex.test(ex.code))) {
146+
/**
147+
* Instead of blocking the queue with these errors,
148+
* discard the (rest of the) transaction.
149+
*
150+
* Note that these errors typically indicate a bug in the application.
151+
* If protecting against data loss is important, save the failing records
152+
* elsewhere instead of discarding, and/or notify the user.
153+
*/
154+
console.error('Data upload error - discarding:', lastOp, ex);
155+
await transaction.complete();
156+
} else {
157+
// Error may be retryable - e.g. network error or temporary server error.
158+
// Throwing an error here causes this call to be retried after a delay.
159+
throw ex;
160+
}
161+
}
162+
}
163+
164+
updateSession(session: Session | null) {
165+
this.currentSession = session;
166+
if (!session) {
167+
return;
168+
}
169+
this.iterateListeners((cb) => cb.sessionStarted?.(session));
170+
}
171+
}

demos/example-node/src/main.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,20 @@ import repl_factory from 'node:repl';
33

44
import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
55
import { exit } from 'node:process';
6-
import { AppSchema, DemoConnector } from './powersync.js';
6+
import { AppSchema } from './powersync.js';
7+
import { SupabaseConnector } from './SupabaseConnector.js';
78

89
const main = async () => {
910
const baseLogger = createBaseLogger();
1011
const logger = createLogger('PowerSyncDemo');
1112
baseLogger.useDefaults({ defaultLevel: logger.WARN });
1213

13-
if (!('BACKEND' in process.env) || !('SYNC_SERVICE' in process.env)) {
14-
console.warn(
15-
'Set the BACKEND and SYNC_SERVICE environment variables to point to a sync service and a running demo backend.'
16-
);
17-
return;
18-
}
14+
// if (!('BACKEND' in process.env) || !('SYNC_SERVICE' in process.env)) {
15+
// console.warn(
16+
// 'Set the BACKEND and SYNC_SERVICE environment variables to point to a sync service and a running demo backend.'
17+
// );
18+
// return;
19+
// }
1920

2021
const db = new PowerSyncDatabase({
2122
schema: AppSchema,
@@ -26,7 +27,15 @@ const main = async () => {
2627
});
2728
console.log(await db.get('SELECT powersync_rs_version();'));
2829

29-
await db.connect(new DemoConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
30+
db.registerListener({
31+
statusChanged(status) {
32+
console.log('status changed', status);
33+
}
34+
});
35+
36+
const connector = new SupabaseConnector();
37+
await connector.login();
38+
await db.connect(connector, { connectionMethod: SyncStreamConnectionMethod.HTTP });
3039
await db.waitForFirstSync();
3140
console.log('First sync complete!');
3241

demos/example-node/src/powersync.ts

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,48 @@
11
import { AbstractPowerSyncDatabase, column, PowerSyncBackendConnector, Schema, Table } from '@powersync/node';
22

3-
export class DemoConnector implements PowerSyncBackendConnector {
4-
async fetchCredentials() {
5-
const response = await fetch(`${process.env.BACKEND}/api/auth/token`);
6-
if (response.status != 200) {
7-
throw 'Could not fetch token';
8-
}
3+
// export class DemoConnector implements PowerSyncBackendConnector {
4+
// async fetchCredentials() {
5+
// const response = await fetch(`${process.env.BACKEND}/api/auth/token`);
6+
// if (response.status != 200) {
7+
// throw 'Could not fetch token';
8+
// }
99

10-
const { token } = await response.json();
10+
// const { token } = await response.json();
1111

12-
return {
13-
endpoint: process.env.SYNC_SERVICE!,
14-
token: token
15-
};
16-
}
12+
// return {
13+
// endpoint: process.env.SYNC_SERVICE!,
14+
// token: token
15+
// };
16+
// }
1717

18-
async uploadData(database: AbstractPowerSyncDatabase) {
19-
const batch = await database.getCrudBatch();
20-
if (batch == null) {
21-
return;
22-
}
18+
// async uploadData(database: AbstractPowerSyncDatabase) {
19+
// const batch = await database.getCrudBatch();
20+
// if (batch == null) {
21+
// return;
22+
// }
2323

24-
const entries: any[] = [];
25-
for (const op of batch.crud) {
26-
entries.push({
27-
table: op.table,
28-
op: op.op,
29-
id: op.id,
30-
data: op.opData
31-
});
32-
}
24+
// const entries: any[] = [];
25+
// for (const op of batch.crud) {
26+
// entries.push({
27+
// table: op.table,
28+
// op: op.op,
29+
// id: op.id,
30+
// data: op.opData
31+
// });
32+
// }
3333

34-
const response = await fetch(`${process.env.BACKEND}/api/data/`, {
35-
method: 'POST',
36-
headers: { 'Content-Type': 'application/json' },
37-
body: JSON.stringify({ batch: entries })
38-
});
39-
if (response.status !== 200) {
40-
throw new Error(`Server returned HTTP ${response.status}: ${await response.text()}`);
41-
}
34+
// const response = await fetch(`${process.env.BACKEND}/api/data/`, {
35+
// method: 'POST',
36+
// headers: { 'Content-Type': 'application/json' },
37+
// body: JSON.stringify({ batch: entries })
38+
// });
39+
// if (response.status !== 200) {
40+
// throw new Error(`Server returned HTTP ${response.status}: ${await response.text()}`);
41+
// }
4242

43-
await batch?.complete();
44-
}
45-
}
43+
// await batch?.complete();
44+
// }
45+
// }
4646

4747
export const LIST_TABLE = 'lists';
4848
export const TODO_TABLE = 'todos';

0 commit comments

Comments
 (0)