Skip to content

Commit 79ce6a5

Browse files
committed
Merge remote-tracking branch 'origin/main' into improve-logging
2 parents 1312157 + 1c42a69 commit 79ce6a5

File tree

12 files changed

+446
-29
lines changed

12 files changed

+446
-29
lines changed

.changeset/bright-snakes-clean.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
'@powersync/common': minor
3+
'@powersync/node': minor
4+
'@powersync/web': minor
5+
'@powersync/react-native': minor
6+
---
7+
8+
Add experimental support for raw tables, giving you full control over the table structure to sync into.
9+
While PowerSync manages tables as JSON views by default, raw tables have to be created by the application
10+
developer.
11+
12+
For more information about raw tables, see [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables).

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { CrudTransaction } from './sync/bucket/CrudTransaction.js';
2727
import {
2828
DEFAULT_CRUD_UPLOAD_THROTTLE_MS,
2929
DEFAULT_RETRY_DELAY_MS,
30+
InternalConnectionOptions,
3031
StreamingSyncImplementation,
3132
StreamingSyncImplementationListener,
3233
type AdditionalConnectionOptions,
@@ -461,7 +462,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
461462
* Connects to stream of events from the PowerSync instance.
462463
*/
463464
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
464-
return this.connectionManager.connect(connector, options);
465+
const resolvedOptions: InternalConnectionOptions = options ?? {};
466+
resolvedOptions.serializedSchema = this.schema.toJSON();
467+
468+
return this.connectionManager.connect(connector, resolvedOptions);
465469
}
466470

467471
/**

packages/common/src/client/ConnectionManager.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { ILogger } from 'js-logger';
22
import { BaseListener, BaseObserver } from '../utils/BaseObserver.js';
33
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
44
import {
5-
PowerSyncConnectionOptions,
5+
InternalConnectionOptions,
66
StreamingSyncImplementation
77
} from './sync/stream/AbstractStreamingSyncImplementation.js';
88

@@ -24,14 +24,14 @@ export interface ConnectionManagerSyncImplementationResult {
2424
export interface ConnectionManagerOptions {
2525
createSyncImplementation(
2626
connector: PowerSyncBackendConnector,
27-
options: PowerSyncConnectionOptions
27+
options: InternalConnectionOptions
2828
): Promise<ConnectionManagerSyncImplementationResult>;
2929
logger: ILogger;
3030
}
3131

3232
type StoredConnectionOptions = {
3333
connector: PowerSyncBackendConnector;
34-
options: PowerSyncConnectionOptions;
34+
options: InternalConnectionOptions;
3535
};
3636

3737
/**
@@ -95,7 +95,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
9595
await this.syncDisposer?.();
9696
}
9797

98-
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
98+
async connect(connector: PowerSyncBackendConnector, options: InternalConnectionOptions) {
9999
// Keep track if there were pending operations before this call
100100
const hadPendingOptions = !!this.pendingConnectionOptions;
101101

@@ -140,7 +140,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
140140
}
141141

142142
protected async connectInternal() {
143-
let appliedOptions: PowerSyncConnectionOptions | null = null;
143+
let appliedOptions: InternalConnectionOptions | null = null;
144144

145145
// This method ensures a disconnect before any connection attempt
146146
await this.disconnectInternal();

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ export interface StreamingSyncImplementationListener extends BaseListener {
123123
* Configurable options to be used when connecting to the PowerSync
124124
* backend instance.
125125
*/
126-
export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}
126+
export type PowerSyncConnectionOptions = Omit<InternalConnectionOptions, 'serializedSchema'>;
127+
128+
export interface InternalConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}
127129

