Skip to content

Commit af641ea

Browse files
committed
Reconnect when sync params change.
1 parent c25dd87 commit af641ea

File tree

1 file changed

+35
-10
lines changed

1 file changed

+35
-10
lines changed

packages/powersync/lib/src/web/sync_worker.dart

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ class _SyncWorker {
4848
String? syncParamsEncoded,
4949
_ConnectedClient client) {
5050
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
51-
return _SyncRunner(databaseIdentifier, crudThrottleTimeMs,
52-
syncParamsEncoded == null ? null : jsonDecode(syncParamsEncoded));
51+
return _SyncRunner(databaseIdentifier);
5352
})
54-
..registerClient(client);
53+
..registerClient(client, crudThrottleTimeMs, syncParamsEncoded);
5554
}
5655
}
5756

@@ -110,8 +109,8 @@ class _ConnectedClient {
110109

111110
class _SyncRunner {
112111
final String identifier;
113-
final int crudThrottleTimeMs;
114-
final Map<String, dynamic>? syncParams;
112+
int crudThrottleTimeMs = 1;
113+
String? syncParamsEncoded;
115114

116115
final StreamGroup<_RunnerEvent> _group = StreamGroup();
117116
final StreamController<_RunnerEvent> _mainEvents = StreamController();
@@ -120,17 +119,35 @@ class _SyncRunner {
120119
_ConnectedClient? databaseHost;
121120
final connections = <_ConnectedClient>[];
122121

123-
_SyncRunner(this.identifier, this.crudThrottleTimeMs, this.syncParams) {
122+
_SyncRunner(this.identifier) {
124123
_group.add(_mainEvents.stream);
125124

126125
Future(() async {
127126
await for (final event in _group.stream) {
128127
try {
129128
switch (event) {
130-
case _AddConnection(:final client):
129+
case _AddConnection(
130+
:final client,
131+
:final crudThrottleTimeMs,
132+
:final syncParamsEncoded
133+
):
131134
connections.add(client);
135+
var reconnect = false;
136+
if (this.crudThrottleTimeMs != crudThrottleTimeMs) {
137+
this.crudThrottleTimeMs = crudThrottleTimeMs;
138+
reconnect = true;
139+
}
140+
if (this.syncParamsEncoded != syncParamsEncoded) {
141+
this.syncParamsEncoded = syncParamsEncoded;
142+
reconnect = true;
143+
}
132144
if (sync == null) {
133145
await _requestDatabase(client);
146+
} else if (reconnect) {
147+
// Parameters changed - reconnect.
148+
sync?.abort();
149+
sync = null;
150+
await _requestDatabase(client);
134151
}
135152
case _RemoveConnection(:final client):
136153
connections.remove(client);
@@ -232,6 +249,10 @@ class _SyncRunner {
232249
);
233250
}
234251

252+
final syncParams = syncParamsEncoded == null
253+
? null
254+
: jsonDecode(syncParamsEncoded!) as Map<String, dynamic>;
255+
235256
sync = StreamingSyncImplementation(
236257
adapter: BucketStorage(database),
237258
credentialsCallback: client.channel.credentialsCallback,
@@ -252,8 +273,10 @@ class _SyncRunner {
252273
sync!.streamingSync();
253274
}
254275

255-
void registerClient(_ConnectedClient client) {
256-
_mainEvents.add(_AddConnection(client));
276+
void registerClient(_ConnectedClient client, int currentCrudThrottleTimeMs,
277+
String? currentSyncParamsEncoded) {
278+
_mainEvents.add(_AddConnection(
279+
client, currentCrudThrottleTimeMs, currentSyncParamsEncoded));
257280
}
258281

259282
void unregisterClient(_ConnectedClient client) {
@@ -265,8 +288,10 @@ sealed class _RunnerEvent {}
265288

266289
final class _AddConnection implements _RunnerEvent {
267290
final _ConnectedClient client;
291+
final int crudThrottleTimeMs;
292+
final String? syncParamsEncoded;
268293

269-
_AddConnection(this.client);
294+
_AddConnection(this.client, this.crudThrottleTimeMs, this.syncParamsEncoded);
270295
}
271296

272297
final class _RemoveConnection implements _RunnerEvent {

0 commit comments

Comments
 (0)