Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/friendly-chairs-camp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Support bucket priorities
63 changes: 51 additions & 12 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
UpdateNotification,
isBatchedUpdateNotification
} from '../db/DBAdapter.js';
import { SyncStatus } from '../db/crud/SyncStatus.js';
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
Expand Down Expand Up @@ -146,6 +146,11 @@ export const isPowerSyncDatabaseOptionsWithSettings = (test: any): test is Power
return typeof test == 'object' && isSQLOpenOptions(test.database);
};

/**
* The priority used by the core extension to indicate that a full sync was completed.
*/
const FULL_SYNC_PRIORITY = 2147483647;

export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
/**
* Transactions should be queued in the DBAdapter, but we also want to prevent
Expand Down Expand Up @@ -260,16 +265,30 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

/**
* Wait for the first sync operation to complete.
*
* @argument request Either an abort signal (after which the promise will complete regardless of
* whether a full sync was completed) or an object providing an abort signal and a priority target.
* When a priority target is set, the promise may complete when all buckets with the given (or higher)
* priorities have been synchronized. This can be earlier than a complete sync.
* @returns A promise which will resolve once the first full sync has completed.
*/
async waitForFirstSync(signal?: AbortSignal): Promise<void> {
if (this.currentStatus.hasSynced) {
async waitForFirstSync(request?: AbortSignal | { signal?: AbortSignal; priority?: number }): Promise<void> {
const signal = request instanceof AbortSignal ? request : request?.signal;
const priority = request && 'priority' in request ? request.priority : undefined;

const statusMatches =
priority === undefined
? (status: SyncStatus) => status.hasSynced
: (status: SyncStatus) => status.statusForPriority(priority).hasSynced;

if (statusMatches(this.currentStatus)) {
return;
}
return new Promise((resolve) => {
const dispose = this.registerListener({
statusChanged: (status) => {
if (status.hasSynced) {
if (statusMatches(status)) {
dispose();
resolve();
}
Expand Down Expand Up @@ -329,14 +348,33 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

protected async updateHasSynced() {
const result = await this.database.get<{ synced_at: string | null }>(
'SELECT powersync_last_synced_at() as synced_at'
const result = await this.database.getAll<{ priority: number; last_synced_at: string }>(
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC'
);
const hasSynced = result.synced_at != null;
const syncedAt = result.synced_at != null ? new Date(result.synced_at! + 'Z') : undefined;
let lastCompleteSync: Date | undefined;
const priorityStatuses: SyncPriorityStatus[] = [];

for (const { priority, last_synced_at } of result) {
const parsedDate = new Date(last_synced_at + 'Z');

if (priority == FULL_SYNC_PRIORITY) {
// This lowest-possible priority represents a complete sync.
lastCompleteSync = parsedDate;
} else {
priorityStatuses.push({ priority, hasSynced: true, lastSyncedAt: parsedDate });
}
}

const hasSynced = lastCompleteSync != null;
const updatedStatus = new SyncStatus({
...this.currentStatus.toJSON(),
hasSynced,
priorityStatuses,
lastSyncedAt: lastCompleteSync
});

if (hasSynced != this.currentStatus.hasSynced) {
this.currentStatus = new SyncStatus({ ...this.currentStatus.toJSON(), hasSynced, lastSyncedAt: syncedAt });
if (!updatedStatus.isEqual(this.currentStatus)) {
this.currentStatus = updatedStatus;
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
}
}
Expand Down Expand Up @@ -379,7 +417,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
return {
retryDelayMs: options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
retryDelayMs:
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
crudUploadThrottleMs:
options?.crudUploadThrottleMs ?? this.options.crudUploadThrottleMs ?? DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};
Expand All @@ -401,7 +440,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
retryDelayMs,
crudUploadThrottleMs,
crudUploadThrottleMs
});
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
Expand Down
11 changes: 10 additions & 1 deletion packages/common/src/client/sync/bucket/BucketStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import { CrudBatch } from './CrudBatch.js';
import { CrudEntry, OpId } from './CrudEntry.js';
import { SyncDataBatch } from './SyncDataBatch.js';

export interface BucketDescription {
name: string;
priority: number;
}

export interface Checkpoint {
last_op_id: OpId;
buckets: BucketChecksum[];
Expand All @@ -27,6 +32,7 @@ export interface SyncLocalDatabaseResult {

export interface BucketChecksum {
bucket: string;
priority?: number;
/**
* 32-bit unsigned hash.
*/
Expand Down Expand Up @@ -60,7 +66,10 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener

getBucketStates(): Promise<BucketState[]>;

syncLocalDatabase(checkpoint: Checkpoint): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;
syncLocalDatabase(
checkpoint: Checkpoint,
priority?: number
): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;

nextCrudItem(): Promise<CrudEntry | undefined>;
hasCrud(): Promise<boolean>;
Expand Down
49 changes: 39 additions & 10 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapt
import { BaseObserver } from '../../../utils/BaseObserver.js';
import { MAX_OP_ID } from '../../constants.js';
import {
BucketChecksum,
BucketState,
BucketStorageAdapter,
BucketStorageListener,
Expand Down Expand Up @@ -135,8 +136,8 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return completed;
}

async syncLocalDatabase(checkpoint: Checkpoint): Promise<SyncLocalDatabaseResult> {
const r = await this.validateChecksums(checkpoint);
async syncLocalDatabase(checkpoint: Checkpoint, priority?: number): Promise<SyncLocalDatabaseResult> {
const r = await this.validateChecksums(checkpoint, priority);
if (!r.checkpointValid) {
this.logger.error('Checksums failed for', r.checkpointFailures);
for (const b of r.checkpointFailures ?? []) {
Expand All @@ -145,19 +146,23 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures };
}

const bucketNames = checkpoint.buckets.map((b) => b.bucket);
const buckets = checkpoint.buckets;
if (priority !== undefined) {
buckets.filter((b) => hasMatchingPriority(priority, b));
}
const bucketNames = buckets.map((b) => b.bucket);
await this.writeTransaction(async (tx) => {
await tx.execute(`UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))`, [
checkpoint.last_op_id,
JSON.stringify(bucketNames)
]);

if (checkpoint.write_checkpoint) {
if (priority == null && checkpoint.write_checkpoint) {
await tx.execute("UPDATE ps_buckets SET last_op = ? WHERE name = '$local'", [checkpoint.write_checkpoint]);
}
});

const valid = await this.updateObjectsFromBuckets(checkpoint);
const valid = await this.updateObjectsFromBuckets(checkpoint, priority);
if (!valid) {
this.logger.debug('Not at a consistent checkpoint - cannot update local db');
return { ready: false, checkpointValid: true };
Expand All @@ -176,21 +181,41 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
*
* This includes creating new tables, dropping old tables, and copying data over from the oplog.
*/
private async updateObjectsFromBuckets(checkpoint: Checkpoint) {
private async updateObjectsFromBuckets(checkpoint: Checkpoint, priority: number | undefined) {
let arg = '';
if (priority !== undefined) {
const affectedBuckets: string[] = [];
for (const desc of checkpoint.buckets) {
if (hasMatchingPriority(priority, desc)) {
affectedBuckets.push(desc.bucket);
}
}

arg = JSON.stringify({ priority, buckets: affectedBuckets });
}

return this.writeTransaction(async (tx) => {
const { insertId: result } = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
'sync_local',
''
arg
]);
return result == 1;
});
}

async validateChecksums(checkpoint: Checkpoint): Promise<SyncLocalDatabaseResult> {
const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [JSON.stringify(checkpoint)]);
async validateChecksums(checkpoint: Checkpoint, priority: number | undefined): Promise<SyncLocalDatabaseResult> {
if (priority !== undefined) {
// Only validate the buckets within the priority we care about
const newBuckets = checkpoint.buckets.filter((cs) => hasMatchingPriority(priority, cs));
checkpoint = { ...checkpoint, buckets: newBuckets };
}

const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [
JSON.stringify({ ...checkpoint })
]);

const resultItem = rs.rows?.item(0);
this.logger.debug('validateChecksums result item', resultItem);
this.logger.debug('validateChecksums priority, checkpoint, result item', priority, checkpoint, resultItem);
if (!resultItem) {
return {
checkpointValid: false,
Expand Down Expand Up @@ -367,3 +392,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
// No-op for now
}
}

function hasMatchingPriority(priority: number, bucket: BucketChecksum) {
return bucket.priority != null && bucket.priority <= priority;
}
Loading