Skip to content

Commit d0019a1

Browse files
committed
Use sealed classes for sync lines
1 parent 98ef589 commit d0019a1

File tree

3 files changed

+254
-262
lines changed

3 files changed

+254
-262
lines changed

packages/powersync_core/lib/src/bucket_storage.dart

Lines changed: 0 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -347,105 +347,6 @@ class SyncDataBatch {
347347
SyncDataBatch(this.buckets);
348348
}
349349

350-
class SyncBucketData {
351-
final String bucket;
352-
final List<OplogEntry> data;
353-
final bool hasMore;
354-
final String? after;
355-
final String? nextAfter;
356-
357-
const SyncBucketData(
358-
{required this.bucket,
359-
required this.data,
360-
this.hasMore = false,
361-
this.after,
362-
this.nextAfter});
363-
364-
SyncBucketData.fromJson(Map<String, dynamic> json)
365-
: bucket = json['bucket'],
366-
hasMore = json['has_more'] ?? false,
367-
after = json['after'],
368-
nextAfter = json['next_after'],
369-
data =
370-
(json['data'] as List).map((e) => OplogEntry.fromJson(e)).toList();
371-
372-
Map<String, dynamic> toJson() {
373-
return {
374-
'bucket': bucket,
375-
'has_more': hasMore,
376-
'after': after,
377-
'next_after': nextAfter,
378-
'data': data
379-
};
380-
}
381-
}
382-
383-
class OplogEntry {
384-
final String opId;
385-
386-
final OpType? op;
387-
388-
/// rowType + rowId uniquely identifies an entry in the local database.
389-
final String? rowType;
390-
final String? rowId;
391-
392-
/// Together with rowType and rowId, this uniquely identifies a source entry
393-
/// per bucket in the oplog. There may be multiple source entries for a single
394-
/// "rowType + rowId" combination.
395-
final String? subkey;
396-
397-
final String? data;
398-
final int checksum;
399-
400-
const OplogEntry(
401-
{required this.opId,
402-
required this.op,
403-
this.subkey,
404-
this.rowType,
405-
this.rowId,
406-
this.data,
407-
required this.checksum});
408-
409-
OplogEntry.fromJson(Map<String, dynamic> json)
410-
: opId = json['op_id'],
411-
op = OpType.fromJson(json['op']),
412-
rowType = json['object_type'],
413-
rowId = json['object_id'],
414-
checksum = json['checksum'],
415-
data = json['data'] is String ? json['data'] : jsonEncode(json['data']),
416-
subkey = json['subkey'] is String ? json['subkey'] : null;
417-
418-
Map<String, dynamic>? get parsedData {
419-
return data == null ? null : jsonDecode(data!);
420-
}
421-
422-
/// Key to uniquely represent a source entry in a bucket.
423-
/// This is used to supersede old entries.
424-
/// Relevant for put and remove ops.
425-
String get key {
426-
return "$rowType/$rowId/$subkey";
427-
}
428-
429-
Map<String, dynamic> toJson() {
430-
return {
431-
'op_id': opId,
432-
'op': op?.toJson(),
433-
'object_type': rowType,
434-
'object_id': rowId,
435-
'checksum': checksum,
436-
'subkey': subkey,
437-
'data': data
438-
};
439-
}
440-
}
441-
442-
class SqliteOp {
443-
String sql;
444-
List<dynamic> args;
445-
446-
SqliteOp(this.sql, this.args);
447-
}
448-
449350
class SyncLocalDatabaseResult {
450351
final bool ready;
451352
final bool checkpointValid;
@@ -483,42 +384,3 @@ class ChecksumCache {
483384

484385
ChecksumCache(this.lastOpId, this.checksums);
485386
}
486-
487-
enum OpType {
488-
clear(1),
489-
move(2),
490-
put(3),
491-
remove(4);
492-
493-
final int value;
494-
495-
const OpType(this.value);
496-
497-
static OpType? fromJson(String json) {
498-
switch (json) {
499-
case 'CLEAR':
500-
return clear;
501-
case 'MOVE':
502-
return move;
503-
case 'PUT':
504-
return put;
505-
case 'REMOVE':
506-
return remove;
507-
default:
508-
return null;
509-
}
510-
}
511-
512-
String toJson() {
513-
switch (this) {
514-
case clear:
515-
return 'CLEAR';
516-
case move:
517-
return 'MOVE';
518-
case put:
519-
return 'PUT';
520-
case remove:
521-
return 'REMOVE';
522-
}
523-
}
524-
}

packages/powersync_core/lib/src/streaming_sync.dart

Lines changed: 95 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class StreamingSyncImplementation implements StreamingSync {
5353

5454
late final http.Client _client;
5555

56-
final StreamController<String?> _localPingController =
56+
final StreamController<Null> _localPingController =
5757
StreamController.broadcast();
5858

5959
final Duration retryDelay;
@@ -340,96 +340,19 @@ class StreamingSyncImplementation implements StreamingSync {
340340
}
341341

342342
_updateStatus(connected: true, connecting: false);
343-
if (line is Checkpoint) {
344-
targetCheckpoint = line;
345-
final Set<String> bucketsToDelete = {...bucketSet};
346-
final Set<String> newBuckets = {};
347-
for (final checksum in line.checksums) {
348-
newBuckets.add(checksum.bucket);
349-
bucketsToDelete.remove(checksum.bucket);
350-
}
351-
bucketSet = newBuckets;
352-
await adapter.removeBuckets([...bucketsToDelete]);
353-
_updateStatus(downloading: true);
354-
} else if (line is StreamingSyncCheckpointComplete) {
355-
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
356-
if (!result.checkpointValid) {
357-
// This means checksums failed. Start again with a new checkpoint.
358-
// TODO: better back-off
359-
// await new Promise((resolve) => setTimeout(resolve, 50));
360-
return false;
361-
} else if (!result.ready) {
362-
// Checksums valid, but need more data for a consistent checkpoint.
363-
// Continue waiting.
364-
} else {
365-
appliedCheckpoint = targetCheckpoint;
366-
367-
_updateStatus(
368-
downloading: false,
369-
downloadError: _noError,
370-
lastSyncedAt: DateTime.now());
371-
}
372-
373-
validatedCheckpoint = targetCheckpoint;
374-
} else if (line is StreamingSyncCheckpointDiff) {
375-
// TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
376-
if (targetCheckpoint == null) {
377-
throw PowerSyncProtocolException(
378-
'Checkpoint diff without previous checkpoint');
379-
}
380-
_updateStatus(downloading: true);
381-
final diff = line;
382-
final Map<String, BucketChecksum> newBuckets = {};
383-
for (var checksum in targetCheckpoint.checksums) {
384-
newBuckets[checksum.bucket] = checksum;
385-
}
386-
for (var checksum in diff.updatedBuckets) {
387-
newBuckets[checksum.bucket] = checksum;
388-
}
389-
for (var bucket in diff.removedBuckets) {
390-
newBuckets.remove(bucket);
391-
}
392-
393-
final newCheckpoint = Checkpoint(
394-
lastOpId: diff.lastOpId,
395-
checksums: [...newBuckets.values],
396-
writeCheckpoint: diff.writeCheckpoint);
397-
targetCheckpoint = newCheckpoint;
398-
399-
bucketSet = Set.from(newBuckets.keys);
400-
await adapter.removeBuckets(diff.removedBuckets);
401-
adapter.setTargetCheckpoint(targetCheckpoint);
402-
} else if (line is SyncBucketData) {
403-
_updateStatus(downloading: true);
404-
await adapter.saveSyncData(SyncDataBatch([line]));
405-
} else if (line is StreamingSyncKeepalive) {
406-
if (line.tokenExpiresIn == 0) {
407-
// Token expired already - stop the connection immediately
408-
invalidCredentialsCallback?.call().ignore();
409-
break;
410-
} else if (line.tokenExpiresIn <= 30) {
411-
// Token expires soon - refresh it in the background
412-
if (credentialsInvalidation == null &&
413-
invalidCredentialsCallback != null) {
414-
credentialsInvalidation = invalidCredentialsCallback!().then((_) {
415-
// Token has been refreshed - we should restart the connection.
416-
haveInvalidated = true;
417-
// trigger next loop iteration ASAP, don't wait for another
418-
// message from the server.
419-
_localPingController.add(null);
420-
}, onError: (_) {
421-
// Token refresh failed - retry on next keepalive.
422-
credentialsInvalidation = null;
423-
});
343+
switch (line) {
344+
case Checkpoint():
345+
targetCheckpoint = line;
346+
final Set<String> bucketsToDelete = {...bucketSet};
347+
final Set<String> newBuckets = {};
348+
for (final checksum in line.checksums) {
349+
newBuckets.add(checksum.bucket);
350+
bucketsToDelete.remove(checksum.bucket);
424351
}
425-
}
426-
} else {
427-
if (targetCheckpoint == appliedCheckpoint) {
428-
_updateStatus(
429-
downloading: false,
430-
downloadError: _noError,
431-
lastSyncedAt: DateTime.now());
432-
} else if (validatedCheckpoint == targetCheckpoint) {
352+
bucketSet = newBuckets;
353+
await adapter.removeBuckets([...bucketsToDelete]);
354+
_updateStatus(downloading: true);
355+
case StreamingSyncCheckpointComplete():
433356
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
434357
if (!result.checkpointValid) {
435358
// This means checksums failed. Start again with a new checkpoint.
@@ -447,7 +370,85 @@ class StreamingSyncImplementation implements StreamingSync {
447370
downloadError: _noError,
448371
lastSyncedAt: DateTime.now());
449372
}
450-
}
373+
374+
validatedCheckpoint = targetCheckpoint;
375+
case StreamingSyncCheckpointDiff():
376+
// TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
377+
if (targetCheckpoint == null) {
378+
throw PowerSyncProtocolException(
379+
'Checkpoint diff without previous checkpoint');
380+
}
381+
_updateStatus(downloading: true);
382+
final diff = line;
383+
final Map<String, BucketChecksum> newBuckets = {};
384+
for (var checksum in targetCheckpoint.checksums) {
385+
newBuckets[checksum.bucket] = checksum;
386+
}
387+
for (var checksum in diff.updatedBuckets) {
388+
newBuckets[checksum.bucket] = checksum;
389+
}
390+
for (var bucket in diff.removedBuckets) {
391+
newBuckets.remove(bucket);
392+
}
393+
394+
final newCheckpoint = Checkpoint(
395+
lastOpId: diff.lastOpId,
396+
checksums: [...newBuckets.values],
397+
writeCheckpoint: diff.writeCheckpoint);
398+
targetCheckpoint = newCheckpoint;
399+
400+
bucketSet = Set.from(newBuckets.keys);
401+
await adapter.removeBuckets(diff.removedBuckets);
402+
adapter.setTargetCheckpoint(targetCheckpoint);
403+
case SyncBucketData():
404+
_updateStatus(downloading: true);
405+
await adapter.saveSyncData(SyncDataBatch([line]));
406+
case StreamingSyncKeepalive():
407+
if (line.tokenExpiresIn == 0) {
408+
// Token expired already - stop the connection immediately
409+
invalidCredentialsCallback?.call().ignore();
410+
break;
411+
} else if (line.tokenExpiresIn <= 30) {
412+
// Token expires soon - refresh it in the background
413+
if (credentialsInvalidation == null &&
414+
invalidCredentialsCallback != null) {
415+
credentialsInvalidation = invalidCredentialsCallback!().then((_) {
416+
// Token has been refreshed - we should restart the connection.
417+
haveInvalidated = true;
418+
// trigger next loop iteration ASAP, don't wait for another
419+
// message from the server.
420+
_localPingController.add(null);
421+
}, onError: (_) {
422+
// Token refresh failed - retry on next keepalive.
423+
credentialsInvalidation = null;
424+
});
425+
}
426+
}
427+
case null: // Local ping
428+
if (targetCheckpoint == appliedCheckpoint) {
429+
_updateStatus(
430+
downloading: false,
431+
downloadError: _noError,
432+
lastSyncedAt: DateTime.now());
433+
} else if (validatedCheckpoint == targetCheckpoint) {
434+
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
435+
if (!result.checkpointValid) {
436+
// This means checksums failed. Start again with a new checkpoint.
437+
// TODO: better back-off
438+
// await new Promise((resolve) => setTimeout(resolve, 50));
439+
return false;
440+
} else if (!result.ready) {
441+
// Checksums valid, but need more data for a consistent checkpoint.
442+
// Continue waiting.
443+
} else {
444+
appliedCheckpoint = targetCheckpoint;
445+
446+
_updateStatus(
447+
downloading: false,
448+
downloadError: _noError,
449+
lastSyncedAt: DateTime.now());
450+
}
451+
}
451452
}
452453

453454
if (haveInvalidated) {
@@ -458,7 +459,8 @@ class StreamingSyncImplementation implements StreamingSync {
458459
return true;
459460
}
460461

461-
Stream<Object?> streamingSyncRequest(StreamingSyncRequest data) async* {
462+
Stream<StreamingSyncLine?> streamingSyncRequest(
463+
StreamingSyncRequest data) async* {
462464
final credentials = await credentialsCallback();
463465
if (credentials == null) {
464466
throw CredentialsException('Not logged in');
@@ -498,7 +500,7 @@ class StreamingSyncImplementation implements StreamingSync {
498500
if (aborted) {
499501
break;
500502
}
501-
yield parseStreamingSyncLine(line as Map<String, dynamic>);
503+
yield StreamingSyncLine.fromJson(line as Map<String, dynamic>);
502504
}
503505
}
504506

0 commit comments

Comments
 (0)