Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion demos/angular-supabase-todolist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"@angular/platform-browser-dynamic": "^18.1.1",
"@angular/router": "^18.1.1",
"@angular/service-worker": "^18.1.1",
"@journeyapps/wa-sqlite": "^0.2.0",
"@journeyapps/wa-sqlite": "^0.3.0",
"@powersync/web": "workspace:*",
"@supabase/supabase-js": "^2.44.4",
"rxjs": "~7.8.1",
Expand Down
2 changes: 1 addition & 1 deletion demos/django-react-native-todolist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"dependencies": {
"@azure/core-asynciterator-polyfill": "^1.0.2",
"@expo/vector-icons": "^14.0.0",
"@journeyapps/react-native-quick-sqlite": "^1.1.7",
"@journeyapps/react-native-quick-sqlite": "^1.3.0",
"@powersync/common": "workspace:*",
"@powersync/react": "workspace:*",
"@powersync/react-native": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion demos/example-capacitor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"@capacitor/core": "latest",
"@capacitor/ios": "^6.0.0",
"@capacitor/splash-screen": "latest",
"@journeyapps/wa-sqlite": "^0.2.0",
"@journeyapps/wa-sqlite": "^0.3.0",
"@powersync/react": "workspace:*",
"@powersync/web": "workspace:*",
"js-logger": "^1.6.1",
Expand Down
2 changes: 1 addition & 1 deletion demos/example-electron/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"dependencies": {
"@emotion/react": "^11.13.0",
"@emotion/styled": "^11.13.0",
"@journeyapps/wa-sqlite": "~0.2.0",
"@journeyapps/wa-sqlite": "^0.3.0",
"@mui/icons-material": "^5.15.16",
"@mui/material": "^5.15.16",
"@mui/x-data-grid": "^6.19.11",
Expand Down
2 changes: 1 addition & 1 deletion demos/example-nextjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"@emotion/react": "^11.11.4",
"@emotion/styled": "^11.11.5",
"@fontsource/roboto": "^5.0.13",
"@journeyapps/wa-sqlite": "~0.2.0",
"@journeyapps/wa-sqlite": "^0.3.0",
"@lexical/react": "^0.15.0",
"@mui/icons-material": "^5.15.18",
"@mui/material": "^5.15.18",
Expand Down
2 changes: 1 addition & 1 deletion demos/react-multi-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"test:build": "pnpm build"
},
"dependencies": {
"@journeyapps/wa-sqlite": "~0.2.0",
"@journeyapps/wa-sqlite": "^0.3.0",
"@powersync/react": "workspace:*",
"@powersync/web": "workspace:*",
"@supabase/supabase-js": "^2.43.1",
Expand Down
2 changes: 1 addition & 1 deletion demos/react-native-supabase-group-chat/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"dependencies": {
"@azure/core-asynciterator-polyfill": "^1.0.2",
"@faker-js/faker": "8.3.1",
"@journeyapps/react-native-quick-sqlite": "^1.1.7",
"@journeyapps/react-native-quick-sqlite": "^1.3.0",
"@powersync/common": "workspace:*",
"@powersync/react": "workspace:*",
"@powersync/react-native": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion demos/react-native-supabase-todolist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"dependencies": {
"@azure/core-asynciterator-polyfill": "^1.0.2",
"@expo/vector-icons": "^14.0.0",
"@journeyapps/react-native-quick-sqlite": "^1.1.7",
"@journeyapps/react-native-quick-sqlite": "^1.3.0",
"@powersync/attachments": "workspace:*",
"@powersync/common": "workspace:*",
"@powersync/react": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion demos/react-supabase-todolist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"@powersync/web": "workspace:*",
"@emotion/react": "11.11.4",
"@emotion/styled": "11.11.5",
"@journeyapps/wa-sqlite": "~0.2.0",
"@journeyapps/wa-sqlite": "^0.3.0",
"@mui/icons-material": "^5.15.12",
"@mui/material": "^5.15.12",
"@mui/x-data-grid": "^6.19.6",
Expand Down
2 changes: 1 addition & 1 deletion demos/yjs-react-supabase-text-collab/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"@fontsource/roboto": "^5.0.12",
"@powersync/react": "workspace:*",
"@powersync/web": "workspace:*",
"@journeyapps/wa-sqlite": "~0.1.1",
"@journeyapps/wa-sqlite": "^0.3.0",
"@lexical/react": "^0.11.3",
"@mui/icons-material": "^5.15.12",
"@mui/material": "^5.15.12",
Expand Down
67 changes: 41 additions & 26 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { Schema } from '../db/schema/Schema';
import { BaseObserver } from '../utils/BaseObserver';
import { ControlledExecutor } from '../utils/ControlledExecutor';
import { mutexRunExclusive } from '../utils/mutex';
import { quoteIdentifier } from '../utils/strings';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector';
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter';
Expand Down Expand Up @@ -292,21 +291,47 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
protected async initialize() {
await this._initialize();
await this.bucketStorageAdapter.init();
const version = await this.database.execute('SELECT powersync_rs_version()');
this.sdkVersion = version.rows?.item(0)['powersync_rs_version()'] ?? '';
await this._loadVersion();
await this.updateSchema(this.options.schema);
await this.updateHasSynced();
await this.database.execute('PRAGMA RECURSIVE_TRIGGERS=TRUE');
this.ready = true;
this.iterateListeners((cb) => cb.initialized?.());
}

private async _loadVersion() {
try {
const { version } = await this.database.get<{ version: string }>('SELECT powersync_rs_version() as version');
this.sdkVersion = version;
} catch (e) {
throw new Error(`The powersync extension is not loaded correctly. Details: ${e.message}`);
}
let versionInts: number[];
try {
versionInts = this.sdkVersion!.split(/[.\/]/)
.slice(0, 3)
.map((n) => parseInt(n));
} catch (e) {
throw new Error(
`Unsupported powersync extension version. Need ^0.2.0, got: ${this.sdkVersion}. Details: ${e.message}`
);
}

// Validate ^0.2.0
if (versionInts[0] != 0 || versionInts[1] != 2 || versionInts[2] < 0) {
throw new Error(`Unsupported powersync extension version. Need ^0.2.0, got: ${this.sdkVersion}`);
}
}

protected async updateHasSynced() {
const result = await this.database.getOptional('SELECT 1 FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1');
const hasSynced = !!result;
const result = await this.database.get<{ synced_at: string | null }>(
'SELECT powersync_last_synced_at() as synced_at'
);
const hasSynced = result.synced_at != null;
const syncedAt = result.synced_at != null ? new Date(result.synced_at! + 'Z') : undefined;

if (hasSynced != this.currentStatus.hasSynced) {
this.currentStatus = new SyncStatus({ ...this.currentStatus.toJSON(), hasSynced });
this.currentStatus = new SyncStatus({ ...this.currentStatus.toJSON(), hasSynced, lastSyncedAt: syncedAt });
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
}
}
Expand Down Expand Up @@ -400,26 +425,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

// TODO DB name, verify this is necessary with extension
await this.database.writeTransaction(async (tx) => {
await tx.execute(`DELETE FROM ${PSInternalTable.OPLOG}`);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD}`);
await tx.execute(`DELETE FROM ${PSInternalTable.BUCKETS}`);
await tx.execute(`DELETE FROM ${PSInternalTable.UNTYPED}`);

const tableGlob = clearLocal ? 'ps_data_*' : 'ps_data__*';

const existingTableRows = await tx.execute(
`
SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?
`,
[tableGlob]
);

if (!existingTableRows.rows?.length) {
return;
}
for (const row of existingTableRows.rows._array) {
await tx.execute(`DELETE FROM ${quoteIdentifier(row.name)} WHERE 1`);
}
await tx.execute('SELECT powersync_clear(?)', [clearLocal ? 1 : 0]);
});

// The data has been deleted - reset the sync status
Expand Down Expand Up @@ -553,6 +559,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
});
}

/**
* Get an unique client id for this database.
*
* The id is not reset when the database is cleared, only when the database is deleted.
*/
async getClientId(): Promise<string> {
return this.bucketStorageAdapter.getClientId();
}

private async handleCrudCheckpoint(lastClientId: number, writeCheckpoint?: string) {
return this.writeTransaction(async (tx) => {
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [lastClientId]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,9 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
forceCompact(): Promise<void>;

getMaxOpId(): string;

/**
* Get an unique client id.
*/
getClientId(): Promise<string>;
}
41 changes: 27 additions & 14 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
private pendingBucketDeletes: boolean;
private _hasCompletedSync: boolean;
private updateListener: () => void;
private _clientId?: Promise<string>;

/**
* Count up, and do a compact on startup.
Expand Down Expand Up @@ -62,9 +63,22 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
this.updateListener?.();
}

async _getClientId() {
const row = await this.db.get<{ client_id: string }>('SELECT powersync_client_id() as client_id');
return row['client_id'];
}

getClientId() {
if (this._clientId == null) {
this._clientId = this._getClientId();
}
return this._clientId!;
}

getMaxOpId() {
return MAX_OP_ID;
}

/**
* Reset any caches.
*/
Expand Down Expand Up @@ -103,9 +117,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
*/
private async deleteBucket(bucket: string) {
await this.writeTransaction(async (tx) => {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
['delete_bucket', bucket]);
await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', ['delete_bucket', bucket]);
});

this.logger.debug('done deleting bucket');
Expand All @@ -116,8 +128,8 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
if (this._hasCompletedSync) {
return true;
}
const r = await this.db.execute(`SELECT name, last_applied_op FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1`);
const completed = !!r.rows?.length;
const r = await this.db.get<{ synced_at: string | null }>(`SELECT powersync_last_synced_at() as synced_at`);
const completed = r.synced_at != null;
if (completed) {
this._hasCompletedSync = true;
}
Expand Down Expand Up @@ -219,12 +231,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
private async deletePendingBuckets() {
if (this.pendingBucketDeletes !== false) {
await this.writeTransaction(async (tx) => {
await tx.execute(
'DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)'
);
await tx.execute(
'DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op'
);
await tx.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', ['delete_pending_buckets', '']);
});
// Executed once after start-up, and again when there are pending deletes.
this.pendingBucketDeletes = false;
Expand Down Expand Up @@ -284,7 +291,9 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return false;
}

const response = await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [opId]);
const response = await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [
opId
]);
this.logger.debug(['[updateLocalTarget] Response from updating target_op ', JSON.stringify(response)]);
return true;
});
Expand Down Expand Up @@ -333,10 +342,14 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
if (writeCheckpoint) {
const crudResult = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
if (crudResult.rows?.length) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [
writeCheckpoint
]);
}
} else {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [this.getMaxOpId()]);
await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [
this.getMaxOpId()
]);
}
});
}
Expand Down
19 changes: 17 additions & 2 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import type { BSON } from 'bson';
import { AbortOperation } from '../../../utils/AbortOperation';
import { Buffer } from 'buffer';

import { version as POWERSYNC_JS_VERSION } from '../../../../package.json';

export type BSONImplementation = typeof BSON;

export type RemoteConnector = {
Expand Down Expand Up @@ -109,6 +111,10 @@ export abstract class AbstractRemote {
return this.credentials;
}

getUserAgent() {
return `powersync-js/${POWERSYNC_JS_VERSION}`;
}

protected async buildRequest(path: string) {
const credentials = await this.getCredentials();
if (credentials != null && (credentials.endpoint == null || credentials.endpoint == '')) {
Expand All @@ -119,11 +125,14 @@ export abstract class AbstractRemote {
throw error;
}

const userAgent = this.getUserAgent();

return {
url: credentials.endpoint + path,
headers: {
'content-type': 'application/json',
Authorization: `Token ${credentials.token}`
Authorization: `Token ${credentials.token}`,
'x-user-agent': userAgent
}
};
}
Expand Down Expand Up @@ -207,6 +216,11 @@ export abstract class AbstractRemote {

const bson = await this.getBSON();

// Add the user agent in the setup payload - we can't set custom
// headers with websockets on web. The browser userAgent is however added
// automatically as a header.
const userAgent = this.getUserAgent();

const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url: this.options.socketUrlTransformer(request.url)
Expand All @@ -220,7 +234,8 @@ export abstract class AbstractRemote {
data: null,
metadata: Buffer.from(
bson.serialize({
token: request.headers.Authorization
token: request.headers.Authorization,
user_agent: userAgent
})
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ export abstract class AbstractStreamingSyncImplementation
}

async getWriteCheckpoint(): Promise<string> {
const response = await this.options.remote.get('/write-checkpoint2.json');
const clientId = await this.options.adapter.getClientId();
let path = `/write-checkpoint2.json?client_id=${clientId}`;
const response = await this.options.remote.get(path);
return response['data']['write_checkpoint'] as string;
}

Expand Down Expand Up @@ -456,6 +458,8 @@ The next upload iteration will be delayed.`);

let bucketSet = new Set<string>(initialBuckets.keys());

const clientId = await this.options.adapter.getClientId();

this.logger.debug('Requesting stream from server');

const syncOptions: SyncStreamOptions = {
Expand All @@ -465,7 +469,8 @@ The next upload iteration will be delayed.`);
buckets: req,
include_checksum: true,
raw_data: true,
parameters: resolvedOptions.params
parameters: resolvedOptions.params,
client_id: clientId
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export interface StreamingSyncRequest {
* Client parameters to be passed to the sync rules.
*/
parameters?: Record<string, StreamingSyncRequestParameterType>;

client_id?: string;
}

export interface StreamingSyncCheckpoint {
Expand Down
Loading
Loading