Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/friendly-chairs-camp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Support bucket priorities
58 changes: 46 additions & 12 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
UpdateNotification,
isBatchedUpdateNotification
} from '../db/DBAdapter.js';
import { SyncStatus } from '../db/crud/SyncStatus.js';
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
Expand Down Expand Up @@ -260,16 +260,30 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

/**
* Wait for the first sync operation to complete.
*
* @argument request Either an abort signal (after which the promise will complete regardless of
* whether a full sync was completed) or an object providing an abort signal and a priority target.
* When a priority target is set, the promise may complete when all buckets with the given (or higher)
* priorities have been synchronized. This can be earlier than a complete sync.
* @returns A promise which will resolve once the first full sync has completed.
*/
async waitForFirstSync(signal?: AbortSignal): Promise<void> {
if (this.currentStatus.hasSynced) {
async waitForFirstSync(request?: AbortSignal | { signal?: AbortSignal; priority?: number }): Promise<void> {
const signal = request instanceof AbortSignal ? request : request?.signal;
const priority = request && 'priority' in request ? request.priority : undefined;

const statusMatches =
priority === undefined
? (status: SyncStatus) => status.hasSynced
: (status: SyncStatus) => status.statusForPriority(priority).hasSynced;

if (statusMatches(this.currentStatus)) {
return;
}
return new Promise((resolve) => {
const dispose = this.registerListener({
statusChanged: (status) => {
if (status.hasSynced) {
if (statusMatches(status)) {
dispose();
resolve();
}
Expand Down Expand Up @@ -329,14 +343,31 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

protected async updateHasSynced() {
const result = await this.database.get<{ synced_at: string | null }>(
'SELECT powersync_last_synced_at() as synced_at'
const result = await this.database.getAll<{ priority: number; last_synced_at: string }>(
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC'
);
const hasSynced = result.synced_at != null;
const syncedAt = result.synced_at != null ? new Date(result.synced_at! + 'Z') : undefined;
let lastCompleteSync: Date | undefined;
const priorityStatus: SyncPriorityStatus[] = [];

if (hasSynced != this.currentStatus.hasSynced) {
this.currentStatus = new SyncStatus({ ...this.currentStatus.toJSON(), hasSynced, lastSyncedAt: syncedAt });
for (const { priority, last_synced_at } of result) {
const parsedDate = new Date(last_synced_at + 'Z');

if (priority === 2147483647) {
// This lowest-possible priority represents a complete sync.
lastCompleteSync = parsedDate;
} else {
priorityStatus.push({ priority, hasSynced: true, lastSyncedAt: parsedDate });
}
}

const hasSynced = lastCompleteSync != null;
if (hasSynced != this.currentStatus.hasSynced || priorityStatus != this.currentStatus.statusInPriority) {
this.currentStatus = new SyncStatus({
...this.currentStatus.toJSON(),
hasSynced,
lastSyncedAt: lastCompleteSync,
statusInPriority: priorityStatus
});
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
}
}
Expand Down Expand Up @@ -379,7 +410,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
return {
retryDelayMs: options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
retryDelayMs:
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
crudUploadThrottleMs:
options?.crudUploadThrottleMs ?? this.options.crudUploadThrottleMs ?? DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};
Expand All @@ -401,7 +433,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
retryDelayMs,
crudUploadThrottleMs,
crudUploadThrottleMs
});
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
Expand Down Expand Up @@ -448,6 +480,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
// TODO DB name, verify this is necessary with extension
await this.database.writeTransaction(async (tx) => {
await tx.execute('SELECT powersync_clear(?)', [clearLocal ? 1 : 0]);
// TODO: Remove after updating core extension to include https://github.com/powersync-ja/powersync-sqlite-core/pull/61
await tx.execute('DELETE FROM ps_sync_state;');
});

// The data has been deleted - reset the sync status
Expand Down
11 changes: 10 additions & 1 deletion packages/common/src/client/sync/bucket/BucketStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import { CrudBatch } from './CrudBatch.js';
import { CrudEntry, OpId } from './CrudEntry.js';
import { SyncDataBatch } from './SyncDataBatch.js';

export interface BucketDescription {
name: string;
priority: number;
}

export interface Checkpoint {
last_op_id: OpId;
buckets: BucketChecksum[];
Expand All @@ -27,6 +32,7 @@ export interface SyncLocalDatabaseResult {

export interface BucketChecksum {
bucket: string;
priority: number;
/**
* 32-bit unsigned hash.
*/
Expand Down Expand Up @@ -60,7 +66,10 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener

getBucketStates(): Promise<BucketState[]>;

syncLocalDatabase(checkpoint: Checkpoint): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;
syncLocalDatabase(
checkpoint: Checkpoint,
priority?: number
): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;

nextCrudItem(): Promise<CrudEntry | undefined>;
hasCrud(): Promise<boolean>;
Expand Down
44 changes: 34 additions & 10 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return completed;
}

async syncLocalDatabase(checkpoint: Checkpoint): Promise<SyncLocalDatabaseResult> {
const r = await this.validateChecksums(checkpoint);
async syncLocalDatabase(checkpoint: Checkpoint, priority?: number): Promise<SyncLocalDatabaseResult> {
const r = await this.validateChecksums(checkpoint, priority);
if (!r.checkpointValid) {
this.logger.error('Checksums failed for', r.checkpointFailures);
for (const b of r.checkpointFailures ?? []) {
Expand All @@ -145,19 +145,23 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures };
}

const bucketNames = checkpoint.buckets.map((b) => b.bucket);
const buckets = checkpoint.buckets;
if (priority !== undefined) {
buckets.filter((b) => b.priority <= priority);
}
const bucketNames = buckets.map((b) => b.bucket);
await this.writeTransaction(async (tx) => {
await tx.execute(`UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))`, [
checkpoint.last_op_id,
JSON.stringify(bucketNames)
]);

if (checkpoint.write_checkpoint) {
if (priority == null && checkpoint.write_checkpoint) {
await tx.execute("UPDATE ps_buckets SET last_op = ? WHERE name = '$local'", [checkpoint.write_checkpoint]);
}
});

const valid = await this.updateObjectsFromBuckets(checkpoint);
const valid = await this.updateObjectsFromBuckets(checkpoint, priority);
if (!valid) {
this.logger.debug('Not at a consistent checkpoint - cannot update local db');
return { ready: false, checkpointValid: true };
Expand All @@ -176,21 +180,41 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
*
* This includes creating new tables, dropping old tables, and copying data over from the oplog.
*/
private async updateObjectsFromBuckets(checkpoint: Checkpoint) {
private async updateObjectsFromBuckets(checkpoint: Checkpoint, priority: number | undefined) {
let arg = '';
if (priority !== undefined) {
const affectedBuckets: string[] = [];
for (const desc of checkpoint.buckets) {
if (desc.priority <= priority) {
affectedBuckets.push(desc.bucket);
}
}

arg = JSON.stringify({ priority, buckets: affectedBuckets });
}

return this.writeTransaction(async (tx) => {
const { insertId: result } = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
'sync_local',
''
arg
]);
return result == 1;
});
}

async validateChecksums(checkpoint: Checkpoint): Promise<SyncLocalDatabaseResult> {
const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [JSON.stringify(checkpoint)]);
async validateChecksums(checkpoint: Checkpoint, priority: number | undefined): Promise<SyncLocalDatabaseResult> {
if (priority !== undefined) {
// Only validate the buckets within the priority we care about
const newBuckets = checkpoint.buckets.filter((cs) => cs.priority <= priority);
checkpoint = {...checkpoint, buckets: newBuckets};
}

const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [
JSON.stringify({ ...checkpoint })
]);

const resultItem = rs.rows?.item(0);
this.logger.debug('validateChecksums result item', resultItem);
this.logger.debug('validateChecksums priority, checkpoint, result item', priority, checkpoint, resultItem);
if (!resultItem) {
return {
checkpointValid: false,
Expand Down
Loading
Loading