Skip to content

Commit c99c043

Browse files
committed
Add status for each priority
1 parent f423ffd commit c99c043

File tree

4 files changed

+157
-44
lines changed

4 files changed

+157
-44
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,27 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
135135
}
136136
}
137137

138-
/// Returns a [Future] which will resolve once the first full sync has completed.
139-
Future<void> waitForFirstSync() async {
140-
if (currentStatus.hasSynced ?? false) {
138+
/// Returns a [Future] which will resolve once a synchronization operation has
139+
/// completed.
140+
///
141+
/// When [priority] is null (the default), this method waits for a full sync
142+
/// operation to complete. When set to a [BucketPriority] however, it also
143+
/// completes once a partial sync operation containing that priority has
144+
/// completed.
145+
Future<void> waitForFirstSync({BucketPriority? priority}) async {
146+
bool matches(SyncStatus status) {
147+
if (priority == null) {
148+
return status.hasSynced == true;
149+
} else {
150+
return status.statusForPriority(priority)?.hasSynced == true;
151+
}
152+
}
153+
154+
if (matches(currentStatus)) {
141155
return;
142156
}
143157
await for (final result in statusStream) {
144-
if (result.hasSynced ?? false) {
158+
if (matches(result)) {
145159
break;
146160
}
147161
}

packages/powersync_core/lib/src/streaming_sync.dart

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:async';
22
import 'dart:convert' as convert;
33

4+
import 'package:collection/collection.dart';
45
import 'package:http/http.dart' as http;
56
import 'package:powersync_core/src/abort_controller.dart';
67
import 'package:powersync_core/src/exceptions.dart';
@@ -273,31 +274,62 @@ class StreamingSyncImplementation implements StreamingSync {
273274
return body['data']['write_checkpoint'] as String;
274275
}
275276

277+
void _updateStatusForPriority(SyncPriorityStatus completed) {
278+
// Note: statusInPriority is sorted by priorities (ascending)
279+
final existingPriorityState = lastStatus.statusInPriority;
280+
281+
for (final (i, priority) in existingPriorityState.indexed) {
282+
switch (
283+
BucketPriority.comparator(priority.priority, completed.priority)) {
284+
case < 0:
285+
// Entries from here on have a higher priority than the one that was
286+
// just completed
287+
final copy = existingPriorityState.toList();
288+
copy.insert(i, completed);
289+
_updateStatus(statusInPriority: copy);
290+
return;
291+
case 0:
292+
final copy = existingPriorityState.toList();
293+
copy[i] = completed;
294+
_updateStatus(statusInPriority: copy);
295+
return;
296+
case > 0:
297+
continue;
298+
}
299+
}
300+
301+
_updateStatus(statusInPriority: [...existingPriorityState, completed]);
302+
}
303+
276304
/// Update sync status based on any non-null parameters.
277305
/// To clear errors, use [_noError] instead of null.
278-
void _updateStatus(
279-
{DateTime? lastSyncedAt,
280-
bool? hasSynced,
281-
bool? connected,
282-
bool? connecting,
283-
bool? downloading,
284-
bool? uploading,
285-
Object? uploadError,
286-
Object? downloadError}) {
306+
void _updateStatus({
307+
DateTime? lastSyncedAt,
308+
bool? hasSynced,
309+
bool? connected,
310+
bool? connecting,
311+
bool? downloading,
312+
bool? uploading,
313+
Object? uploadError,
314+
Object? downloadError,
315+
List<SyncPriorityStatus>? statusInPriority,
316+
}) {
287317
final c = connected ?? lastStatus.connected;
288318
var newStatus = SyncStatus(
289-
connected: c,
290-
connecting: !c && (connecting ?? lastStatus.connecting),
291-
lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt,
292-
hasSynced: hasSynced ?? lastStatus.hasSynced,
293-
downloading: downloading ?? lastStatus.downloading,
294-
uploading: uploading ?? lastStatus.uploading,
295-
uploadError: uploadError == _noError
296-
? null
297-
: (uploadError ?? lastStatus.uploadError),
298-
downloadError: downloadError == _noError
299-
? null
300-
: (downloadError ?? lastStatus.downloadError));
319+
connected: c,
320+
connecting: !c && (connecting ?? lastStatus.connecting),
321+
lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt,
322+
hasSynced: hasSynced ?? lastStatus.hasSynced,
323+
downloading: downloading ?? lastStatus.downloading,
324+
uploading: uploading ?? lastStatus.uploading,
325+
uploadError: uploadError == _noError
326+
? null
327+
: (uploadError ?? lastStatus.uploadError),
328+
downloadError: downloadError == _noError
329+
? null
330+
: (downloadError ?? lastStatus.downloadError),
331+
statusInPriority: statusInPriority ?? lastStatus.statusInPriority,
332+
);
301333
lastStatus = newStatus;
302334
_statusStreamController.add(newStatus);
303335
}
@@ -371,10 +403,25 @@ class StreamingSyncImplementation implements StreamingSync {
371403
} else {
372404
appliedCheckpoint = targetCheckpoint;
373405

406+
final now = DateTime.now();
374407
_updateStatus(
375-
downloading: false,
376-
downloadError: _noError,
377-
lastSyncedAt: DateTime.now());
408+
downloading: false,
409+
downloadError: _noError,
410+
lastSyncedAt: now,
411+
statusInPriority: [
412+
if (appliedCheckpoint.checksums.isNotEmpty)
413+
(
414+
hasSynced: true,
415+
lastSyncedAt: now,
416+
priority: maxBy(
417+
appliedCheckpoint.checksums
418+
.map((cs) => BucketPriority(cs.priority)),
419+
(priority) => priority,
420+
compare: BucketPriority.comparator,
421+
)!,
422+
)
423+
],
424+
);
378425
}
379426

380427
validatedCheckpoint = targetCheckpoint;
@@ -390,12 +437,11 @@ class StreamingSyncImplementation implements StreamingSync {
390437
// Checksums valid, but need more data for a consistent checkpoint.
391438
// Continue waiting.
392439
} else {
393-
appliedCheckpoint = targetCheckpoint;
394-
395-
_updateStatus(
396-
downloading: false,
397-
downloadError: _noError,
398-
lastSyncedAt: DateTime.now());
440+
_updateStatusForPriority((
441+
priority: BucketPriority(bucketPriority),
442+
lastSyncedAt: DateTime.now(),
443+
hasSynced: true,
444+
));
399445
}
400446

401447
validatedCheckpoint = targetCheckpoint;

packages/powersync_core/lib/src/sync_status.dart

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
class SyncStatus {
1+
import 'package:collection/collection.dart';
2+
3+
final class SyncStatus {
24
/// true if currently connected.
35
///
46
/// This means the PowerSync connection is ready to download, and
@@ -22,11 +24,11 @@ class SyncStatus {
2224
/// Time that a last sync has fully completed, if any.
2325
///
2426
/// This is null while loading the database.
25-
DateTime? get lastSyncedAt => statusInPriority.lastOrNull?.lastSyncedAt;
27+
final DateTime? lastSyncedAt;
2628

2729
/// Indicates whether there has been at least one full sync, if any.
2830
/// Is null when unknown, for example when state is still being loaded from the database.
29-
bool? get hasSynced => statusInPriority.lastOrNull?.hasSynced;
31+
final bool? hasSynced;
3032

3133
/// Error during uploading.
3234
///
@@ -62,7 +64,8 @@ class SyncStatus {
6264
other.downloadError == downloadError &&
6365
other.uploadError == uploadError &&
6466
other.lastSyncedAt == lastSyncedAt &&
65-
other.hasSynced == hasSynced);
67+
other.hasSynced == hasSynced &&
68+
_statusEquality.equals(other.statusInPriority, statusInPriority));
6669
}
6770

6871
SyncStatus copyWith({
@@ -74,6 +77,7 @@ class SyncStatus {
7477
Object? downloadError,
7578
DateTime? lastSyncedAt,
7679
bool? hasSynced,
80+
List<SyncPriorityStatus>? statusInPriority,
7781
}) {
7882
return SyncStatus(
7983
connected: connected ?? this.connected,
@@ -84,6 +88,7 @@ class SyncStatus {
8488
downloadError: downloadError ?? this.downloadError,
8589
lastSyncedAt: lastSyncedAt ?? this.lastSyncedAt,
8690
hasSynced: hasSynced ?? this.hasSynced,
91+
statusInPriority: statusInPriority ?? this.statusInPriority,
8792
);
8893
}
8994

@@ -92,31 +97,57 @@ class SyncStatus {
9297
return downloadError ?? uploadError;
9398
}
9499

100+
/// Returns [lastSyncedAt] and [hasSynced] information for a partial sync
101+
/// operation, or `null` if the status for that priority is unknown.
102+
SyncPriorityStatus? statusForPriority(BucketPriority priority) {
103+
assert(statusInPriority.isSortedByCompare(
104+
(e) => e.priority, BucketPriority.comparator));
105+
106+
for (final known in statusInPriority) {
107+
// Lower-priority buckets are synchronized after higher-priority buckets,
108+
// and since statusInPriority is sorted we look for the first entry that
109+
// doesn't have a higher priority.
110+
if (BucketPriority.comparator(known.priority, priority) <= 0) {
111+
return known;
112+
}
113+
}
114+
115+
return null;
116+
}
117+
95118
@override
96119
int get hashCode {
97-
return Object.hash(connected, downloading, uploading, connecting,
98-
uploadError, downloadError, lastSyncedAt);
120+
return Object.hash(
121+
connected,
122+
downloading,
123+
uploading,
124+
connecting,
125+
uploadError,
126+
downloadError,
127+
lastSyncedAt,
128+
_statusEquality.hash(statusInPriority));
99129
}
100130

101131
@override
102132
String toString() {
103133
return "SyncStatus<connected: $connected connecting: $connecting downloading: $downloading uploading: $uploading lastSyncedAt: $lastSyncedAt, hasSynced: $hasSynced, error: $anyError>";
104134
}
135+
136+
static const _statusEquality = ListEquality<SyncPriorityStatus>();
105137
}
106138

107139
/// The priority of a PowerSync bucket.
108140
extension type const BucketPriority._(int priorityNumber) {
109141
static const _highest = 0;
110-
static const _lowests = 3;
111142

112143
factory BucketPriority(int i) {
113-
assert(i >= _highest && i <= _lowests);
144+
assert(i >= _highest);
114145
return BucketPriority._(i);
115146
}
116147

117148
/// A [Comparator] instance suitable for comparing [BucketPriority] values.
118-
static Comparator<BucketPriority> comparator =
119-
(a, b) => -a.priorityNumber.compareTo(b.priorityNumber);
149+
static int comparator(BucketPriority a, BucketPriority b) =>
150+
-a.priorityNumber.compareTo(b.priorityNumber);
120151
}
121152

122153
/// Partial information about the synchronization status for buckets within a

packages/powersync_core/lib/src/web/sync_worker_protocol.dart

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
157157
required bool? hasSyned,
158158
required String? uploadError,
159159
required String? downloadError,
160+
required JSArray? statusInPriority,
160161
});
161162

162163
factory SerializedSyncStatus.from(SyncStatus status) {
@@ -169,6 +170,14 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
169170
hasSyned: status.hasSynced,
170171
uploadError: status.uploadError?.toString(),
171172
downloadError: status.downloadError?.toString(),
173+
statusInPriority: <JSArray?>[
174+
for (final entry in status.statusInPriority)
175+
[
176+
entry.priority.priorityNumber.toJS,
177+
entry.lastSyncedAt.microsecondsSinceEpoch.toJS,
178+
entry.hasSynced.toJS,
179+
].toJS
180+
].toJS,
172181
);
173182
}
174183

@@ -180,6 +189,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
180189
external bool? hasSynced;
181190
external String? uploadError;
182191
external String? downloadError;
192+
external JSArray? statusInPriority;
183193

184194
SyncStatus asSyncStatus() {
185195
return SyncStatus(
@@ -193,6 +203,18 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject {
193203
hasSynced: hasSynced,
194204
uploadError: uploadError,
195205
downloadError: downloadError,
206+
statusInPriority: statusInPriority?.toDart.map((e) {
207+
final [rawPriority, rawSynced, rawHasSynced, ...] =
208+
(e as JSArray).toDart;
209+
210+
return (
211+
priority: BucketPriority((rawPriority as JSNumber).toDartInt),
212+
lastSyncedAt: DateTime.fromMicrosecondsSinceEpoch(
213+
(rawSynced as JSNumber).toDartInt),
214+
hasSynced: (rawHasSynced as JSBoolean).toDart,
215+
);
216+
}).toList() ??
217+
const [],
196218
);
197219
}
198220
}

0 commit comments

Comments
 (0)