
Commit 12680b9

Test previous sync interfaces
1 parent e4ffdad commit 12680b9

File tree

  crates/core/src/operations.rs
  crates/core/src/schema/management.rs
  dart/test/legacy_sync_test.dart

3 files changed: +354 -3 lines changed

crates/core/src/operations.rs

Lines changed: 13 additions & 2 deletions
@@ -2,16 +2,27 @@ use crate::error::SQLiteError;
 use crate::sync::line::DataLine;
 use crate::sync::operations::insert_bucket_operations;
 use crate::sync::storage_adapter::StorageAdapter;
+use alloc::vec::Vec;
+use serde::Deserialize;
 use sqlite_nostd as sqlite;
 use sqlite_nostd::{Connection, ResultCode};
 
 use crate::ext::SafeManagedStmt;
 
 // Run inside a transaction
 pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
-    let line = serde_json::from_str::<DataLine>(data)?;
+    #[derive(Deserialize)]
+    struct DataBatch<'a> {
+        #[serde(borrow)]
+        buckets: Vec<DataLine<'a>>,
+    }
+
+    let batch = serde_json::from_str::<DataBatch>(data)?;
     let adapter = StorageAdapter::new(db)?;
-    insert_bucket_operations(&adapter, &line)?;
+
+    for bucket in &batch.buckets {
+        insert_bucket_operations(&adapter, &bucket)?;
+    }
 
     Ok(())
 }
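Note on the change above: `insert_operation` no longer parses a single `DataLine`; it now expects a JSON object whose `buckets` array holds one or more sync lines and inserts each in turn. Below is a minimal, self-contained sketch of the same serde batch-wrapper pattern. The `Line` struct and its `bucket` field are simplified stand-ins for the real `DataLine` (whose fields are not shown in this diff), and unlike the no_std crate the sketch is a plain standalone program.

use serde::Deserialize;

// Simplified stand-in for the crate's DataLine; the single field here is an
// assumption for illustration only.
#[derive(Debug, Deserialize)]
struct Line<'a> {
    bucket: &'a str,
}

// Mirrors the DataBatch wrapper added in this commit: `#[serde(borrow)]` lets
// the nested lines borrow string data from the input buffer instead of
// allocating new strings.
#[derive(Debug, Deserialize)]
struct Batch<'a> {
    #[serde(borrow)]
    buckets: Vec<Line<'a>>,
}

fn main() -> Result<(), serde_json::Error> {
    let data = r#"{"buckets": [{"bucket": "prio1"}, {"bucket": "prio2"}]}"#;
    let batch: Batch = serde_json::from_str(data)?;
    // The real code calls insert_bucket_operations(&adapter, &bucket) for
    // each entry; here we just print the borrowed bucket names.
    for line in &batch.buckets {
        println!("inserting operations for bucket {}", line.bucket);
    }
    Ok(())
}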

crates/core/src/schema/management.rs

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ SELECT
         let local_only = statement.column_int(2) != 0;
 
         db.exec_safe(&format!(
-            "CREATE TABLE {:}(id TEXT PRIMARY KEY NOT NULL, data TEXT) STRICT",
+            "CREATE TABLE {:}(id TEXT PRIMARY KEY NOT NULL, data TEXT)",
             quote_identifier(internal_name)
         ))
         .into_db_result(db)?;

dart/test/legacy_sync_test.dart

Lines changed: 340 additions & 0 deletions
@@ -0,0 +1,340 @@ (new file)

import 'dart:convert';

import 'package:fake_async/fake_async.dart';
import 'package:file/local.dart';
import 'package:sqlite3/common.dart';
import 'package:sqlite3/sqlite3.dart';
import 'package:sqlite3_test/sqlite3_test.dart';
import 'package:test/test.dart';

import 'utils/native_test_utils.dart';

/// Tests that the older sync interfaces requiring clients to decode and handle
/// sync lines still work.
void main() {
  final vfs = TestSqliteFileSystem(fs: const LocalFileSystem());

  setUpAll(() {
    loadExtension();
    sqlite3.registerVirtualFileSystem(vfs, makeDefault: false);
  });
  tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs));

  group('sync tests', () {
    late CommonDatabase db;

    setUp(() async {
      db = openTestDatabase(vfs)
        ..select('select powersync_init();')
        ..select('select powersync_replace_schema(?)', [json.encode(_schema)]);
    });

    tearDown(() {
      db.dispose();
    });

    void pushSyncData(
      String bucket,
      String opId,
      String rowId,
      Object op,
      Object? data, {
      Object? descriptions = _bucketDescriptions,
    }) {
      final encoded = json.encode({
        'buckets': [
          {
            'bucket': bucket,
            'data': [
              {
                'op_id': opId,
                'op': op,
                'object_type': 'items',
                'object_id': rowId,
                'checksum': 0,
                'data': data,
              }
            ],
          }
        ],
        if (descriptions != null) 'descriptions': descriptions,
      });

      db.execute('insert into powersync_operations (op, data) VALUES (?, ?);',
          ['save', encoded]);
    }

    bool pushCheckpointComplete(
        String lastOpId, String? writeCheckpoint, List<Object?> checksums,
        {int? priority}) {
      final [row] = db.select('select powersync_validate_checkpoint(?) as r;', [
        json.encode({
          'last_op_id': lastOpId,
          'write_checkpoint': writeCheckpoint,
          'buckets': [
            for (final cs in checksums.cast<Map<String, dynamic>>())
              if (priority == null || cs['priority'] <= priority) cs
          ],
          'priority': priority,
        })
      ]);

      final decoded = json.decode(row['r']);
      if (decoded['valid'] != true) {
        fail(row['r']);
      }

      db.execute(
        'UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))',
        [
          lastOpId,
          json.encode(checksums.map((e) => (e as Map)['bucket']).toList())
        ],
      );

      db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [
        'sync_local',
        priority != null
            ? jsonEncode({
                'priority': priority,
                'buckets': [
                  for (final cs in checksums.cast<Map<String, dynamic>>())
                    if (cs['priority'] <= priority) cs['bucket']
                ],
              })
            : null,
      ]);
      return db.lastInsertRowId == 1;
    }

    ResultSet fetchRows() {
      return db.select('select * from items');
    }

    test('does not publish until reaching checkpoint', () {
      expect(fetchRows(), isEmpty);
      pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
      expect(fetchRows(), isEmpty);

      expect(
          pushCheckpointComplete(
              '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]),
          isTrue);
      expect(fetchRows(), [
        {'id': 'row-0', 'col': 'hi'}
      ]);
    });

    test('does not publish with pending local data', () {
      expect(fetchRows(), isEmpty);
      db.execute("insert into items (id, col) values ('local', 'data');");
      expect(fetchRows(), isNotEmpty);

      pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
      expect(
          pushCheckpointComplete(
              '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]),
          isFalse);
      expect(fetchRows(), [
        {'id': 'local', 'col': 'data'}
      ]);
    });

    test('publishes with local data for prio=0 buckets', () {
      expect(fetchRows(), isEmpty);
      db.execute("insert into items (id, col) values ('local', 'data');");
      expect(fetchRows(), isNotEmpty);

      pushSyncData('prio0', '1', 'row-0', 'PUT', {'col': 'hi'});
      expect(
        pushCheckpointComplete(
          '1',
          null,
          [_bucketChecksum('prio0', 0, checksum: 0)],
          priority: 0,
        ),
        isTrue,
      );
      expect(fetchRows(), [
        {'id': 'local', 'col': 'data'},
        {'id': 'row-0', 'col': 'hi'},
      ]);
    });

    test('can publish partial checkpoints under different priorities', () {
      for (var i = 0; i < 4; i++) {
        pushSyncData('prio$i', '1', 'row-$i', 'PUT', {'col': '$i'});
      }
      expect(fetchRows(), isEmpty);

      // Simulate a partial checkpoint complete for each of the buckets.
      for (var i = 0; i < 4; i++) {
        expect(
          pushCheckpointComplete(
            '1',
            null,
            [
              for (var j = 0; j <= 4; j++)
                _bucketChecksum(
                  'prio$j',
                  j,
                  // Give buckets outside of the current priority a wrong
                  // checksum. They should not be validated yet.
                  checksum: j <= i ? 0 : 1234,
                ),
            ],
            priority: i,
          ),
          isTrue,
        );

        expect(fetchRows(), [
          for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'},
        ]);

        expect(db.select('select 1 from ps_sync_state where priority = ?', [i]),
            isNotEmpty);
        // A sync at this priority includes all higher priorities too, so they
        // should be cleared.
        expect(db.select('select 1 from ps_sync_state where priority < ?', [i]),
            isEmpty);
      }
    });

    test('can sync multiple times', () {
      fakeAsync((controller) {
        for (var i = 0; i < 10; i++) {
          for (var prio in const [1, 2, 3, null]) {
            pushCheckpointComplete('1', null, [], priority: prio);

            // Make sure there's only a single row in last_synced_at
            expect(
              db.select(
                  "SELECT datetime(last_synced_at, 'localtime') AS last_synced_at FROM ps_sync_state WHERE priority = ?",
                  [prio ?? 2147483647]),
              [
                {'last_synced_at': '2025-03-01 ${10 + i}:00:00'}
              ],
            );

            if (prio == null) {
              expect(
                db.select(
                    "SELECT datetime(powersync_last_synced_at(), 'localtime') AS last_synced_at"),
                [
                  {'last_synced_at': '2025-03-01 ${10 + i}:00:00'}
                ],
              );
            }
          }

          controller.elapse(const Duration(hours: 1));
        }
      }, initialTime: DateTime(2025, 3, 1, 10));
    });

    test('clearing database clears sync status', () {
      pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});

      expect(
          pushCheckpointComplete(
              '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]),
          isTrue);
      expect(db.select('SELECT powersync_last_synced_at() AS r').single,
          {'r': isNotNull});
      expect(db.select('SELECT priority FROM ps_sync_state').single,
          {'priority': 2147483647});

      db.execute('SELECT powersync_clear(0)');
      expect(db.select('SELECT powersync_last_synced_at() AS r').single,
          {'r': isNull});
      expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0));
    });

    test('tracks download progress', () {
      const bucket = 'bkt';
      void expectProgress(int atLast, int sinceLast) {
        final [row] = db.select(
          'SELECT count_at_last, count_since_last FROM ps_buckets WHERE name = ?',
          [bucket],
        );
        final [actualAtLast, actualSinceLast] = row.values;

        expect(actualAtLast, atLast, reason: 'count_at_last mismatch');
        expect(actualSinceLast, sinceLast, reason: 'count_since_last mismatch');
      }

      pushSyncData(bucket, '1', 'row-0', 'PUT', {'col': 'hi'});
      expectProgress(0, 1);

      pushSyncData(bucket, '2', 'row-1', 'PUT', {'col': 'hi'});
      expectProgress(0, 2);

      expect(
        pushCheckpointComplete(
          '2',
          null,
          [_bucketChecksum(bucket, 1, checksum: 0)],
          priority: 1,
        ),
        isTrue,
      );

      // Running partial or complete checkpoints should not reset stats, client
      // SDKs are responsible for that.
      expectProgress(0, 2);
      expect(db.select('SELECT * FROM items'), isNotEmpty);

      expect(
        pushCheckpointComplete(
          '2',
          null,
          [_bucketChecksum(bucket, 1, checksum: 0)],
        ),
        isTrue,
      );
      expectProgress(0, 2);

      db.execute('''
        UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
          WHERE ?1->name IS NOT NULL
      ''', [
        json.encode({bucket: 2}),
      ]);
      expectProgress(2, 0);

      // Run another iteration of this
      pushSyncData(bucket, '3', 'row-3', 'PUT', {'col': 'hi'});
      expectProgress(2, 1);
      db.execute('''
        UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
          WHERE ?1->name IS NOT NULL
      ''', [
        json.encode({bucket: 3}),
      ]);
      expectProgress(3, 0);
    });
  });
}

Object? _bucketChecksum(String bucket, int prio, {int checksum = 0}) {
  return {'bucket': bucket, 'priority': prio, 'checksum': checksum};
}

const _schema = {
  'tables': [
    {
      'name': 'items',
      'columns': [
        {'name': 'col', 'type': 'text'}
      ],
    }
  ]
};

const _bucketDescriptions = {
  'prio0': {'priority': 0},
  'prio1': {'priority': 1},
  'prio2': {'priority': 2},
  'prio3': {'priority': 3},
};
