Skip to content

Commit 578be54

Browse files
committed
Merge branch 'raw_tables_skilldevs' into example-raw-tables
2 parents a7a9d77 + 58de160 commit 578be54

File tree

9 files changed

+58
-35
lines changed

9 files changed

+58
-35
lines changed

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23
import 'dart:isolate';
34
import 'package:meta/meta.dart';
45

@@ -266,7 +267,7 @@ class PowerSyncDatabaseImpl
266267
options,
267268
crudMutex.shared,
268269
syncMutex.shared,
269-
schema,
270+
jsonEncode(schema),
270271
),
271272
debugName: 'Sync ${database.openFactory.path}',
272273
onError: receiveUnhandledErrors.sendPort,
@@ -310,15 +311,15 @@ class _PowerSyncDatabaseIsolateArgs {
310311
final ResolvedSyncOptions options;
311312
final SerializedMutex crudMutex;
312313
final SerializedMutex syncMutex;
313-
final Schema schema;
314+
final String schemaJson;
314315

315316
_PowerSyncDatabaseIsolateArgs(
316317
this.sPort,
317318
this.dbRef,
318319
this.options,
319320
this.crudMutex,
320321
this.syncMutex,
321-
this.schema,
322+
this.schemaJson,
322323
);
323324
}
324325

