Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 77 additions & 5 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
/**
* Wait for the first sync operation to complete.
*
* @argument request Either an abort signal (after which the promise will complete regardless of
* @param request Either an abort signal (after which the promise will complete regardless of
* whether a full sync was completed) or an object providing an abort signal and a priority target.
* When a priority target is set, the promise may complete when all buckets with the given (or higher)
* priorities have been synchronized. This can be earlier than a complete sync.
Expand Down Expand Up @@ -540,7 +540,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

/**
* Get a batch of crud data to upload.
* Get a batch of CRUD data to upload.
*
* Returns null if there is no data to upload.
*
Expand All @@ -555,6 +555,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* This method does include transaction ids in the result, but does not group
* data by transaction. One batch may contain data from multiple transactions,
* and a single transaction may be split over multiple batches.
*
* @param limit Maximum number of CRUD entries to include in the batch
* @returns A batch of CRUD operations to upload, or null if there are none
*/
async getCrudBatch(limit: number = DEFAULT_CRUD_BATCH_LIMIT): Promise<CrudBatch | null> {
const result = await this.getAll<CrudEntryJSON>(
Expand Down Expand Up @@ -591,6 +594,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*
* Unlike {@link getCrudBatch}, this only returns data from a single transaction at a time.
* All data for the transaction is loaded into memory.
*
* @returns A transaction of CRUD operations to upload, or null if there are none
*/
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
return await this.readTransaction(async (tx) => {
Expand Down Expand Up @@ -628,6 +633,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Get an unique client id for this database.
*
* The id is not reset when the database is cleared, only when the database is deleted.
*
* @returns A unique identifier for the database instance
*/
async getClientId(): Promise<string> {
return this.bucketStorageAdapter.getClientId();
Expand All @@ -652,14 +659,27 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

/**
* Execute a write (INSERT/UPDATE/DELETE) query
* Execute a SQL write (INSERT/UPDATE/DELETE) query
* and optionally return results.
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The query result as an object with structured key-value pairs
*/
async execute(sql: string, parameters?: any[]) {
await this.waitForReady();
return this.database.execute(sql, parameters);
}

/**
* Execute a SQL write (INSERT/UPDATE/DELETE) query directly on the database without any PowerSync processing.
* This bypasses certain PowerSync abstractions and is useful for direct database access.
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The raw query result from the underlying database as a nested array of raw values, where each row is
* represented as an array of column values without field names.
*/
async executeRaw(sql: string, parameters?: any[]) {
await this.waitForReady();
return this.database.executeRaw(sql, parameters);
Expand All @@ -669,6 +689,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a write query (INSERT/UPDATE/DELETE) multiple times with each parameter set
* and optionally return results.
* This is faster than executing separately with each parameter set.
*
* @param sql The SQL query to execute
* @param parameters Optional 2D array of parameter sets, where each inner array is a set of parameters for one execution
* @returns The query result
*/
async executeBatch(sql: string, parameters?: any[][]) {
await this.waitForReady();
Expand All @@ -677,6 +701,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return results.
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns An array of results
*/
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
await this.waitForReady();
Expand All @@ -685,6 +713,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The first result if found, or null if no results are returned
*/
async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
await this.waitForReady();
Expand All @@ -693,6 +725,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return the first result, error if the ResultSet is empty.
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The first result matching the query
* @throws Error if no rows are returned
*/
async get<T>(sql: string, parameters?: any[]): Promise<T> {
await this.waitForReady();
Expand Down Expand Up @@ -724,6 +761,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Open a read-only transaction.
* Read transactions can run concurrently to a write transaction.
* Changes from any write transaction are not visible to read transactions started before it.
*
* @param callback Function to execute within the transaction
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
* @returns The result of the callback
* @throws Error if the lock cannot be obtained within the timeout period
*/
async readTransaction<T>(
callback: (tx: Transaction) => Promise<T>,
Expand All @@ -744,6 +786,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Open a read-write transaction.
* This takes a global lock - only one write transaction can execute against the database at a time.
* Statements within the transaction must be done on the provided {@link Transaction} interface.
*
* @param callback Function to execute within the transaction
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
* @returns The result of the callback
* @throws Error if the lock cannot be obtained within the timeout period
*/
async writeTransaction<T>(
callback: (tx: Transaction) => Promise<T>,
Expand Down Expand Up @@ -818,6 +865,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
*
* Note that the `onChange` callback member of the handler is required.
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @param handler Callbacks for handling results and errors
* @param options Options for configuring watch behavior
*/
watchWithCallback(sql: string, parameters?: any[], handler?: WatchHandler, options?: SQLWatchOptions): void {
const { onResult, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {};
Expand Down Expand Up @@ -863,6 +915,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a read query every time the source tables are modified.
* Use {@link SQLWatchOptions.throttleMs} to specify the minimum interval between queries.
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @param options Options for configuring watch behavior
* @returns An AsyncIterable that yields QueryResults whenever the data changes
*/
watchWithAsyncGenerator(sql: string, parameters?: any[], options?: SQLWatchOptions): AsyncIterable<QueryResult> {
return new EventIterator<QueryResult>((eventOptions) => {
Expand All @@ -883,6 +940,16 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
});
}

/**
* Resolves the list of tables that are used in a SQL query.
* If tables are specified in the options, those are used directly.
* Otherwise, analyzes the query using EXPLAIN to determine which tables are accessed.
*
* @param sql The SQL query to analyze
* @param parameters Optional parameters for the SQL query
* @param options Optional watch options that may contain explicit table list
* @returns Array of table names that the query depends on
*/
async resolveTables(sql: string, parameters?: any[], options?: SQLWatchOptions): Promise<string[]> {
const resolvedTables = options?.tables ? [...options.tables] : [];
if (!options?.tables) {
Expand Down Expand Up @@ -955,7 +1022,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*
* Note that the `onChange` callback member of the handler is required.
*
* Returns dispose function to stop watching.
* @param handler Callbacks for handling change events and errors
* @param options Options for configuring watch behavior
* @returns A dispose function to stop watching for changes
*/
onChangeWithCallback(handler?: WatchOnChangeHandler, options?: SQLWatchOptions): () => void {
const { onChange, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {};
Expand Down Expand Up @@ -1008,8 +1077,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*
* This is preferred over {@link watchWithAsyncGenerator} when multiple queries need to be performed
* together when data is changed.
*
* Note: do not declare this as `async *onChange` as it will not work in React Native.
*
* Note, do not declare this as `async *onChange` as it will not work in React Native
* @param options Options for configuring watch behavior
* @returns An AsyncIterable that yields change events whenever the specified tables change
*/
onChangeWithAsyncGenerator(options?: SQLWatchOptions): AsyncIterable<WatchOnChangeEvent> {
const resolvedOptions = options ?? {};
Expand Down
72 changes: 57 additions & 15 deletions packages/common/src/db/crud/SyncStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,50 @@ export class SyncStatus {
constructor(protected options: SyncStatusOptions) {}

/**
* true if currently connected.
* Indicates if the client is currently connected to the PowerSync service.
*
* @returns {boolean} True if connected, false otherwise. Defaults to false if not specified.
*/
get connected() {
return this.options.connected ?? false;
}

/**
* Indicates if the client is in the process of establishing a connection to the PowerSync service.
*
* @returns {boolean} True if connecting, false otherwise. Defaults to false if not specified.
*/
get connecting() {
return this.options.connecting ?? false;
}

/**
* Time that a last sync has fully completed, if any.
* Currently this is reset to null after a restart.
* Time that a last sync has fully completed, if any.
* This timestamp is reset to null after a restart of the PowerSync service.
*
* @returns {Date | undefined} The timestamp of the last successful sync, or undefined if no sync has completed.
*/
get lastSyncedAt() {
return this.options.lastSyncedAt;
}

/**
* Indicates whether there has been at least one full sync, if any.
* Is undefined when unknown, for example when state is still being loaded from the database.
* Indicates whether there has been at least one full sync completed since initialization.
*
* @returns {boolean | undefined} True if at least one sync has completed, false if no sync has completed,
* or undefined when the state is still being loaded from the database.
*/
get hasSynced() {
return this.options.hasSynced;
}

/**
* Upload/download status
* Provides the current data flow status regarding uploads and downloads.
*
* @returns {SyncDataFlowStatus} An object containing:
* - downloading: True if actively downloading changes (only when connected is also true)
* - uploading: True if actively uploading changes
* Defaults to {downloading: false, uploading: false} if not specified.
*/
get dataFlowStatus() {
return (
Expand All @@ -68,26 +84,33 @@ export class SyncStatus {
}

/**
* Partial sync status for involved bucket priorities.
* Provides sync status information for all bucket priorities, sorted by priority (highest first).
*
* @returns {SyncPriorityStatus[]} An array of status entries for different sync priority levels,
* sorted with highest priorities (lower numbers) first.
*/
get priorityStatusEntries() {
return (this.options.priorityStatusEntries ?? []).slice().sort(SyncStatus.comparePriorities);
}

/**
* Reports a pair of {@link SyncStatus#hasSynced} and {@link SyncStatus#lastSyncedAt} fields that apply
* to a specific bucket priority instead of the entire sync operation.
*
* Reports the sync status (a pair of {@link SyncStatus#hasSynced} and {@link SyncStatus#lastSyncedAt} fields)
* for a specific bucket priority level.
*
* When buckets with different priorities are declared, PowerSync may choose to synchronize higher-priority
* buckets first. When a consistent view over all buckets for all priorities up until the given priority is
* reached, PowerSync makes data from those buckets available before lower-priority buckets have finished
* synchronizing.
* When PowerSync makes data for a given priority available, all buckets in higher-priorities are guaranteed to
* be consistent with that checkpoint. For this reason, this method may also return the status for lower priorities.
* In a state where the PowerSync just finished synchronizing buckets in priority level 3, calling this method
* syncing.
*
* This method returns the status for the requested priority or the next higher priority level that has
* status information available. This is because when PowerSync makes data for a given priority available,
* all buckets in higher-priorities are guaranteed to be consistent with that checkpoint.
*
* For example, if PowerSync just finished synchronizing buckets in priority level 3, calling this method
* with a priority of 1 may return information for priority level 3.
*
* @param priority The bucket priority for which the status should be reported.
* @param {number} priority The bucket priority for which the status should be reported
* @returns {SyncPriorityStatus} Status information for the requested priority level or the next higher level with available status
*/
statusForPriority(priority: number): SyncPriorityStatus {
// priorityStatusEntries are sorted by ascending priorities (so higher numbers to lower numbers).
Expand All @@ -106,15 +129,34 @@ export class SyncStatus {
};
}

/**
* Compares this SyncStatus instance with another to determine if they are equal.
* Equality is determined by comparing the serialized JSON representation of both instances.
*
* @param {SyncStatus} status The SyncStatus instance to compare against
* @returns {boolean} True if the instances are considered equal, false otherwise
*/
isEqual(status: SyncStatus) {
return JSON.stringify(this.options) == JSON.stringify(status.options);
}

/**
* Creates a human-readable string representation of the current sync status.
* Includes information about connection state, sync completion, and data flow.
*
* @returns {string} A string representation of the sync status
*/
getMessage() {
const dataFlow = this.dataFlowStatus;
return `SyncStatus<connected: ${this.connected} connecting: ${this.connecting} lastSyncedAt: ${this.lastSyncedAt} hasSynced: ${this.hasSynced}. Downloading: ${dataFlow.downloading}. Uploading: ${dataFlow.uploading}`;
}

/**
* Serializes the SyncStatus instance to a plain object.
* This can be used for persistence, logging, or transmission.
*
* @returns {SyncStatusOptions} A plain object representation of the sync status
*/
toJSON(): SyncStatusOptions {
return {
connected: this.connected,
Expand Down