diff --git a/.changeset/fuzzy-buses-own.md b/.changeset/fuzzy-buses-own.md new file mode 100644 index 000000000..a89344e9d --- /dev/null +++ b/.changeset/fuzzy-buses-own.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': minor +--- + +Add `clientImplementation` field to `SyncStatus`. diff --git a/.changeset/large-toes-drive.md b/.changeset/large-toes-drive.md new file mode 100644 index 000000000..a13eb4992 --- /dev/null +++ b/.changeset/large-toes-drive.md @@ -0,0 +1,5 @@ +--- +'@powersync/diagnostics-app': patch +--- + +Support Rust client. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 113148b42..ef282030b 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -632,8 +632,10 @@ The next upload iteration will be delayed.`); ...DEFAULT_STREAM_CONNECTION_OPTIONS, ...(options ?? {}) }; + const clientImplementation = resolvedOptions.clientImplementation; + this.updateSyncStatus({ clientImplementation }); - if (resolvedOptions.clientImplementation == SyncClientImplementation.JAVASCRIPT) { + if (clientImplementation == SyncClientImplementation.JAVASCRIPT) { await this.legacyStreamingSyncIteration(signal, resolvedOptions); } else { await this.requireKeyFormat(true); @@ -1168,7 +1170,8 @@ The next upload iteration will be delayed.`); ...this.syncStatus.dataFlowStatus, ...options.dataFlow }, - priorityStatusEntries: options.priorityStatusEntries ?? this.syncStatus.priorityStatusEntries + priorityStatusEntries: options.priorityStatusEntries ?? this.syncStatus.priorityStatusEntries, + clientImplementation: options.clientImplementation ?? this.syncStatus.clientImplementation }); if (!this.syncStatus.isEqual(updatedStatus)) { diff --git a/packages/common/src/db/crud/SyncStatus.ts b/packages/common/src/db/crud/SyncStatus.ts index 775c9fa36..4c687e18d 100644 --- a/packages/common/src/db/crud/SyncStatus.ts +++ b/packages/common/src/db/crud/SyncStatus.ts @@ -1,3 +1,4 @@ +import { SyncClientImplementation } from '../../client/sync/stream/AbstractStreamingSyncImplementation.js'; import { InternalProgressInformation, SyncProgress } from './SyncProgress.js'; export type SyncDataFlowStatus = Partial<{ @@ -35,11 +36,22 @@ export type SyncStatusOptions = { lastSyncedAt?: Date; hasSynced?: boolean; priorityStatusEntries?: SyncPriorityStatus[]; + clientImplementation?: SyncClientImplementation; }; export class SyncStatus { constructor(protected options: SyncStatusOptions) {} + /** + * Returns the used sync client implementation (either the one implemented in JavaScript or the newer Rust-based + * implementation). + * + * This information is only available after a connection has been requested. + */ + get clientImplementation() { + return this.options.clientImplementation; + } + /** * Indicates if the client is currently connected to the PowerSync service. * diff --git a/tools/diagnostics-app/src/app/views/layout.tsx b/tools/diagnostics-app/src/app/views/layout.tsx index f018cffdd..51e0526cf 100644 --- a/tools/diagnostics-app/src/app/views/layout.tsx +++ b/tools/diagnostics-app/src/app/views/layout.tsx @@ -155,6 +155,7 @@ export default function ViewsLayout({ children }: { children: React.ReactNode }) {title} + {syncStatus?.clientImplementation && Client: {syncStatus?.clientImplementation}} = (props) => - initialValues={{ token: '', endpoint: '' }} + initialValues={{ token: '', endpoint: '', clientImplementation: SyncClientImplementation.RUST }} validateOnChange={false} validateOnBlur={false} validate={(values) => { @@ -44,7 +57,8 @@ export const LoginDetailsWidget: React.FC = (props) => } await props.onSubmit({ token: values.token, - endpoint + endpoint, + clientImplementation: values.clientImplementation }); } catch (ex: any) { console.error(ex); @@ -52,7 +66,7 @@ export const LoginDetailsWidget: React.FC = (props) => setFieldError('endpoint', ex.message); } }}> - {({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit }) => ( + {({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit, setFieldValue }) => (
= (props) => /> + + setFieldValue( + 'clientImplementation', + values.clientImplementation == SyncClientImplementation.RUST + ? SyncClientImplementation.JAVASCRIPT + : SyncClientImplementation.RUST + ) + } + /> + } + label={ + + Rust sync client ( + + what's that? + + ) + + } + /> + @@ -143,7 +185,7 @@ namespace S { margin-top: 20px; width: 100%; display: flex; - justify-content: end; + justify-content: space-between; `; export const TextInput = styled(TextField)` diff --git a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts index 2db1926f5..661c4b09b 100644 --- a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts +++ b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts @@ -3,6 +3,7 @@ import { createBaseLogger, LogLevel, PowerSyncDatabase, + SyncClientImplementation, TemporaryStorageOption, WASQLiteOpenFactory, WASQLiteVFS, @@ -14,6 +15,7 @@ import { safeParse } from '../safeParse/safeParse'; import { DynamicSchemaManager } from './DynamicSchemaManager'; import { RecordingStorageAdapter } from './RecordingStorageAdapter'; import { TokenConnector } from './TokenConnector'; +import { RustClientInterceptor } from './RustClientInterceptor'; const baseLogger = createBaseLogger(); baseLogger.useDefaults(); @@ -57,9 +59,19 @@ if (connector.hasCredentials()) { } export async function connect() { + const client = + localStorage.getItem('preferred_client_implementation') == SyncClientImplementation.RUST + ? SyncClientImplementation.RUST + : SyncClientImplementation.JAVASCRIPT; + const params = getParams(); await sync?.disconnect(); const remote = new WebRemote(connector); + const adapter = + client == SyncClientImplementation.JAVASCRIPT + ? new RecordingStorageAdapter(db.database, schemaManager) + : new RustClientInterceptor(db.database, remote, schemaManager); + const syncOptions: WebStreamingSyncImplementationOptions = { adapter, remote, @@ -69,7 +81,7 @@ export async function connect() { identifier: 'diagnostics' }; sync = new WebStreamingSyncImplementation(syncOptions); - await sync.connect({ params }); + await sync.connect({ params, clientImplementation: client }); if (!sync.syncStatus.connected) { const error = sync.syncStatus.dataFlowStatus.downloadError ?? new Error('Failed to connect'); // Disconnect but don't wait for it diff --git a/tools/diagnostics-app/src/library/powersync/RustClientInterceptor.ts b/tools/diagnostics-app/src/library/powersync/RustClientInterceptor.ts new file mode 100644 index 000000000..97d636271 --- /dev/null +++ b/tools/diagnostics-app/src/library/powersync/RustClientInterceptor.ts @@ -0,0 +1,126 @@ +import { + AbstractPowerSyncDatabase, + AbstractRemote, + BucketChecksum, + Checkpoint, + ColumnType, + DBAdapter, + isStreamingSyncCheckpoint, + isStreamingSyncCheckpointDiff, + isStreamingSyncData, + PowerSyncControlCommand, + SqliteBucketStorage, + StreamingSyncLine, + SyncDataBucket +} from '@powersync/web'; +import { DynamicSchemaManager } from './DynamicSchemaManager'; + +/** + * Tracks per-byte and per-operation progress for the Rust client. + * + * While per-operation progress is reported by the SDK as well, we need those counters for each bucket. Since that + * information is internal to the Rust client and inaccessible to JavaScript, this intercepts the raw + * `powersync_control` calls to decode sync lines and derive progress information. + */ +export class RustClientInterceptor extends SqliteBucketStorage { + private rdb: DBAdapter; + private lastStartedCheckpoint: Checkpoint | null = null; + + public tables: Record> = {}; + + constructor( + db: DBAdapter, + private remote: AbstractRemote, + private schemaManager: DynamicSchemaManager + ) { + super(db, (AbstractPowerSyncDatabase as any).transactionMutex); + this.rdb = db; + } + + async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise { + const response = await super.control(op, payload); + + if (op == PowerSyncControlCommand.PROCESS_TEXT_LINE) { + await this.processTextLine(payload as string); + } else if (op == PowerSyncControlCommand.PROCESS_BSON_LINE) { + await this.processBinaryLine(payload as Uint8Array); + } + + return response; + } + + private processTextLine(line: string) { + return this.processParsedLine(JSON.parse(line)); + } + + private async processBinaryLine(line: Uint8Array) { + const bson = await this.remote.getBSON(); + await this.processParsedLine(bson.deserialize(line) as StreamingSyncLine); + } + + private async processParsedLine(line: StreamingSyncLine) { + if (isStreamingSyncCheckpoint(line)) { + this.lastStartedCheckpoint = line.checkpoint; + await this.trackCheckpoint(line.checkpoint); + } else if (isStreamingSyncCheckpointDiff(line) && this.lastStartedCheckpoint) { + const diff = line.checkpoint_diff; + const newBuckets = new Map(); + for (const checksum of this.lastStartedCheckpoint.buckets) { + newBuckets.set(checksum.bucket, checksum); + } + for (const checksum of diff.updated_buckets) { + newBuckets.set(checksum.bucket, checksum); + } + for (const bucket of diff.removed_buckets) { + newBuckets.delete(bucket); + } + + const newCheckpoint: Checkpoint = { + last_op_id: diff.last_op_id, + buckets: [...newBuckets.values()], + write_checkpoint: diff.write_checkpoint + }; + this.lastStartedCheckpoint = newCheckpoint; + await this.trackCheckpoint(newCheckpoint); + } else if (isStreamingSyncData(line)) { + const batch = { buckets: [SyncDataBucket.fromRow(line.data)] }; + + await this.rdb.writeTransaction(async (tx) => { + for (const bucket of batch.buckets) { + // Record metrics + const size = JSON.stringify(bucket.data).length; + await tx.execute( + `UPDATE local_bucket_data SET + download_size = IFNULL(download_size, 0) + ?, + last_op = ?, + downloading = ?, + downloaded_operations = IFNULL(downloaded_operations, 0) + ? + WHERE id = ?`, + [size, bucket.next_after, bucket.has_more, bucket.data.length, bucket.bucket] + ); + } + }); + + await this.schemaManager.updateFromOperations(batch); + } + } + + private async trackCheckpoint(checkpoint: Checkpoint) { + await this.rdb.writeTransaction(async (tx) => { + for (const bucket of checkpoint.buckets) { + await tx.execute( + `INSERT OR REPLACE INTO local_bucket_data(id, total_operations, last_op, download_size, downloading, downloaded_operations) + VALUES ( + ?, + ?, + IFNULL((SELECT last_op FROM local_bucket_data WHERE id = ?), '0'), + IFNULL((SELECT download_size FROM local_bucket_data WHERE id = ?), 0), + IFNULL((SELECT downloading FROM local_bucket_data WHERE id = ?), TRUE), + IFNULL((SELECT downloaded_operations FROM local_bucket_data WHERE id = ?), TRUE) + )`, + [bucket.bucket, bucket.count, bucket.bucket, bucket.bucket, bucket.bucket, bucket.bucket] + ); + } + }); + } +} diff --git a/tools/diagnostics-app/src/library/powersync/TokenConnector.ts b/tools/diagnostics-app/src/library/powersync/TokenConnector.ts index 49055d0f6..1b088cafc 100644 --- a/tools/diagnostics-app/src/library/powersync/TokenConnector.ts +++ b/tools/diagnostics-app/src/library/powersync/TokenConnector.ts @@ -1,5 +1,6 @@ import { AbstractPowerSyncDatabase, PowerSyncBackendConnector } from '@powersync/web'; import { connect } from './ConnectionManager'; +import { LoginDetailsFormValues } from '@/components/widgets/LoginDetailsWidget'; export interface Credentials { token: string; @@ -21,11 +22,12 @@ export class TokenConnector implements PowerSyncBackendConnector { await tx?.complete(); } - async signIn(credentials: Credentials) { + async signIn(credentials: LoginDetailsFormValues) { validateSecureContext(credentials.endpoint); checkJWT(credentials.token); try { localStorage.setItem('powersync_credentials', JSON.stringify(credentials)); + localStorage.setItem('preferred_client_implementation', credentials.clientImplementation); await connect(); } catch (e) { this.clearCredentials(); @@ -39,6 +41,7 @@ export class TokenConnector implements PowerSyncBackendConnector { clearCredentials() { localStorage.removeItem('powersync_credentials'); + localStorage.removeItem('preferred_client_implementation'); } }