Skip to content

Commit f6b5b02

Browse files
committed
Node: Support using node:sqlite3 APIs
1 parent 03831dc commit f6b5b02

File tree

8 files changed

+1186
-735
lines changed

8 files changed

+1186
-735
lines changed

packages/node/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"devDependencies": {
7070
"@powersync/drizzle-driver": "workspace:*",
7171
"@types/async-lock": "^1.4.0",
72+
"@types/node": "^24.2.0",
7273
"drizzle-orm": "^0.35.2",
7374
"rollup": "4.14.3",
7475
"typescript": "^5.5.3",

packages/node/src/db/AsyncDatabase.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { QueryResult } from '@powersync/common';
2+
import { NodeDatabaseImplementation } from './options.js';
23

34
export type ProxiedQueryResult = Omit<QueryResult, 'rows'> & {
45
rows?: {
@@ -7,8 +8,14 @@ export type ProxiedQueryResult = Omit<QueryResult, 'rows'> & {
78
};
89
};
910

11+
export interface AsyncDatabaseOpenOptions {
12+
path: string;
13+
isWriter: boolean;
14+
implementation: NodeDatabaseImplementation;
15+
}
16+
1017
export interface AsyncDatabaseOpener {
11-
open(path: string, isWriter: boolean): Promise<AsyncDatabase>;
18+
open(options: AsyncDatabaseOpenOptions): Promise<AsyncDatabase>;
1219
}
1320

1421
export interface AsyncDatabase {

packages/node/src/db/BetterSQLite3DBAdapter.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ export class BetterSQLite3DBAdapter extends BaseObserver<DBAdapterListener> impl
117117
console.error('Unexpected PowerSync database worker error', e);
118118
});
119119

120-
const database = (await comlink.open(dbFilePath, isWriter)) as Remote<AsyncDatabase>;
120+
const database = (await comlink.open({
121+
path: dbFilePath,
122+
isWriter,
123+
implementation: this.options.implementation ?? 'node'
124+
})) as Remote<AsyncDatabase>;
121125
return new RemoteConnection(worker, comlink, database);
122126
};
123127

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import * as Comlink from 'comlink';
2+
import BetterSQLite3Database, { Database } from '@powersync/better-sqlite3';
3+
import { AsyncDatabase, AsyncDatabaseOpener, AsyncDatabaseOpenOptions } from './AsyncDatabase.js';
4+
import { PowerSyncWorkerOptions } from './SqliteWorker.js';
5+
import { threadId } from 'node:worker_threads';
6+
7+
class BlockingAsyncDatabase implements AsyncDatabase {
8+
private readonly db: Database;
9+
10+
private readonly uncommittedUpdatedTables = new Set<string>();
11+
private readonly committedUpdatedTables = new Set<string>();
12+
13+
constructor(db: Database) {
14+
this.db = db;
15+
16+
db.function('node_thread_id', () => threadId);
17+
}
18+
19+
collectCommittedUpdates() {
20+
const resolved = Promise.resolve([...this.committedUpdatedTables]);
21+
this.committedUpdatedTables.clear();
22+
return resolved;
23+
}
24+
25+
installUpdateHooks() {
26+
this.db.updateHook((_op: string, _dbName: string, tableName: string, _rowid: bigint) => {
27+
this.uncommittedUpdatedTables.add(tableName);
28+
});
29+
30+
this.db.commitHook(() => {
31+
for (const tableName of this.uncommittedUpdatedTables) {
32+
this.committedUpdatedTables.add(tableName);
33+
}
34+
this.uncommittedUpdatedTables.clear();
35+
return true;
36+
});
37+
38+
this.db.rollbackHook(() => {
39+
this.uncommittedUpdatedTables.clear();
40+
});
41+
}
42+
43+
async close() {
44+
this.db.close();
45+
}
46+
47+
async execute(query: string, params: any[]) {
48+
const stmt = this.db.prepare(query);
49+
if (stmt.reader) {
50+
const rows = stmt.all(params);
51+
return {
52+
rowsAffected: 0,
53+
rows: {
54+
_array: rows,
55+
length: rows.length
56+
}
57+
};
58+
} else {
59+
const info = stmt.run(params);
60+
return {
61+
rowsAffected: info.changes,
62+
insertId: Number(info.lastInsertRowid)
63+
};
64+
}
65+
}
66+
67+
async executeRaw(query: string, params: any[]) {
68+
const stmt = this.db.prepare(query);
69+
70+
if (stmt.reader) {
71+
return stmt.raw().all(params);
72+
} else {
73+
stmt.raw().run(params);
74+
return [];
75+
}
76+
}
77+
78+
async executeBatch(query: string, params: any[][]) {
79+
params = params ?? [];
80+
81+
let rowsAffected = 0;
82+
83+
const stmt = this.db.prepare(query);
84+
for (const paramSet of params) {
85+
const info = stmt.run(paramSet);
86+
rowsAffected += info.changes;
87+
}
88+
89+
return { rowsAffected };
90+
}
91+
}
92+
93+
export async function openDatabase(worker: PowerSyncWorkerOptions, options: AsyncDatabaseOpenOptions) {
94+
const baseDB = new BetterSQLite3Database(options.path);
95+
baseDB.pragma('journal_mode = WAL');
96+
baseDB.loadExtension(worker.extensionPath(), 'sqlite3_powersync_init');
97+
if (!options.isWriter) {
98+
baseDB.pragma('query_only = true');
99+
}
100+
101+
const asyncDb = new BlockingAsyncDatabase(baseDB);
102+
asyncDb.installUpdateHooks();
103+
return asyncDb;
104+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { threadId } from 'node:worker_threads';
2+
import type { DatabaseSync } from 'node:sqlite';
3+
4+
import * as Comlink from 'comlink';
5+
import { AsyncDatabase, AsyncDatabaseOpener, AsyncDatabaseOpenOptions } from './AsyncDatabase.js';
6+
import { PowerSyncWorkerOptions } from './SqliteWorker.js';
7+
8+
class BlockingNodeDatabase implements AsyncDatabase {
9+
private readonly db: DatabaseSync;
10+
11+
constructor(db: DatabaseSync, write: boolean) {
12+
this.db = db;
13+
14+
db.function('node_thread_id', () => threadId);
15+
if (write) {
16+
db.exec("SELECT powersync_update_hooks('install');");
17+
}
18+
}
19+
20+
async collectCommittedUpdates() {
21+
const stmt = this.db.prepare("SELECT powersync_update_hooks('get') AS r;");
22+
const row = stmt.get()!;
23+
24+
return JSON.parse(row['r'] as string) as string[];
25+
}
26+
27+
async close() {
28+
this.db.close();
29+
}
30+
31+
async execute(query: string, params: any[]) {
32+
const stmt = this.db.prepare(query);
33+
const rows = stmt.all(...params);
34+
return {
35+
rowsAffected: 0,
36+
rows: {
37+
_array: rows,
38+
length: rows.length
39+
}
40+
};
41+
}
42+
43+
async executeRaw(query: string, params: any[]) {
44+
const stmt = this.db.prepare(query);
45+
(stmt as any).setReturnArrays(true); // Missing in @types/node, https://nodejs.org/api/sqlite.html#statementsetreturnarraysenabled
46+
return stmt.all(...params) as any as any[][];
47+
}
48+
49+
async executeBatch(query: string, params: any[][]) {
50+
params = params ?? [];
51+
52+
let rowsAffected = 0;
53+
54+
const stmt = this.db.prepare(query);
55+
for (const paramSet of params) {
56+
const info = stmt.run(...paramSet);
57+
rowsAffected += info.changes as number;
58+
}
59+
60+
return { rowsAffected };
61+
}
62+
}
63+
64+
export async function openDatabase(worker: PowerSyncWorkerOptions, options: AsyncDatabaseOpenOptions) {
65+
// NOTE: We want to import node:sqlite dynamically, to avoid bundlers unconditionally requiring node:sqlite in the
66+
// end, since that would make us incompatible with older Node.JS versions.
67+
const { DatabaseSync } = await import('node:sqlite');
68+
69+
const baseDB = new DatabaseSync(options.path, { allowExtension: true });
70+
baseDB.exec('pragma journal_mode = WAL');
71+
baseDB.loadExtension(worker.extensionPath());
72+
if (!options.isWriter) {
73+
baseDB.exec('pragma query_only = true');
74+
}
75+
76+
return new BlockingNodeDatabase(baseDB, options.isWriter);
77+
}
Lines changed: 30 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,118 +1,11 @@
11
import * as path from 'node:path';
2-
import BetterSQLite3Database, { Database } from '@powersync/better-sqlite3';
32
import * as Comlink from 'comlink';
4-
import { parentPort, threadId } from 'node:worker_threads';
3+
import { parentPort } from 'node:worker_threads';
54
import OS from 'node:os';
65
import url from 'node:url';
7-
import { AsyncDatabase, AsyncDatabaseOpener } from './AsyncDatabase.js';
8-
9-
class BlockingAsyncDatabase implements AsyncDatabase {
10-
private readonly db: Database;
11-
12-
private readonly uncommittedUpdatedTables = new Set<string>();
13-
private readonly committedUpdatedTables = new Set<string>();
14-
15-
constructor(db: Database) {
16-
this.db = db;
17-
18-
db.function('node_thread_id', () => threadId);
19-
}
20-
21-
collectCommittedUpdates() {
22-
const resolved = Promise.resolve([...this.committedUpdatedTables]);
23-
this.committedUpdatedTables.clear();
24-
return resolved;
25-
}
26-
27-
installUpdateHooks() {
28-
this.db.updateHook((_op: string, _dbName: string, tableName: string, _rowid: bigint) => {
29-
this.uncommittedUpdatedTables.add(tableName);
30-
});
31-
32-
this.db.commitHook(() => {
33-
for (const tableName of this.uncommittedUpdatedTables) {
34-
this.committedUpdatedTables.add(tableName);
35-
}
36-
this.uncommittedUpdatedTables.clear();
37-
return true;
38-
});
39-
40-
this.db.rollbackHook(() => {
41-
this.uncommittedUpdatedTables.clear();
42-
});
43-
}
44-
45-
async close() {
46-
this.db.close();
47-
}
48-
49-
async execute(query: string, params: any[]) {
50-
const stmt = this.db.prepare(query);
51-
if (stmt.reader) {
52-
const rows = stmt.all(params);
53-
return {
54-
rowsAffected: 0,
55-
rows: {
56-
_array: rows,
57-
length: rows.length
58-
}
59-
};
60-
} else {
61-
const info = stmt.run(params);
62-
return {
63-
rowsAffected: info.changes,
64-
insertId: Number(info.lastInsertRowid)
65-
};
66-
}
67-
}
68-
69-
async executeRaw(query: string, params: any[]) {
70-
const stmt = this.db.prepare(query);
71-
72-
if (stmt.reader) {
73-
return stmt.raw().all(params);
74-
} else {
75-
stmt.raw().run(params);
76-
return [];
77-
}
78-
}
79-
80-
async executeBatch(query: string, params: any[][]) {
81-
params = params ?? [];
82-
83-
let rowsAffected = 0;
84-
85-
const stmt = this.db.prepare(query);
86-
for (const paramSet of params) {
87-
const info = stmt.run(paramSet);
88-
rowsAffected += info.changes;
89-
}
90-
91-
return { rowsAffected };
92-
}
93-
}
94-
95-
class BetterSqliteWorker implements AsyncDatabaseOpener {
96-
options: PowerSyncWorkerOptions;
97-
98-
constructor(options: PowerSyncWorkerOptions) {
99-
this.options = options;
100-
}
101-
102-
async open(path: string, isWriter: boolean): Promise<AsyncDatabase> {
103-
const baseDB = new BetterSQLite3Database(path);
104-
baseDB.pragma('journal_mode = WAL');
105-
baseDB.loadExtension(this.options.extensionPath(), 'sqlite3_powersync_init');
106-
if (!isWriter) {
107-
baseDB.pragma('query_only = true');
108-
}
109-
110-
const asyncDb = new BlockingAsyncDatabase(baseDB);
111-
asyncDb.installUpdateHooks();
112-
113-
return Comlink.proxy(asyncDb);
114-
}
115-
}
6+
import { openDatabase as openBetterSqliteDatabase } from './BetterSqliteWorker.js';
7+
import { openDatabase as openNodeDatabase } from './NodeSqliteWorker.js';
8+
import { AsyncDatabase, AsyncDatabaseOpener, AsyncDatabaseOpenOptions } from './AsyncDatabase.js';
1169

11710
export interface PowerSyncWorkerOptions {
11811
/**
@@ -152,5 +45,30 @@ export function startPowerSyncWorker(options?: Partial<PowerSyncWorkerOptions>)
15245
...options
15346
};
15447

155-
Comlink.expose(new BetterSqliteWorker(resolvedOptions), parentPort! as Comlink.Endpoint);
48+
Comlink.expose(new DatabaseOpenHelper(resolvedOptions), parentPort! as Comlink.Endpoint);
49+
}
50+
51+
class DatabaseOpenHelper implements AsyncDatabaseOpener {
52+
private options: PowerSyncWorkerOptions;
53+
54+
constructor(options: PowerSyncWorkerOptions) {
55+
this.options = options;
56+
}
57+
58+
async open(options: AsyncDatabaseOpenOptions): Promise<AsyncDatabase> {
59+
let database: AsyncDatabase;
60+
61+
switch (options.implementation) {
62+
case 'better-sqlite3':
63+
database = await openBetterSqliteDatabase(this.options, options);
64+
break;
65+
case 'node':
66+
database = await openNodeDatabase(this.options, options);
67+
break;
68+
default:
69+
throw new Error(`Unknown database implementation: ${options.implementation}.`);
70+
}
71+
72+
return Comlink.proxy(database);
73+
}
15674
}

packages/node/src/db/options.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ import { SQLOpenOptions } from '@powersync/common';
33

44
export type WorkerOpener = (...args: ConstructorParameters<typeof Worker>) => InstanceType<typeof Worker>;
55

6+
export type NodeDatabaseImplementation = 'better-sqlite3' | 'node';
7+
68
/**
79
* The {@link SQLOpenOptions} available across all PowerSync SDKs for JavaScript extended with
810
* Node.JS-specific options.
911
*/
1012
export interface NodeSQLOpenOptions extends SQLOpenOptions {
13+
implementation?: NodeDatabaseImplementation;
14+
1115
/**
1216
* The Node.JS SDK will use one worker to run writing queries and additional workers to run reads.
1317
* This option controls how many workers to use for reads.

0 commit comments

Comments
 (0)