Skip to content

Commit 8a3dded

Browse files
committed
Track operation counters from previous syncs
1 parent 6e66936 commit 8a3dded

14 files changed

+166
-174
lines changed

packages/powersync_core/lib/powersync_core.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ export 'src/exceptions.dart';
1010
export 'src/log.dart';
1111
export 'src/open_factory.dart';
1212
export 'src/schema.dart';
13-
export 'src/sync_status.dart' hide BucketProgress, InternalSyncDownloadProgress;
13+
export 'src/sync/sync_status.dart'
14+
hide BucketProgress, InternalSyncDownloadProgress;
1415
export 'src/uuid.dart';

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import 'package:meta/meta.dart';
55
import 'package:http/http.dart' as http;
66
import 'package:logging/logging.dart';
77
import 'package:powersync_core/src/abort_controller.dart';
8-
import 'package:powersync_core/src/bucket_storage.dart';
8+
import 'package:powersync_core/src/sync/bucket_storage.dart';
99
import 'package:powersync_core/src/connector.dart';
1010
import 'package:powersync_core/src/database/powersync_database.dart';
1111
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
@@ -17,7 +17,7 @@ import 'package:powersync_core/src/open_factory/native/native_open_factory.dart'
1717
import 'package:powersync_core/src/schema.dart';
1818
import 'package:powersync_core/src/schema_logic.dart';
1919
import 'package:powersync_core/src/sync/streaming_sync.dart';
20-
import 'package:powersync_core/src/sync_status.dart';
20+
import 'package:powersync_core/src/sync/sync_status.dart';
2121
import 'package:sqlite_async/sqlite3_common.dart';
2222
import 'package:sqlite_async/sqlite_async.dart';
2323

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import 'package:powersync_core/src/database/core_version.dart';
1111
import 'package:powersync_core/src/powersync_update_notification.dart';
1212
import 'package:powersync_core/src/schema.dart';
1313
import 'package:powersync_core/src/schema_logic.dart';
14-
import 'package:powersync_core/src/sync_status.dart';
14+
import 'package:powersync_core/src/sync/sync_status.dart';
1515