128130
/** @internal */
129131
export interface BaseConnectionOptions {
@@ -152,6 +154,11 @@ export interface BaseConnectionOptions {
152154
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
153155
*/
154156
params?: Record<string, StreamingSyncRequestParameterType>;
157+
158+
/**
159+
* The serialized schema - mainly used to forward information about raw tables to the sync client.
160+
*/
161+
serializedSchema?: any;
155162
}
156163

157164
/** @internal */
@@ -176,7 +183,7 @@ export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncI
176183
/**
177184
* Connects to the sync service
178185
*/
179-
connect(options?: PowerSyncConnectionOptions): Promise<void>;
186+
connect(options?: InternalConnectionOptions): Promise<void>;
180187
/**
181188
* Disconnects from the sync services.
182189
* @throws if not connected or if abort is not controlled internally
@@ -208,7 +215,8 @@ export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptio
208215
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
209216
clientImplementation: DEFAULT_SYNC_CLIENT_IMPLEMENTATION,
210217
fetchStrategy: FetchStrategy.Buffered,
211-
params: {}
218+
params: {},
219+
serializedSchema: undefined
212220
};
213221

214222
// The priority we assume when we receive checkpoint lines where no priority is set.
@@ -621,6 +629,10 @@ The next upload iteration will be delayed.`);
621629
}
622630

623631
private async legacyStreamingSyncIteration(signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions) {
632+
if (resolvedOptions.serializedSchema?.raw_tables != null) {
633+
this.logger.warn('Raw tables require the Rust-based sync client. The JS client will ignore them.');
634+
}
635+
624636
this.logger.debug('Streaming sync iteration started');
625637
this.options.adapter.startSession();
626638
let [req, bucketMap] = await this.collectLocalBucketState();
@@ -714,6 +726,8 @@ The next upload iteration will be delayed.`);
714726

715727
if (isStreamingSyncCheckpoint(line)) {
716728
targetCheckpoint = line.checkpoint;
729+
// New checkpoint - existing validated checkpoint is no longer valid
730+
pendingValidatedCheckpoint = null;
717731
const bucketsToDelete = new Set<string>(bucketMap.keys());
718732
const newBuckets = new Map<string, BucketDescription>();
719733
for (const checksum of line.checkpoint.buckets) {
@@ -735,7 +749,13 @@ The next upload iteration will be delayed.`);
735749
if (result.endIteration) {
736750
return;
737751
} else if (!result.applied) {
752+
// "Could not apply checkpoint due to local data". We need to retry after
753+
// finishing uploads.
738754
pendingValidatedCheckpoint = targetCheckpoint;
755+
} else {
756+
// Nothing to retry later. This would likely already be null from the last
757+
// checksum or checksum_diff operation, but we make sure.
758+
pendingValidatedCheckpoint = null;
739759
}
740760
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
741761
const priority = line.partial_checkpoint_complete.priority;
@@ -771,6 +791,8 @@ The next upload iteration will be delayed.`);
771791
if (targetCheckpoint == null) {
772792
throw new Error('Checkpoint diff without previous checkpoint');
773793
}
794+
// New checkpoint - existing validated checkpoint is no longer valid
795+
pendingValidatedCheckpoint = null;
774796
const diff = line.checkpoint_diff;
775797
const newBuckets = new Map<string, BucketChecksum>();
776798
for (const checksum of targetCheckpoint.buckets) {
@@ -1025,12 +1047,12 @@ The next upload iteration will be delayed.`);
10251047
}
10261048

10271049
try {
1028-
await control(
1029-
PowerSyncControlCommand.START,
1030-
JSON.stringify({
1031-
parameters: resolvedOptions.params
1032-
})
1033-
);
1050+
const options: any = { parameters: resolvedOptions.params };
1051+
if (resolvedOptions.serializedSchema) {
1052+
options.schema = resolvedOptions.serializedSchema;
1053+
}
1054+
1055+
await control(PowerSyncControlCommand.START, JSON.stringify(options));
10341056

10351057
this.notifyCompletedUploads = () => {
10361058
controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED });
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* A pending variant of a {@link RawTable} that doesn't have a name (because it would be inferred when creating the
3+
* schema).
4+
*/
5+
export type RawTableType = {
6+
/**
7+
* The statement to run when PowerSync detects that a row needs to be inserted or updated.
8+
*/
9+
put: PendingStatement;
10+
/**
11+
* The statement to run when PowerSync detects that a row needs to be deleted.
12+
*/
13+
delete: PendingStatement;
14+
};
15+
16+
/**
17+
* A parameter to use as part of {@link PendingStatement}.
18+
*
19+
* For delete statements, only the `"Id"` value is supported - the sync client will replace it with the id of the row to
20+
* be synced.
21+
*
22+
* For insert and replace operations, the values of columns in the table are available as parameters through
23+
* `{Column: 'name'}`.
24+
*/
25+
export type PendingStatementParameter = 'Id' | { Column: string };
26+
27+
/**
28+
* A statement that the PowerSync client should use to insert or delete data into a table managed by the user.
29+
*/
30+
export type PendingStatement = {
31+
sql: string;
32+
params: PendingStatementParameter[];
33+
};
34+
35+
/**
36+
* Instructs PowerSync to sync data into a "raw" table.
37+
*
38+
* Since raw tables are not backed by JSON, running complex queries on them may be more efficient. Further, they allow
39+
* using client-side table and column constraints.
40+
*
41+
* To collect local writes to raw tables with PowerSync, custom triggers are required. See
42+
* {@link https://docs.powersync.com/usage/use-case-examples/raw-tables the documentation} for details and an example on
43+
* using raw tables.
44+
*
45+
* Note that raw tables are only supported when using the new `SyncClientImplementation.rust` sync client.
46+
*
47+
* @experimental Please note that this feature is experimental at the moment, and not covered by PowerSync semver or
48+
* stability guarantees.
49+
*/
50+
export class RawTable implements RawTableType {
51+
/**
52+
* The name of the table.
53+
*
54+
* This does not have to match the actual table name in the schema - {@link put} and {@link delete} are free to use
55+
* another table. Instead, this name is used by the sync client to recognize that operations on this table (as it
56+
* appears in the source / backend database) are to be handled specially.
57+
*/
58+
name: string;
59+
put: PendingStatement;
60+
delete: PendingStatement;
61+
62+
constructor(name: string, type: RawTableType) {
63+
this.name = name;
64+
this.put = type.put;
65+
this.delete = type.delete;
66+
}
67+
}

packages/common/src/db/schema/Schema.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { RawTable, RawTableType } from './RawTable.js';
12
import { RowType, Table } from './Table.js';
23

34
type SchemaType = Record<string, Table<any>>;
@@ -16,6 +17,7 @@ export class Schema<S extends SchemaType = SchemaType> {
1617
readonly types: SchemaTableType<S>;
1718
readonly props: S;
1819
readonly tables: Table[];
20+
readonly rawTables: RawTable[];
1921

2022
constructor(tables: Table[] | S) {
2123
if (Array.isArray(tables)) {
@@ -36,6 +38,24 @@ export class Schema<S extends SchemaType = SchemaType> {
3638
this.props = tables as S;
3739
this.tables = this.convertToClassicTables(this.props);
3840
}
41+
42+
this.rawTables = [];
43+
}
44+
45+
/**
46+
* Adds raw tables to this schema. Raw tables are identified by their name, but entirely managed by the application
47+
* developer instead of automatically by PowerSync.
48+
* Since raw tables are not backed by JSON, running complex queries on them may be more efficient. Further, they allow
49+
* using client-side table and column constraints.
50+
* Note that raw tables are only supported when using the new `SyncClientImplementation.rust` sync client.
51+
*
52+
* @param tables An object of (table name, raw table definition) entries.
53+
* @experimental Note that the raw tables API is still experimental and may change in the future.
54+
*/
55+
withRawTables(tables: Record<string, RawTableType>) {
56+
for (const [name, rawTableDefinition] of Object.entries(tables)) {
57+
this.rawTables.push(new RawTable(name, rawTableDefinition));
58+
}
3959
}
4060

4161
validate() {
@@ -47,7 +67,8 @@ export class Schema<S extends SchemaType = SchemaType> {
4767
toJSON() {
4868
return {
4969
// This is required because "name" field is not present in TableV2
50-
tables: this.tables.map((t) => t.toJSON())
70+
tables: this.tables.map((t) => t.toJSON()),
71+
raw_tables: this.rawTables
5172
};
5273
}
5374

packages/common/tests/db/schema/Schema.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,18 @@ describe('Schema', () => {
8080
content: column.text
8181
})
8282
});
83+
schema.withRawTables({
84+
lists: {
85+
put: {
86+
sql: 'SELECT 1',
87+
params: [{ Column: 'foo' }]
88+
},
89+
delete: {
90+
sql: 'SELECT 2',
91+
params: ['Id']
92+
}
93+
}
94+
});
8395

8496
const json = schema.toJSON();
8597

@@ -115,6 +127,19 @@ describe('Schema', () => {
115127
],
116128
indexes: []
117129
}
130+
],
131+
raw_tables: [
132+
{
133+
name: 'lists',
134+
delete: {
135+
sql: 'SELECT 2',
136+
params: ['Id']
137+
},
138+
put: {
139+
sql: 'SELECT 1',
140+
params: [{ Column: 'foo' }]
141+
}
142+
}
118143
]
119144
});
120145
});

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
7878
connector: PowerSyncBackendConnector,
7979
options: NodeAdditionalConnectionOptions
8080
): AbstractStreamingSyncImplementation {
81-
const remote = new NodeRemote(connector, this.options.logger, {
81+
const logger = this.options.logger;
82+
const remote = new NodeRemote(connector, logger, {
8283
dispatcher: options.dispatcher,
8384
...(this.options as NodePowerSyncDatabaseOptions).remoteOptions
8485
});
@@ -93,7 +94,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
9394
retryDelayMs: this.options.retryDelayMs,
9495
crudUploadThrottleMs: this.options.crudUploadThrottleMs,
9596
identifier: this.database.name,
96-
logger: this.logger
97+
logger
9798
});
9899
}
99100
}

0 commit comments

Comments
 (0)