Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fuzzy-buses-own.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Add `clientImplementation` field to `SyncStatus`.
5 changes: 5 additions & 0 deletions .changeset/large-toes-drive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/diagnostics-app': patch
---

Support Rust client.
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,10 @@ The next upload iteration will be delayed.`);
...DEFAULT_STREAM_CONNECTION_OPTIONS,
...(options ?? {})
};
const clientImplementation = resolvedOptions.clientImplementation;
this.updateSyncStatus({ clientImplementation });

if (resolvedOptions.clientImplementation == SyncClientImplementation.JAVASCRIPT) {
if (clientImplementation == SyncClientImplementation.JAVASCRIPT) {
await this.legacyStreamingSyncIteration(signal, resolvedOptions);
} else {
await this.requireKeyFormat(true);
Expand Down Expand Up @@ -1168,7 +1170,8 @@ The next upload iteration will be delayed.`);
...this.syncStatus.dataFlowStatus,
...options.dataFlow
},
priorityStatusEntries: options.priorityStatusEntries ?? this.syncStatus.priorityStatusEntries
priorityStatusEntries: options.priorityStatusEntries ?? this.syncStatus.priorityStatusEntries,
clientImplementation: options.clientImplementation ?? this.syncStatus.clientImplementation
});

if (!this.syncStatus.isEqual(updatedStatus)) {
Expand Down
12 changes: 12 additions & 0 deletions packages/common/src/db/crud/SyncStatus.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { SyncClientImplementation } from '../../client/sync/stream/AbstractStreamingSyncImplementation.js';
import { InternalProgressInformation, SyncProgress } from './SyncProgress.js';

export type SyncDataFlowStatus = Partial<{
Expand Down Expand Up @@ -35,11 +36,22 @@ export type SyncStatusOptions = {
lastSyncedAt?: Date;
hasSynced?: boolean;
priorityStatusEntries?: SyncPriorityStatus[];
clientImplementation?: SyncClientImplementation;
};

export class SyncStatus {
constructor(protected options: SyncStatusOptions) {}

/**
* Returns the used sync client implementation (either the one implemented in JavaScript or the newer Rust-based
* implementation).
*
* This information is only available after a connection has been requested.
*/
get clientImplementation() {
return this.options.clientImplementation;
}

/**
* Indicates if the client is currently connected to the PowerSync service.
*
Expand Down
1 change: 1 addition & 0 deletions tools/diagnostics-app/src/app/views/layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export default function ViewsLayout({ children }: { children: React.ReactNode })
<Box sx={{ flexGrow: 1 }}>
<Typography>{title}</Typography>
</Box>
{syncStatus?.clientImplementation && <Typography>Client: {syncStatus?.clientImplementation}</Typography>}
<NorthIcon
sx={{ marginRight: '-10px' }}
color={syncStatus?.dataFlowStatus.uploading ? 'primary' : 'inherit'}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
import React from 'react';
import { Box, Button, ButtonGroup, FormGroup, Paper, TextField, Typography, styled } from '@mui/material';
import {
Box,
Button,
ButtonGroup,
FormControlLabel,
FormGroup,
Paper,
Switch,
TextField,
Typography,
styled
} from '@mui/material';
import { Formik, FormikErrors } from 'formik';
import { SyncClientImplementation } from '@powersync/web';

export type LoginDetailsFormValues = {
token: string;
endpoint: string;
clientImplementation: SyncClientImplementation;
};

export type LoginAction = {
Expand All @@ -25,7 +38,7 @@ export const LoginDetailsWidget: React.FC<LoginDetailsWidgetProps> = (props) =>
<S.Logo alt="PowerSync Logo" width={400} height={100} src="/powersync-logo.svg" />
</S.LogoBox>
<Formik<LoginDetailsFormValues>
initialValues={{ token: '', endpoint: '' }}
initialValues={{ token: '', endpoint: '', clientImplementation: SyncClientImplementation.RUST }}
validateOnChange={false}
validateOnBlur={false}
validate={(values) => {
Expand All @@ -44,15 +57,16 @@ export const LoginDetailsWidget: React.FC<LoginDetailsWidgetProps> = (props) =>
}
await props.onSubmit({
token: values.token,
endpoint
endpoint,
clientImplementation: values.clientImplementation
});
} catch (ex: any) {
console.error(ex);
setSubmitting(false);
setFieldError('endpoint', ex.message);
}
}}>
{({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit }) => (
{({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit, setFieldValue }) => (
<form onSubmit={handleSubmit}>
<FormGroup>
<S.TextInput
Expand Down Expand Up @@ -84,6 +98,34 @@ export const LoginDetailsWidget: React.FC<LoginDetailsWidgetProps> = (props) =>
/>
</FormGroup>
<S.ActionButtonGroup>
<FormControlLabel
control={
<Switch
checked={values.clientImplementation == SyncClientImplementation.RUST}
onChange={() =>
setFieldValue(
'clientImplementation',
values.clientImplementation == SyncClientImplementation.RUST
? SyncClientImplementation.JAVASCRIPT
: SyncClientImplementation.RUST
)
}
/>
}
label={
<span>
Rust sync client (
<a
style={{ color: 'lightblue' }}
target="_blank"
href="https://releases.powersync.com/announcements/improved-sync-performance-in-our-client-sdks">
what's that?
</a>
)
</span>
}
/>

<Button variant="outlined" type="submit" disabled={isSubmitting}>
Proceed
</Button>
Expand Down Expand Up @@ -143,7 +185,7 @@ namespace S {
margin-top: 20px;
width: 100%;
display: flex;
justify-content: end;
justify-content: space-between;
`;

export const TextInput = styled(TextField)`
Expand Down
14 changes: 13 additions & 1 deletion tools/diagnostics-app/src/library/powersync/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
createBaseLogger,
LogLevel,
PowerSyncDatabase,
SyncClientImplementation,
TemporaryStorageOption,
WASQLiteOpenFactory,
WASQLiteVFS,
Expand All @@ -14,6 +15,7 @@ import { safeParse } from '../safeParse/safeParse';
import { DynamicSchemaManager } from './DynamicSchemaManager';
import { RecordingStorageAdapter } from './RecordingStorageAdapter';
import { TokenConnector } from './TokenConnector';
import { RustClientInterceptor } from './RustClientInterceptor';

const baseLogger = createBaseLogger();
baseLogger.useDefaults();
Expand Down Expand Up @@ -57,9 +59,19 @@ if (connector.hasCredentials()) {
}

export async function connect() {
const client =
localStorage.getItem('preferred_client_implementation') == SyncClientImplementation.RUST
? SyncClientImplementation.RUST
: SyncClientImplementation.JAVASCRIPT;

const params = getParams();
await sync?.disconnect();
const remote = new WebRemote(connector);
const adapter =
client == SyncClientImplementation.JAVASCRIPT
? new RecordingStorageAdapter(db.database, schemaManager)
: new RustClientInterceptor(db.database, remote, schemaManager);

const syncOptions: WebStreamingSyncImplementationOptions = {
adapter,
remote,
Expand All @@ -69,7 +81,7 @@ export async function connect() {
identifier: 'diagnostics'
};
sync = new WebStreamingSyncImplementation(syncOptions);
await sync.connect({ params });
await sync.connect({ params, clientImplementation: client });
if (!sync.syncStatus.connected) {
const error = sync.syncStatus.dataFlowStatus.downloadError ?? new Error('Failed to connect');
// Disconnect but don't wait for it
Expand Down
126 changes: 126 additions & 0 deletions tools/diagnostics-app/src/library/powersync/RustClientInterceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import {
AbstractPowerSyncDatabase,
AbstractRemote,
BucketChecksum,
Checkpoint,
ColumnType,
DBAdapter,
isStreamingSyncCheckpoint,
isStreamingSyncCheckpointDiff,
isStreamingSyncData,
PowerSyncControlCommand,
SqliteBucketStorage,
StreamingSyncLine,
SyncDataBucket
} from '@powersync/web';
import { DynamicSchemaManager } from './DynamicSchemaManager';

/**
* Tracks per-byte and per-operation progress for the Rust client.
*
* While per-operation progress is reported by the SDK as well, we need those counters for each bucket. Since that
* information is internal to the Rust client and inaccessible to JavaScript, this intercepts the raw
* `powersync_control` calls to decode sync lines and derive progress information.
*/
export class RustClientInterceptor extends SqliteBucketStorage {
private rdb: DBAdapter;
private lastStartedCheckpoint: Checkpoint | null = null;

public tables: Record<string, Record<string, ColumnType>> = {};

constructor(
db: DBAdapter,
private remote: AbstractRemote,
private schemaManager: DynamicSchemaManager
) {
super(db, (AbstractPowerSyncDatabase as any).transactionMutex);
this.rdb = db;
}

async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
const response = await super.control(op, payload);

if (op == PowerSyncControlCommand.PROCESS_TEXT_LINE) {
await this.processTextLine(payload as string);
} else if (op == PowerSyncControlCommand.PROCESS_BSON_LINE) {
await this.processBinaryLine(payload as Uint8Array);
}

return response;
}

private processTextLine(line: string) {
return this.processParsedLine(JSON.parse(line));
}

private async processBinaryLine(line: Uint8Array) {
const bson = await this.remote.getBSON();
await this.processParsedLine(bson.deserialize(line) as StreamingSyncLine);
}

private async processParsedLine(line: StreamingSyncLine) {
if (isStreamingSyncCheckpoint(line)) {
this.lastStartedCheckpoint = line.checkpoint;
await this.trackCheckpoint(line.checkpoint);
} else if (isStreamingSyncCheckpointDiff(line) && this.lastStartedCheckpoint) {
const diff = line.checkpoint_diff;
const newBuckets = new Map<string, BucketChecksum>();
for (const checksum of this.lastStartedCheckpoint.buckets) {
newBuckets.set(checksum.bucket, checksum);
}
for (const checksum of diff.updated_buckets) {
newBuckets.set(checksum.bucket, checksum);
}
for (const bucket of diff.removed_buckets) {
newBuckets.delete(bucket);
}

const newCheckpoint: Checkpoint = {
last_op_id: diff.last_op_id,
buckets: [...newBuckets.values()],
write_checkpoint: diff.write_checkpoint
};
this.lastStartedCheckpoint = newCheckpoint;
await this.trackCheckpoint(newCheckpoint);
} else if (isStreamingSyncData(line)) {
const batch = { buckets: [SyncDataBucket.fromRow(line.data)] };

await this.rdb.writeTransaction(async (tx) => {
for (const bucket of batch.buckets) {
// Record metrics
const size = JSON.stringify(bucket.data).length;
await tx.execute(
`UPDATE local_bucket_data SET
download_size = IFNULL(download_size, 0) + ?,
last_op = ?,
downloading = ?,
downloaded_operations = IFNULL(downloaded_operations, 0) + ?
WHERE id = ?`,
[size, bucket.next_after, bucket.has_more, bucket.data.length, bucket.bucket]
);
}
});

await this.schemaManager.updateFromOperations(batch);
}
}

private async trackCheckpoint(checkpoint: Checkpoint) {
await this.rdb.writeTransaction(async (tx) => {
for (const bucket of checkpoint.buckets) {
await tx.execute(
`INSERT OR REPLACE INTO local_bucket_data(id, total_operations, last_op, download_size, downloading, downloaded_operations)
VALUES (
?,
?,
IFNULL((SELECT last_op FROM local_bucket_data WHERE id = ?), '0'),
IFNULL((SELECT download_size FROM local_bucket_data WHERE id = ?), 0),
IFNULL((SELECT downloading FROM local_bucket_data WHERE id = ?), TRUE),
IFNULL((SELECT downloaded_operations FROM local_bucket_data WHERE id = ?), TRUE)
)`,
[bucket.bucket, bucket.count, bucket.bucket, bucket.bucket, bucket.bucket, bucket.bucket]
);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AbstractPowerSyncDatabase, PowerSyncBackendConnector } from '@powersync/web';
import { connect } from './ConnectionManager';
import { LoginDetailsFormValues } from '@/components/widgets/LoginDetailsWidget';

export interface Credentials {
token: string;
Expand All @@ -21,11 +22,12 @@ export class TokenConnector implements PowerSyncBackendConnector {
await tx?.complete();
}

async signIn(credentials: Credentials) {
async signIn(credentials: LoginDetailsFormValues) {
validateSecureContext(credentials.endpoint);
checkJWT(credentials.token);
try {
localStorage.setItem('powersync_credentials', JSON.stringify(credentials));
localStorage.setItem('preferred_client_implementation', credentials.clientImplementation);
await connect();
} catch (e) {
this.clearCredentials();
Expand All @@ -39,6 +41,7 @@ export class TokenConnector implements PowerSyncBackendConnector {

clearCredentials() {
localStorage.removeItem('powersync_credentials');
localStorage.removeItem('preferred_client_implementation');
}
}

Expand Down