1616
mixin PowerSyncDatabaseMixin implements SqliteConnection {
1717
/// Schema used for the local database.

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import 'package:meta/meta.dart';
33
import 'package:fetch_client/fetch_client.dart';
44
import 'package:logging/logging.dart';
55
import 'package:powersync_core/src/abort_controller.dart';
6-
import 'package:powersync_core/src/bucket_storage.dart';
6+
import 'package:powersync_core/src/sync/bucket_storage.dart';
77
import 'package:powersync_core/src/connector.dart';
88
import 'package:powersync_core/src/database/powersync_database.dart';
99
import 'package:powersync_core/src/database/powersync_db_mixin.dart';

packages/powersync_core/lib/src/bucket_storage.dart renamed to packages/powersync_core/lib/src/sync/bucket_storage.dart

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
1+
@internal
2+
library;
3+
14
import 'dart:async';
25
import 'dart:convert';
36

47
import 'package:collection/collection.dart';
8+
import 'package:meta/meta.dart';
59
import 'package:powersync_core/sqlite_async.dart';
610
import 'package:powersync_core/sqlite3_common.dart';
711

8-
import 'crud.dart';
9-
import 'schema_logic.dart';
10-
import 'sync/protocol.dart';
12+
import '../crud.dart';
13+
import '../schema_logic.dart';
14+
import 'protocol.dart';
1115

1216
const compactOperationInterval = 1000;
1317

18+
typedef LocalOperationCounters = ({int atLast, int sinceLast});
19+
1420
class BucketStorage {
1521
final SqliteConnection _internalDb;
1622
bool _hasCompletedSync = false;
@@ -31,14 +37,30 @@ class BucketStorage {
3137

3238
Future<List<BucketState>> getBucketStates() async {
3339
final rows = await select(
34-
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\'');
40+
"SELECT name, cast(last_op as TEXT) FROM ps_buckets WHERE pending_delete = 0 AND name != '\$local'");
3541
return [
3642
for (var row in rows)
3743
BucketState(
38-
bucket: row['bucket'] as String, opId: row['op_id'] as String)
44+
bucket: row.columnAt(0) as String,
45+
opId: row.columnAt(1) as String,
46+
)
3947
];
4048
}
4149

50+
Future<Map<String, LocalOperationCounters>>
51+
getBucketOperationProgress() async {
52+
final rows = await select(
53+
"SELECT name, count_at_last, count_since_last FROM ps_buckets");
54+
55+
return {
56+
for (final row in rows)
57+
(row.columnAt(0) as String): (
58+
atLast: row.columnAt(1) as int,
59+
sinceLast: row.columnAt(2) as int,
60+
)
61+
};
62+
}
63+
4264
Future<String> getClientId() async {
4365
final rows = await select('SELECT powersync_client_id() as client_id');
4466
return rows.first['client_id'] as String;

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import 'dart:async';
22

33
import 'package:collection/collection.dart';
44

5-
import '../sync_status.dart';
5+
import 'sync_status.dart';
6+
import 'bucket_storage.dart';
67
import 'protocol.dart';
78

89
final class MutableSyncStatus {
@@ -57,26 +58,24 @@ final class MutableSyncStatus {
5758
];
5859
}
5960

60-
void applyCheckpointStarted(Checkpoint target) {
61+
void applyCheckpointStarted(
62+
Map<String, LocalOperationCounters> localProgress,
63+
Checkpoint target,
64+
) {
6165
downloading = true;
62-
// TODO: Include pending ops from interrupted download, if any...
63-
downloadProgress = InternalSyncDownloadProgress.fromZero(target);
66+
downloadProgress =
67+
InternalSyncDownloadProgress.forNewCheckpoint(localProgress, target);
6468
}
6569

6670
void applyUploadError(Object error) {
6771
uploading = false;
6872
uploadError = error;
6973
}
7074

71-
void applyBatchReceived(
72-
Map<String, BucketDescription?> currentBuckets, SyncDataBatch batch) {
75+
void applyBatchReceived(SyncDataBatch batch) {
7376
downloading = true;
7477
if (downloadProgress case final previousProgress?) {
75-
downloadProgress = previousProgress.incrementDownloaded([
76-
for (final bucket in batch.buckets)
77-
if (currentBuckets[bucket.bucket] case final knownBucket?)
78-
(BucketPriority(knownBucket.priority), bucket.data.length),
79-
]);
78+
downloadProgress = previousProgress.incrementDownloaded(batch);
8079
}
8180
}
8281

packages/powersync_core/lib/src/sync/protocol.dart

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import 'dart:async';
22
import 'dart:convert';
33

4-
import '../bucket_storage.dart';
4+
import 'bucket_storage.dart';
55

66
/// Messages sent from the sync service.
77
sealed class StreamingSyncLine {
@@ -146,8 +146,6 @@ final class Checkpoint extends StreamingSyncLine {
146146
}
147147
}
148148

149-
typedef BucketDescription = ({String name, int priority});
150-
151149
class BucketChecksum {
152150
final String bucket;
153151
final int priority;

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ import 'package:powersync_core/src/log_internal.dart';
99
import 'package:powersync_core/src/user_agent/user_agent.dart';
1010
import 'package:sqlite_async/mutex.dart';
1111

12-
import '../bucket_storage.dart';
12+
import 'bucket_storage.dart';
1313
import '../connector.dart';
1414
import '../crud.dart';
1515
import 'mutable_sync_status.dart';
1616
import 'stream_utils.dart';
17-
import '../sync_status.dart';
17+
import 'sync_status.dart';
1818
import 'protocol.dart';
1919

2020
abstract interface class StreamingSync {
@@ -28,7 +28,7 @@ abstract interface class StreamingSync {
2828

2929
@internal
3030
class StreamingSyncImplementation implements StreamingSync {
31-
BucketStorage adapter;
31+
final BucketStorage adapter;
3232

3333
final Future<PowerSyncCredentials?> Function() credentialsCallback;
3434
final Future<void> Function()? invalidCredentialsCallback;
@@ -322,7 +322,9 @@ class StreamingSyncImplementation implements StreamingSync {
322322
}
323323
bucketMap = newBuckets;
324324
await adapter.removeBuckets([...bucketsToDelete]);
325-
_state.updateStatus((s) => s.downloading = true);
325+
final initialProgress = await adapter.getBucketOperationProgress();
326+
_state.updateStatus(
327+
(s) => s.applyCheckpointStarted(initialProgress, line));
326328
case StreamingSyncCheckpointComplete():
327329
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
328330
if (!result.checkpointValid) {
@@ -391,7 +393,7 @@ class StreamingSyncImplementation implements StreamingSync {
391393
case SyncDataBatch():
392394
// TODO: This increments the counters before actually saving sync
393395
// data. Might be fine though?
394-
_state.updateStatus((s) => s.applyBatchReceived(bucketMap, line));
396+
_state.updateStatus((s) => s.applyBatchReceived(line));
395397
_state.updateStatus((s) => s.downloading = true);
396398
await adapter.saveSyncData(line);
397399
case StreamingSyncKeepalive(:final tokenExpiresIn):
@@ -520,3 +522,8 @@ String _syncErrorMessage(Object? error) {
520522
return '${error.runtimeType}';
521523
}
522524
}
525+
526+
typedef BucketDescription = ({
527+
String name,
528+
int priority,
529+
});

0 commit comments

Comments
 (0)