Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions demos/example-node/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { once } from 'node:events';
import repl_factory from 'node:repl';

import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
import {
createBaseLogger,
createLogger,
PowerSyncDatabase,
SyncClientImplementation,
SyncStreamConnectionMethod
} from '@powersync/node';
import { exit } from 'node:process';
import { AppSchema, DemoConnector } from './powersync.js';
import { enableUncidiDiagnostics } from './UndiciDiagnostics.js';
Expand Down Expand Up @@ -34,7 +40,8 @@ const main = async () => {
console.log(await db.get('SELECT powersync_rs_version();'));

await db.connect(new DemoConnector(), {
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
clientImplementation: SyncClientImplementation.RUST
});
// Example using a proxy agent for more control over the connection:
// const proxyAgent = new (await import('undici')).ProxyAgent({
Expand Down
1 change: 0 additions & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
"async-mutex": "^0.4.0",
"bson": "^6.6.0",
"buffer": "^6.0.3",
"can-ndjson-stream": "^1.0.2",
"cross-fetch": "^4.0.0",
"event-iterator": "^2.0.0",
"rollup": "4.14.3",
Expand Down
18 changes: 17 additions & 1 deletion packages/common/src/client/sync/bucket/BucketStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,31 @@ export enum PSInternalTable {
UNTYPED = 'ps_untyped'
}

export enum PowerSyncControlCommand {
PROCESS_TEXT_LINE = 'line_text',
PROCESS_BSON_LINE = 'line_binary',
STOP = 'stop',
START = 'start',
NOTIFY_TOKEN_REFRESHED = 'refreshed_token',
NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload'
}

export interface BucketStorageListener extends BaseListener {
crudUpdate: () => void;
}

export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener>, Disposable {
init(): Promise<void>;
saveSyncData(batch: SyncDataBatch): Promise<void>;
saveSyncData(batch: SyncDataBatch, fixedKeyFormat?: boolean): Promise<void>;
removeBuckets(buckets: string[]): Promise<void>;
setTargetCheckpoint(checkpoint: Checkpoint): Promise<void>;

startSession(): void;

getBucketStates(): Promise<BucketState[]>;
getBucketOperationProgress(): Promise<BucketOperationProgress>;
hasMigratedSubkeys(): Promise<boolean>;
migrateToFixedSubkeys(): Promise<void>;

syncLocalDatabase(
checkpoint: Checkpoint,
Expand Down Expand Up @@ -101,4 +112,9 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
* Get an unique client id.
*/
getClientId(): Promise<string>;

/**
* Invokes the `powersync_control` function for the sync client.
*/
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string>;
}
12 changes: 7 additions & 5 deletions packages/common/src/client/sync/bucket/OplogEntry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export interface OplogEntryJSON {
object_type?: string;
op_id: string;
op: OpTypeJSON;
subkey?: string | object;
subkey?: string;
}

export class OplogEntry {
Expand All @@ -17,7 +17,7 @@ export class OplogEntry {
row.op_id,
OpType.fromJSON(row.op),
row.checksum,
typeof row.subkey == 'string' ? row.subkey : JSON.stringify(row.subkey),
row.subkey,
row.object_type,
row.object_id,
row.data
Expand All @@ -28,21 +28,23 @@ export class OplogEntry {
public op_id: OpId,
public op: OpType,
public checksum: number,
public subkey: string,
public subkey?: string,
public object_type?: string,
public object_id?: string,
public data?: string
) {}

toJSON(): OplogEntryJSON {
toJSON(fixedKeyEncoding = false): OplogEntryJSON {
return {
op_id: this.op_id,
op: this.op.toJSON(),
object_type: this.object_type,
object_id: this.object_id,
checksum: this.checksum,
data: this.data,
subkey: JSON.stringify(this.subkey)
// Older versions of the JS SDK used to always JSON.stringify here. That has always been wrong,
// but we need to migrate gradually to not break existing databases.
subkey: fixedKeyEncoding ? this.subkey : JSON.stringify(this.subkey)
};
}
}
31 changes: 29 additions & 2 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
BucketStorageAdapter,
BucketStorageListener,
Checkpoint,
PowerSyncControlCommand,
PSInternalTable,
SyncLocalDatabaseResult
} from './BucketStorageAdapter.js';
Expand Down Expand Up @@ -99,13 +100,13 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return Object.fromEntries(rows.map((r) => [r.name, { atLast: r.count_at_last, sinceLast: r.count_since_last }]));
}

async saveSyncData(batch: SyncDataBatch) {
async saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean = false) {
await this.writeTransaction(async (tx) => {
let count = 0;
for (const b of batch.buckets) {
const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
'save',
JSON.stringify({ buckets: [b.toJSON()] })
JSON.stringify({ buckets: [b.toJSON(fixedKeyFormat)] })
]);
this.logger.debug('saveSyncData', JSON.stringify(result));
count += b.data.length;
Expand Down Expand Up @@ -413,6 +414,32 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
async setTargetCheckpoint(checkpoint: Checkpoint) {
// No-op for now
}

async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
return await this.writeTransaction(async (tx) => {
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
return raw;
});
}

async hasMigratedSubkeys(): Promise<boolean> {
const { r } = await this.db.get<{ r: number }>('SELECT EXISTS(SELECT * FROM ps_kv WHERE key = ?) as r', [
SqliteBucketStorage._subkeyMigrationKey
]);
return r != 0;
}

async migrateToFixedSubkeys(): Promise<void> {
await this.writeTransaction(async (tx) => {
await tx.execute('UPDATE ps_oplog SET key = powersync_remove_duplicate_key_encoding(key);');
await tx.execute('INSERT OR REPLACE INTO ps_kv (key, value) VALUES (?, ?);', [
SqliteBucketStorage._subkeyMigrationKey,
'1'
]);
});
}

static _subkeyMigrationKey = 'powersync_js_migrated_subkeys';
}

function hasMatchingPriority(priority: number, bucket: BucketChecksum) {
Expand Down
4 changes: 2 additions & 2 deletions packages/common/src/client/sync/bucket/SyncDataBucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ export class SyncDataBucket {
public next_after?: OpId
) {}

toJSON(): SyncDataBucketJSON {
toJSON(fixedKeyEncoding = false): SyncDataBucketJSON {
return {
bucket: this.bucket,
has_more: this.has_more,
after: this.after,
next_after: this.next_after,
data: this.data.map((entry) => entry.toJSON())
data: this.data.map((entry) => entry.toJSON(fixedKeyEncoding))
};
}
}
Loading
Loading