@@ -414,7 +415,7 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
414415
final storage = BucketStorage(connection);
415416
final sync = StreamingSyncImplementation(
416417
adapter: storage,
417-
schema: args.schema,
418+
schemaJson: args.schemaJson,
418419
connector: InternalConnector(
419420
getCredentialsCached: getCredentialsCached,
420421
prefetchCredentials: prefetchCredentials,

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
134134
await _afterSchemaReady();
135135
}
136136

137-
void _assertSchemaIsReady() {
137+
void _checkSchemaIsReady() {
138138
if (!manualSchemaManagement || _manualSchemaManagementCompleted) {
139139
return;
140140
}
141141

142-
throw AssertionError(
142+
throw StateError(
143143
'In manual schema management mode, you need to mark the powersync database as ready');
144144
}
145145

@@ -319,7 +319,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
319319
// the lock for the connection.
320320
await initialize();
321321

322-
_assertSchemaIsReady();
322+
_checkSchemaIsReady();
323323

324324
final resolvedOptions = ResolvedSyncOptions.resolve(
325325
options,
@@ -484,15 +484,15 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
484484
/// Get an unique id for this client.
485485
/// This id is only reset when the database is deleted.
486486
Future<String> getClientId() async {
487-
_assertSchemaIsReady(); // TODO(skilldevs): Needed?
487+
_checkSchemaIsReady(); // TODO(skilldevs): Needed?
488488
final row = await get('SELECT powersync_client_id() as client_id');
489489
return row['client_id'] as String;
490490
}
491491

492492
/// Get upload queue size estimate and count.
493493
Future<UploadQueueStats> getUploadQueueStats(
494494
{bool includeSize = false}) async {
495-
_assertSchemaIsReady();
495+
_checkSchemaIsReady();
496496
if (includeSize) {
497497
final row = await getOptional(
498498
'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud');
@@ -520,7 +520,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
520520
/// data by transaction. One batch may contain data from multiple transactions,
521521
/// and a single transaction may be split over multiple batches.
522522
Future<CrudBatch?> getCrudBatch({int limit = 100}) async {
523-
_assertSchemaIsReady();
523+
_checkSchemaIsReady();
524524
final rows = await getAll(
525525
'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?',
526526
[limit + 1]);
@@ -567,7 +567,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
567567
/// Unlike [getCrudBatch], this only returns data from a single transaction at a time.
568568
/// All data for the transaction is loaded into memory.
569569
Future<CrudTransaction?> getNextCrudTransaction() async {
570-
_assertSchemaIsReady();
570+
_checkSchemaIsReady();
571571
return await readTransaction((tx) async {
572572
final first = await tx.getOptional(
573573
'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23
import 'package:meta/meta.dart';
34
import 'package:http/browser_client.dart';
45
import 'package:logging/logging.dart';
@@ -159,7 +160,7 @@ class PowerSyncDatabaseImpl
159160

160161
sync = StreamingSyncImplementation(
161162
adapter: storage,
162-
schema: schema,
163+
schemaJson: jsonEncode(schema),
163164
connector: InternalConnector.wrap(connector, this),
164165
crudUpdateTriggerStream: crudStream,
165166
options: options,

packages/powersync_core/lib/src/schema.dart

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,8 @@ class Column {
316316
Map<String, dynamic> toJson() => {'name': name, 'type': type.sqlite};
317317
}
318318

319-
class RawTable {
320-
final String
321-
name; // TODO: it does not need to be the same name as the raw table
319+
final class RawTable {
320+
final String name;
322321
final PendingStatement put;
323322
final PendingStatement delete;
324323

@@ -335,7 +334,7 @@ class RawTable {
335334
};
336335
}
337336

338-
class PendingStatement {
337+
final class PendingStatement {
339338
final String sql;
340339
final List<PendingStatementValue> params;
341340

@@ -348,12 +347,15 @@ class PendingStatement {
348347
}
349348

350349
sealed class PendingStatementValue {
350+
factory PendingStatementValue.id() = _PendingStmtValueId;
351+
factory PendingStatementValue.column(String column) = _PendingStmtValueColumn;
352+
351353
dynamic toJson();
352354
}
353355

354-
class PendingStmtValueColumn extends PendingStatementValue {
356+
class _PendingStmtValueColumn implements PendingStatementValue {
355357
final String column;
356-
PendingStmtValueColumn(this.column);
358+
const _PendingStmtValueColumn(this.column);
357359

358360
@override
359361
dynamic toJson() {
@@ -363,7 +365,9 @@ class PendingStmtValueColumn extends PendingStatementValue {
363365
}
364366
}
365367

366-
class PendingStmtValueId extends PendingStatementValue {
368+
class _PendingStmtValueId implements PendingStatementValue {
369+
const _PendingStmtValueId();
370+
367371
@override
368372
dynamic toJson() {
369373
return 'Id';

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import 'dart:typed_data';
55
import 'package:http/http.dart' as http;
66
import 'package:logging/logging.dart';
77
import 'package:meta/meta.dart';
8-
import 'package:powersync_core/powersync_core.dart';
98
import 'package:powersync_core/src/abort_controller.dart';
109
import 'package:powersync_core/src/exceptions.dart';
1110
import 'package:powersync_core/src/log_internal.dart';
@@ -33,7 +32,7 @@ abstract interface class StreamingSync {
3332

3433
@internal
3534
class StreamingSyncImplementation implements StreamingSync {
36-
final Schema? schema; //TODO(SkillDevs): pass in all implementations
35+
final String schemaJson;
3736
final BucketStorage adapter;
3837
final InternalConnector connector;
3938
final ResolvedSyncOptions options;
@@ -64,7 +63,7 @@ class StreamingSyncImplementation implements StreamingSync {
6463
String? clientId;
6564

6665
StreamingSyncImplementation({
67-
required this.schema,
66+
required this.schemaJson,
6867
required this.adapter,
6968
required this.connector,
7069
required this.crudUpdateTriggerStream,
@@ -601,7 +600,7 @@ final class _ActiveRustStreamingIteration {
601600
'start',
602601
convert.json.encode({
603602
'parameters': sync.options.params,
604-
'schema': sync.schema,
603+
'schema': convert.json.decode(sync.schemaJson),
605604
}),
606605
);
607606
assert(_completedStream.isCompleted, 'Should have started streaming');

packages/powersync_core/lib/src/web/sync_controller.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ class SyncWorkerHandle implements StreamingSync {
113113
@override
114114
Future<void> streamingSync() async {
115115
await _channel.startSynchronization(
116-
database.database.openFactory.path, ResolvedSyncOptions(options));
116+
database.database.openFactory.path,
117+
ResolvedSyncOptions(options),
118+
database.schema,
119+
);
117120
}
118121
}

packages/powersync_core/lib/src/web/sync_worker.dart

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,16 @@ class _SyncWorker {
4545
});
4646
}
4747

48-
_SyncRunner referenceSyncTask(
49-
String databaseIdentifier, SyncOptions options, _ConnectedClient client) {
48+
_SyncRunner referenceSyncTask(String databaseIdentifier, SyncOptions options,
49+
String schemaJson, _ConnectedClient client) {
5050
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
5151
return _SyncRunner(databaseIdentifier);
5252
})
53-
..registerClient(client, options);
53+
..registerClient(
54+
client,
55+
options,
56+
schemaJson,
57+
);
5458
}
5559
}
5660

@@ -86,8 +90,8 @@ class _ConnectedClient {
8690
},
8791
);
8892

89-
_runner = _worker.referenceSyncTask(
90-
request.databaseName, recoveredOptions, this);
93+
_runner = _worker.referenceSyncTask(request.databaseName,
94+
recoveredOptions, request.schemaJson, this);
9195
return (JSObject(), null);
9296
case SyncWorkerMessageType.abortSynchronization:
9397
_runner?.disconnectClient(this);
@@ -128,6 +132,7 @@ class _ConnectedClient {
128132
class _SyncRunner {
129133
final String identifier;
130134
ResolvedSyncOptions options = ResolvedSyncOptions(SyncOptions());
135+
String schemaJson = '{}';
131136

132137
final StreamGroup<_RunnerEvent> _group = StreamGroup();
133138
final StreamController<_RunnerEvent> _mainEvents = StreamController();
@@ -146,10 +151,12 @@ class _SyncRunner {
146151
case _AddConnection(
147152
:final client,
148153
:final options,
154+
:final schemaJson,
149155
):
150156
connections.add(client);
151157
final (newOptions, reconnect) = this.options.applyFrom(options);
152158
this.options = newOptions;
159+
this.schemaJson = schemaJson;
153160

154161
if (sync == null) {
155162
await _requestDatabase(client);
@@ -264,7 +271,7 @@ class _SyncRunner {
264271

265272
sync = StreamingSyncImplementation(
266273
adapter: WebBucketStorage(database),
267-
schema: null,
274+
schemaJson: client._runner!.schemaJson,
268275
connector: InternalConnector(
269276
getCredentialsCached: client.channel.credentialsCallback,
270277
prefetchCredentials: ({required bool invalidate}) async {
@@ -287,8 +294,9 @@ class _SyncRunner {
287294
sync!.streamingSync();
288295
}
289296

290-
void registerClient(_ConnectedClient client, SyncOptions options) {
291-
_mainEvents.add(_AddConnection(client, options));
297+
void registerClient(
298+
_ConnectedClient client, SyncOptions options, String schemaJson) {
299+
_mainEvents.add(_AddConnection(client, options, schemaJson));
292300
}
293301

294302
/// Remove a client, disconnecting if no clients remain..
@@ -307,8 +315,9 @@ sealed class _RunnerEvent {}
307315
final class _AddConnection implements _RunnerEvent {
308316
final _ConnectedClient client;
309317
final SyncOptions options;
318+
final String schemaJson;
310319

311-
_AddConnection(this.client, this.options);
320+
_AddConnection(this.client, this.options, this.schemaJson);
312321
}
313322

314323
final class _RemoveConnection implements _RunnerEvent {

packages/powersync_core/lib/src/web/sync_worker_protocol.dart

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:convert';
33
import 'dart:js_interop';
44

55
import 'package:logging/logging.dart';
6+
import 'package:powersync_core/src/schema.dart';
67
import 'package:powersync_core/src/sync/options.dart';
78
import 'package:web/web.dart';
89

@@ -71,6 +72,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
7172
required int requestId,
7273
required int retryDelayMs,
7374
required String implementationName,
75+
required String schemaJson,
7476
String? syncParamsEncoded,
7577
});
7678

@@ -79,6 +81,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
7981
external int get crudThrottleTimeMs;
8082
external int? get retryDelayMs;
8183
external String? get implementationName;
84+
external String get schemaJson;
8285
external String? get syncParamsEncoded;
8386
}
8487

@@ -410,7 +413,7 @@ final class WorkerCommunicationChannel {
410413
}
411414

412415
Future<void> startSynchronization(
413-
String databaseName, ResolvedSyncOptions options) async {
416+
String databaseName, ResolvedSyncOptions options, Schema schema) async {
414417
final (id, completion) = _newRequest();
415418
port.postMessage(SyncWorkerMessage(
416419
type: SyncWorkerMessageType.startSynchronization.name,
@@ -420,6 +423,7 @@ final class WorkerCommunicationChannel {
420423
retryDelayMs: options.retryDelay.inMilliseconds,
421424
requestId: id,
422425
implementationName: options.source.syncImplementation.name,
426+
schemaJson: jsonEncode(schema),
423427
syncParamsEncoded: switch (options.source.params) {
424428
null => null,
425429
final params => jsonEncode(params),

packages/powersync_core/test/utils/abstract_test_utils.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:convert';
2+
13
import 'package:http/http.dart';
24
import 'package:logging/logging.dart';
35
import 'package:powersync_core/powersync_core.dart';
@@ -154,7 +156,7 @@ extension MockSync on PowerSyncDatabase {
154156
}) {
155157
final impl = StreamingSyncImplementation(
156158
adapter: BucketStorage(this),
157-
schema: schema,
159+
schemaJson: jsonEncode(schema),
158160
client: client,
159161
options: ResolvedSyncOptions(options),
160162
connector: InternalConnector.wrap(connector, this),

0 commit comments

Comments
 (0)