Skip to content

Commit c25dd87

Browse files
committed
Web: pass on sync parameters.
1 parent 80de755 commit c25dd87

File tree

4 files changed

+65
-43
lines changed

4 files changed

+65
-43
lines changed

packages/powersync/lib/src/database/web/web_powersync_database.dart

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,11 @@ class PowerSyncDatabaseImpl
148148
// duplicating work across tabs.
149149
try {
150150
sync = await SyncWorkerHandle.start(
151-
this,
152-
connector,
153-
crudThrottleTime.inMilliseconds,
154-
Uri.base.resolve('/powersync_sync.worker.js'),
155-
);
151+
database: this,
152+
connector: connector,
153+
crudThrottleTimeMs: crudThrottleTime.inMilliseconds,
154+
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
155+
syncParams: params);
156156
} catch (e) {
157157
logger.warning(
158158
'Could not use shared worker for synchronization, falling back to locks.',

packages/powersync/lib/src/web/sync_controller.dart

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,29 @@ import '../streaming_sync.dart';
1010
import 'sync_worker_protocol.dart';
1111

1212
class SyncWorkerHandle implements StreamingSync {
13-
final PowerSyncDatabaseImpl _database;
14-
final PowerSyncBackendConnector _connector;
15-
final int _crudThrottleTimeMs;
13+
final PowerSyncDatabaseImpl database;
14+
final PowerSyncBackendConnector connector;
15+
final int crudThrottleTimeMs;
16+
final Map<String, dynamic>? syncParams;
1617

1718
late final WorkerCommunicationChannel _channel;
1819

1920
final StreamController<SyncStatus> _status = StreamController.broadcast();
2021

21-
SyncWorkerHandle._(this._database, this._connector, this._crudThrottleTimeMs,
22-
MessagePort sendToWorker, SharedWorker worker) {
22+
SyncWorkerHandle._(
23+
{required this.database,
24+
required this.connector,
25+
required this.crudThrottleTimeMs,
26+
required MessagePort sendToWorker,
27+
required SharedWorker worker,
28+
this.syncParams}) {
2329
_channel = WorkerCommunicationChannel(
2430
port: sendToWorker,
2531
errors: EventStreamProviders.errorEvent.forTarget(worker),
2632
requestHandler: (type, payload) async {
2733
switch (type) {
2834
case SyncWorkerMessageType.requestEndpoint:
29-
final endpoint = await (_database.database as WebSqliteConnection)
35+
final endpoint = await (database.database as WebSqliteConnection)
3036
.exposeEndpoint();
3137

3238
return (
@@ -38,18 +44,18 @@ class SyncWorkerHandle implements StreamingSync {
3844
[endpoint.connectPort].toJS
3945
);
4046
case SyncWorkerMessageType.uploadCrud:
41-
await _connector.uploadData(_database);
47+
await connector.uploadData(database);
4248
return (JSObject(), null);
4349
case SyncWorkerMessageType.invalidCredentialsCallback:
44-
final credentials = await _connector.fetchCredentials();
50+
final credentials = await connector.fetchCredentials();
4551
return (
4652
credentials != null
4753
? SerializedCredentials.from(credentials)
4854
: null,
4955
null
5056
);
5157
case SyncWorkerMessageType.credentialsCallback:
52-
final credentials = await _connector.getCredentialsCached();
58+
final credentials = await connector.getCredentialsCached();
5359
return (
5460
credentials != null
5561
? SerializedCredentials.from(credentials)
@@ -71,13 +77,19 @@ class SyncWorkerHandle implements StreamingSync {
7177
}
7278

7379
static Future<SyncWorkerHandle> start(
74-
PowerSyncDatabaseImpl database,
75-
PowerSyncBackendConnector connector,
76-
int crudThrottleTimeMs,
77-
Uri workerUri) async {
80+
{required PowerSyncDatabaseImpl database,
81+
required PowerSyncBackendConnector connector,
82+
required int crudThrottleTimeMs,
83+
required Uri workerUri,
84+
Map<String, dynamic>? syncParams}) async {
7885
final worker = SharedWorker(workerUri.toString().toJS);
7986
final handle = SyncWorkerHandle._(
80-
database, connector, crudThrottleTimeMs, worker.port, worker);
87+
database: database,
88+
connector: connector,
89+
crudThrottleTimeMs: crudThrottleTimeMs,
90+
sendToWorker: worker.port,
91+
worker: worker,
92+
syncParams: syncParams);
8193

8294
// Make sure that the worker is working, or throw immediately.
8395
await handle._channel.ping();
@@ -101,6 +113,6 @@ class SyncWorkerHandle implements StreamingSync {
101113
@override
102114
Future<void> streamingSync() async {
103115
await _channel.startSynchronization(
104-
_database.openFactory.path, _crudThrottleTimeMs);
116+
database.openFactory.path, crudThrottleTimeMs, syncParams);
105117
}
106118
}

packages/powersync/lib/src/web/sync_worker.dart

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
library;
55

66
import 'dart:async';
7+
import 'dart:convert';
78
import 'dart:js_interop';
89

910
import 'package:async/async.dart';
@@ -41,10 +42,14 @@ class _SyncWorker {
4142
});
4243
}
4344

44-
_SyncRunner referenceSyncTask(String databaseIdentifier,
45-
int crudThrottleTimeMs, _ConnectedClient client) {
45+
_SyncRunner referenceSyncTask(
46+
String databaseIdentifier,
47+
int crudThrottleTimeMs,
48+
String? syncParamsEncoded,
49+
_ConnectedClient client) {
4650
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
47-
return _SyncRunner(databaseIdentifier, crudThrottleTimeMs);
51+
return _SyncRunner(databaseIdentifier, crudThrottleTimeMs,
52+
syncParamsEncoded == null ? null : jsonDecode(syncParamsEncoded));
4853
})
4954
..registerClient(client);
5055
}
@@ -64,8 +69,8 @@ class _ConnectedClient {
6469
switch (type) {
6570
case SyncWorkerMessageType.startSynchronization:
6671
final request = payload as StartSynchronization;
67-
_runner = _worker.referenceSyncTask(
68-
request.databaseName, request.crudThrottleTimeMs, this);
72+
_runner = _worker.referenceSyncTask(request.databaseName,
73+
request.crudThrottleTimeMs, request.syncParamsEncoded, this);
6974
return (JSObject(), null);
7075
case SyncWorkerMessageType.abortSynchronization:
7176
_runner?.unregisterClient(this);
@@ -106,6 +111,7 @@ class _ConnectedClient {
106111
class _SyncRunner {
107112
final String identifier;
108113
final int crudThrottleTimeMs;
114+
final Map<String, dynamic>? syncParams;
109115

110116
final StreamGroup<_RunnerEvent> _group = StreamGroup();
111117
final StreamController<_RunnerEvent> _mainEvents = StreamController();
@@ -114,7 +120,7 @@ class _SyncRunner {
114120
_ConnectedClient? databaseHost;
115121
final connections = <_ConnectedClient>[];
116122

117-
_SyncRunner(this.identifier, this.crudThrottleTimeMs) {
123+
_SyncRunner(this.identifier, this.crudThrottleTimeMs, this.syncParams) {
118124
_group.add(_mainEvents.stream);
119125

120126
Future(() async {
@@ -227,15 +233,15 @@ class _SyncRunner {
227233
}
228234

229235
sync = StreamingSyncImplementation(
230-
adapter: BucketStorage(database),
231-
credentialsCallback: client.channel.credentialsCallback,
232-
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
233-
uploadCrud: client.channel.uploadCrud,
234-
crudUpdateTriggerStream: crudStream,
235-
retryDelay: Duration(seconds: 3),
236-
client: FetchClient(mode: RequestMode.cors),
237-
identifier: identifier,
238-
);
236+
adapter: BucketStorage(database),
237+
credentialsCallback: client.channel.credentialsCallback,
238+
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
239+
uploadCrud: client.channel.uploadCrud,
240+
crudUpdateTriggerStream: crudStream,
241+
retryDelay: Duration(seconds: 3),
242+
client: FetchClient(mode: RequestMode.cors),
243+
identifier: identifier,
244+
syncParameters: syncParams);
239245
sync!.statusStream.listen((event) {
240246
_logger.fine('Broadcasting sync event: $event');
241247
for (final client in connections) {

packages/powersync/lib/src/web/sync_worker_protocol.dart

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23
import 'dart:js_interop';
34

45
import 'package:web/web.dart';
@@ -60,15 +61,16 @@ extension type SyncWorkerMessage._(JSObject _) implements JSObject {
6061

6162
@anonymous
6263
extension type StartSynchronization._(JSObject _) implements JSObject {
63-
external factory StartSynchronization({
64-
required String databaseName,
65-
required int crudThrottleTimeMs,
66-
required int requestId,
67-
});
64+
external factory StartSynchronization(
65+
{required String databaseName,
66+
required int crudThrottleTimeMs,
67+
required int requestId,
68+
String? syncParamsEncoded});
6869

6970
external String get databaseName;
7071
external int get requestId;
7172
external int get crudThrottleTimeMs;
73+
external String? get syncParamsEncoded;
7274
}
7375

7476
@anonymous
@@ -315,15 +317,17 @@ final class WorkerCommunicationChannel {
315317
await _numericRequest(SyncWorkerMessageType.ping);
316318
}
317319

318-
Future<void> startSynchronization(
319-
String databaseName, int crudThrottleTimeMs) async {
320+
Future<void> startSynchronization(String databaseName, int crudThrottleTimeMs,
321+
Map<String, dynamic>? syncParams) async {
320322
final (id, completion) = _newRequest();
321323
port.postMessage(SyncWorkerMessage(
322324
type: SyncWorkerMessageType.startSynchronization.name,
323325
payload: StartSynchronization(
324326
databaseName: databaseName,
325327
crudThrottleTimeMs: crudThrottleTimeMs,
326-
requestId: id),
328+
requestId: id,
329+
syncParamsEncoded:
330+
syncParams == null ? null : jsonEncode(syncParams)),
327331
));
328332
await completion;
329333
}

0 commit comments

Comments
 (0)