Skip to content

Commit 1bd7447

Browse files
committed
Use updated format not persisting priorities
1 parent d238769 commit 1bd7447

File tree

3 files changed

+31
-40
lines changed

3 files changed

+31
-40
lines changed

packages/powersync_core/lib/src/bucket_storage.dart

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,11 @@ class BucketStorage {
3333

3434
Future<List<BucketState>> getBucketStates() async {
3535
final rows = await select(
36-
'SELECT name as bucket, priority, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\'');
36+
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\'');
3737
return [
3838
for (var row in rows)
3939
BucketState(
4040
bucket: row['bucket'],
41-
priority: row['priority'],
4241
opId: row['op_id'],
4342
)
4443
];
@@ -53,20 +52,12 @@ class BucketStorage {
5352
var count = 0;
5453

5554
await writeTransaction((tx) async {
56-
final descriptions = [
57-
for (final MapEntry(:key, :value) in batch.descriptions.entries)
58-
{
59-
key: {'priority': value.priority},
60-
}
61-
];
62-
6355
for (var b in batch.buckets) {
6456
count += b.data.length;
6557
await _updateBucket2(
6658
tx,
6759
jsonEncode({
6860
'buckets': [b],
69-
'descriptions': descriptions,
7061
}));
7162
}
7263
// No need to flush - the data is not directly visible to the user either way.
@@ -136,7 +127,8 @@ class BucketStorage {
136127
// Not flushing here - the flush will happen in the next step
137128
}, flush: false);
138129

139-
final valid = await updateObjectsFromBuckets(forPriority: forPriority);
130+
final valid = await updateObjectsFromBuckets(checkpoint,
131+
forPartialPriority: forPriority);
140132
if (!valid) {
141133
return SyncLocalDatabaseResult(ready: false);
142134
}
@@ -146,11 +138,25 @@ class BucketStorage {
146138
return SyncLocalDatabaseResult(ready: true);
147139
}
148140

149-
Future<bool> updateObjectsFromBuckets({int? forPriority}) async {
141+
Future<bool> updateObjectsFromBuckets(Checkpoint checkpoint,
142+
{int? forPartialPriority}) async {
150143
return writeTransaction((tx) async {
151-
await tx.execute(
152-
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
153-
['sync_local', forPriority]);
144+
await tx
145+
.execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", [
146+
'sync_local',
147+
forPartialPriority != null
148+
? jsonEncode({
149+
'priority': forPartialPriority,
150+
// If we're at a partial checkpoint, we should only publish the
151+
// buckets at the completed priority levels.
152+
'buckets': [
153+
for (final desc in checkpoint.checksums)
154+
// Note that higher priorities are encoded as smaller values
155+
if (desc.priority <= forPartialPriority) desc.bucket,
156+
],
157+
})
158+
: null,
159+
]);
154160
final rs = await tx.execute('SELECT last_insert_rowid() as result');
155161
final result = rs[0]['result'];
156162
if (result == 1) {
@@ -338,11 +344,9 @@ class BucketStorage {
338344

339345
class BucketState {
340346
final String bucket;
341-
final int priority;
342347
final String opId;
343348

344-
const BucketState(
345-
{required this.bucket, required this.priority, required this.opId});
349+
const BucketState({required this.bucket, required this.opId});
346350

347351
@override
348352
String toString() {
@@ -351,23 +355,19 @@ class BucketState {
351355

352356
@override
353357
int get hashCode {
354-
return Object.hash(bucket, priority, opId);
358+
return Object.hash(bucket, opId);
355359
}
356360

357361
@override
358362
bool operator ==(Object other) {
359-
return other is BucketState &&
360-
other.priority == priority &&
361-
other.bucket == bucket &&
362-
other.opId == opId;
363+
return other is BucketState && other.bucket == bucket && other.opId == opId;
363364
}
364365
}
365366

366367
final class SyncDataBatch {
367368
final List<SyncBucketData> buckets;
368-
final Map<String, BucketDescription> descriptions;
369369

370-
SyncDataBatch(this.buckets, this.descriptions);
370+
SyncDataBatch(this.buckets);
371371
}
372372

373373
class SyncLocalDatabaseResult {

packages/powersync_core/lib/src/streaming_sync.dart

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -302,19 +302,15 @@ class StreamingSyncImplementation implements StreamingSync {
302302
_statusStreamController.add(newStatus);
303303
}
304304

305-
Future<(List<BucketRequest>, Map<String, BucketDescription>)>
305+
Future<(List<BucketRequest>, Map<String, BucketDescription?>)>
306306
_collectLocalBucketState() async {
307307
final bucketEntries = await adapter.getBucketStates();
308308

309309
final initialRequests = [
310310
for (final entry in bucketEntries) BucketRequest(entry.bucket, entry.opId)
311311
];
312312
final localDescriptions = {
313-
for (final entry in bucketEntries)
314-
entry.bucket: (
315-
name: entry.bucket,
316-
priority: entry.priority,
317-
)
313+
for (final entry in bucketEntries) entry.bucket: null
318314
};
319315

320316
return (initialRequests, localDescriptions);
@@ -435,7 +431,7 @@ class StreamingSyncImplementation implements StreamingSync {
435431
case SyncBucketData():
436432
// TODO: Merge multiple of these into a single one...
437433
_updateStatus(downloading: true);
438-
await adapter.saveSyncData(SyncDataBatch([line], bucketMap));
434+
await adapter.saveSyncData(SyncDataBatch([line]));
439435
case StreamingSyncKeepalive():
440436
if (line.tokenExpiresIn == 0) {
441437
// Token expired already - stop the connection immediately

packages/powersync_core/test/bucket_storage_test.dart

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ BucketChecksum checksum(
4545
}
4646

4747
SyncDataBatch syncDataBatch(List<SyncBucketData> data) {
48-
return SyncDataBatch(data, {
49-
for (final bucket in data.map((e) => e.bucket).toSet())
50-
bucket: (name: bucket, priority: 1),
51-
});
48+
return SyncDataBatch(data);
5249
}
5350

5451
void main() {
@@ -108,10 +105,8 @@ void main() {
108105
]));
109106

110107
final bucketStates = await bucketStorage.getBucketStates();
111-
expect(
112-
bucketStates,
113-
equals(
114-
[const BucketState(bucket: 'bucket1', opId: '3', priority: 1)]));
108+
expect(bucketStates,
109+
equals([const BucketState(bucket: 'bucket1', opId: '3')]));
115110

116111
await syncLocalChecked(Checkpoint(
117112
lastOpId: '3',

0 commit comments

Comments
 (0)