Skip to content

Commit 6e66936

Browse files
committed
Start migrating to new model
1 parent 9d45ba3 commit 6e66936

File tree

5 files changed

+28
-29
lines changed

5 files changed

+28
-29
lines changed

packages/powersync_core/lib/powersync_core.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ export 'src/exceptions.dart';
1010
export 'src/log.dart';
1111
export 'src/open_factory.dart';
1212
export 'src/schema.dart';
13-
export 'src/sync_status.dart' hide InternalSyncDownloadProgress;
13+
export 'src/sync_status.dart' hide BucketProgress, InternalSyncDownloadProgress;
1414
export 'src/uuid.dart';

packages/powersync_core/lib/src/bucket_storage.dart

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ class BucketStorage {
2929
return await _internalDb.execute(query, parameters);
3030
}
3131

32-
void startSession() {}
33-
3432
Future<List<BucketState>> getBucketStates() async {
3533
final rows = await select(
3634
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\'');

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ final class MutableSyncStatus {
4040

4141
void applyCheckpointReached(Checkpoint applied) {
4242
downloading = false;
43+
downloadProgress = null;
4344
downloadError = null;
4445
final now = lastSyncedAt = DateTime.now();
4546
priorityStatusEntries = [

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,6 @@ class StreamingSyncImplementation implements StreamingSync {
285285

286286
Future<bool> streamingSyncIteration(
287287
{AbortController? abortController}) async {
288-
adapter.startSession();
289-
290288
var (bucketRequests, bucketMap) = await _collectLocalBucketState();
291289

292290
Checkpoint? targetCheckpoint;

packages/powersync_core/lib/src/sync_status.dart

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -216,26 +216,32 @@ class UploadQueueStats {
216216
}
217217
}
218218

219+
/// Per-bucket download progress information.
220+
@internal
221+
typedef BucketProgress = ({
222+
BucketPriority priority,
223+
int atLast,
224+
int sinceLast,
225+
int targetCount,
226+
});
227+
219228
@internal
220229
final class InternalSyncDownloadProgress {
221-
final Map<BucketPriority, int> downloaded;
222-
final Map<BucketPriority, int> target;
230+
final Map<String, BucketProgress> buckets;
223231

224232
final int _totalDownloaded;
225233
final int _totalTarget;
226234

227-
InternalSyncDownloadProgress(this.downloaded, this.target)
228-
: _totalDownloaded = target.values.sum,
229-
_totalTarget = target.values.sum;
235+
InternalSyncDownloadProgress(this.buckets)
236+
: _totalDownloaded = buckets.values.map((e) => e.sinceLast).sum,
237+
_totalTarget = buckets.values.map((e) => e.targetCount - e.atLast).sum;
230238

231239
factory InternalSyncDownloadProgress.fromZero(Checkpoint target) {
232240
final targetOps = target.checksums.groupFoldBy<BucketPriority, int>(
233241
(cs) => BucketPriority(cs.priority),
234242
(prev, cs) => (prev ?? 0) + (cs.count ?? 0),
235243
);
236244
final downloaded = targetOps.map((k, v) => MapEntry(k, 0));
237-
238-
return InternalSyncDownloadProgress(downloaded, targetOps);
239245
}
240246

241247
static InternalSyncDownloadProgress ofPublic(SyncDownloadProgress public) {
@@ -250,29 +256,26 @@ final class InternalSyncDownloadProgress {
250256
.sum;
251257
}
252258

253-
InternalSyncDownloadProgress incrementDownloaded(
254-
List<(BucketPriority, int)> opsInPriority) {
255-
var downloadedOps = {...downloaded};
256-
257-
for (final (priority, addedOps) in opsInPriority) {
258-
assert(downloaded.containsKey(priority));
259-
assert(target.containsKey(priority));
260-
261-
downloadedOps[priority] =
262-
max(downloadedOps[priority]! + addedOps, target[priority]!);
259+
InternalSyncDownloadProgress incrementDownloaded(SyncDataBatch batch) {
260+
final newBucketStates = Map.of(buckets);
261+
for (final dataForBucket in batch.buckets) {
262+
final previous = newBucketStates[dataForBucket.bucket]!;
263+
newBucketStates[dataForBucket.bucket] = (
264+
priority: previous.priority,
265+
atLast: previous.atLast,
266+
sinceLast: previous.sinceLast,
267+
targetCount: previous.targetCount,
268+
);
263269
}
264270

265-
return InternalSyncDownloadProgress(downloadedOps, target);
271+
return InternalSyncDownloadProgress(newBucketStates);
266272
}
267273

268274
SyncDownloadProgress get asSyncDownloadProgress =>
269275
SyncDownloadProgress._(this);
270276

271277
@override
272-
int get hashCode => Object.hash(
273-
_mapEquality.hash(downloaded),
274-
_mapEquality.hash(target),
275-
);
278+
int get hashCode => _mapEquality.hash(buckets);
276279

277280
@override
278281
bool operator ==(Object other) {
@@ -281,8 +284,7 @@ final class InternalSyncDownloadProgress {
281284
// them first helps find a difference faster.
282285
_totalDownloaded == other._totalDownloaded &&
283286
_totalTarget == other._totalTarget &&
284-
_mapEquality.equals(downloaded, other.downloaded) &&
285-
_mapEquality.equals(target, other.target);
287+
_mapEquality.equals(buckets, other.buckets);
286288
}
287289

288290
static const _mapEquality = MapEquality<Object?, Object?>();

0 commit comments

Comments
 (0)