Skip to content

Commit 7b1ba31

Browse files
authored
Merge pull request #192 from powersync-ja/feat/bucket-priorities
Bucket priorities
2 parents a19e166 + b683952 commit 7b1ba31

31 files changed

+1078
-251
lines changed

.changeset/odd-snails-end.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-core': patch
3+
---
4+
5+
Stream changes in priority order.

.changeset/tall-peas-cough.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-sync-rules': minor
3+
---
4+
5+
Replace bucket ids from queries with a description also containing a priority.

.github/workflows/test.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ jobs:
151151
- name: Test
152152
run: pnpm test --filter='./modules/module-postgres'
153153

154+
- name: Test
155+
run: pnpm test --filter='./modules/module-postgres-storage'
156+
154157
run-mysql-tests:
155158
name: MySQL Test
156159
runs-on: ubuntu-latest
@@ -287,3 +290,6 @@ jobs:
287290

288291
- name: Test
289292
run: pnpm test --filter='./modules/module-mongodb'
293+
294+
- name: Test Storage
295+
run: pnpm test --filter='./modules/module-mongodb-storage'

docs/compacting-operations.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,10 @@ The second part is compacting to CLEAR operations. For each bucket, we keep trac
7575
For an initial workaround, defragmenting can be performed outside powersync by touching all rows in a bucket:
7676

7777
```sql
78-
update mytable set id = id
79-
-- Repeat the above for other tables in the same bucket if relevant
78+
UPDATE mytable
79+
SET
80+
id = id
81+
-- Repeat the above for other tables in the same bucket if relevant
8082
```
8183

8284
After this, the normal MOVE + CLEAR compacting will compact the bucket to only have a single operation per active row.
@@ -86,7 +88,11 @@ This would cause existing clients to re-sync every row, while reducing the numbe
8688
If an updated_at column or similar is present, we can use this to defragment more incrementally:
8789

8890
```sql
89-
update mytable set id = id where updated_at < now() - interval '1 week'
91+
UPDATE mytable
92+
SET
93+
id = id
94+
WHERE
95+
updated_at < now() - interval '1 week'
9096
```
9197

9298
This version avoids unnecessary defragmentation of rows modified recently.

docs/postgres-initial-replication.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ This is our first approach.
1111
We start by creating a logical replication slot, exporting a snapshot:
1212

1313
```sql
14-
CREATE_REPLICATION_SLOT <slot> LOGICAL pgoutput EXPORT_SNAPSHOT
14+
CREATE_REPLICATION_SLOT < slot > LOGICAL pgoutput EXPORT_SNAPSHOT
1515
```
1616

1717
While that connection stays open, we create another connection with a transaction, and read each table:

modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 1`] = `
99
"bucket": "mybucket[]",
1010
"checksum": -93886621,
1111
"count": 2,
12+
"priority": 3,
1213
},
1314
],
1415
"last_op_id": "2",
@@ -44,6 +45,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = `
4445
"bucket": "mybucket[]",
4546
"checksum": 499012468,
4647
"count": 4,
48+
"priority": 3,
4749
},
4850
],
4951
"write_checkpoint": undefined,
@@ -102,6 +104,7 @@ exports[`sync - mongodb > expiring token 1`] = `
102104
"bucket": "mybucket[]",
103105
"checksum": 0,
104106
"count": 0,
107+
"priority": 3,
105108
},
106109
],
107110
"last_op_id": "0",
@@ -124,6 +127,80 @@ exports[`sync - mongodb > expiring token 2`] = `
124127
]
125128
`;
126129

130+
exports[`sync - mongodb > sync buckets in order 1`] = `
131+
[
132+
{
133+
"checkpoint": {
134+
"buckets": [
135+
{
136+
"bucket": "b0[]",
137+
"checksum": 920318466,
138+
"count": 1,
139+
"priority": 2,
140+
},
141+
{
142+
"bucket": "b1[]",
143+
"checksum": -1382098757,
144+
"count": 1,
145+
"priority": 1,
146+
},
147+
],
148+
"last_op_id": "2",
149+
"write_checkpoint": undefined,
150+
},
151+
},
152+
{
153+
"data": {
154+
"after": "0",
155+
"bucket": "b1[]",
156+
"data": [
157+
{
158+
"checksum": 2912868539n,
159+
"data": "{"id":"earlier","description":"Test 2"}",
160+
"object_id": "earlier",
161+
"object_type": "test",
162+
"op": "PUT",
163+
"op_id": "2",
164+
"subkey": "0dfe86bd-d15b-5fd0-9c7b-a31693030ee0",
165+
},
166+
],
167+
"has_more": false,
168+
"next_after": "2",
169+
},
170+
},
171+
{
172+
"partial_checkpoint_complete": {
173+
"last_op_id": "2",
174+
"priority": 1,
175+
},
176+
},
177+
{
178+
"data": {
179+
"after": "0",
180+
"bucket": "b0[]",
181+
"data": [
182+
{
183+
"checksum": 920318466n,
184+
"data": "{"id":"t1","description":"Test 1"}",
185+
"object_id": "t1",
186+
"object_type": "test",
187+
"op": "PUT",
188+
"op_id": "1",
189+
"subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
190+
},
191+
],
192+
"has_more": false,
193+
"next_after": "1",
194+
},
195+
},
196+
{
197+
"checkpoint_complete": {
198+
"last_op_id": "2",
199+
},
200+
},
201+
]
202+
`;
203+
127204
exports[`sync - mongodb > sync global data 1`] = `
128205
[
129206
{
@@ -133,6 +210,7 @@ exports[`sync - mongodb > sync global data 1`] = `
133210
"bucket": "mybucket[]",
134211
"checksum": -93886621,
135212
"count": 2,
213+
"priority": 3,
136214
},
137215
],
138216
"last_op_id": "2",
@@ -184,6 +262,7 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = `
184262
"bucket": "mybucket[]",
185263
"checksum": -852817836,
186264
"count": 1,
265+
"priority": 3,
187266
},
188267
],
189268
"last_op_id": "1",
@@ -231,6 +310,7 @@ exports[`sync - mongodb > sync updates to global data 1`] = `
231310
"bucket": "mybucket[]",
232311
"checksum": 0,
233312
"count": 0,
313+
"priority": 3,
234314
},
235315
],
236316
"last_op_id": "0",
@@ -256,6 +336,7 @@ exports[`sync - mongodb > sync updates to global data 2`] = `
256336
"bucket": "mybucket[]",
257337
"checksum": 920318466,
258338
"count": 1,
339+
"priority": 3,
259340
},
260341
],
261342
"write_checkpoint": undefined,
@@ -299,6 +380,7 @@ exports[`sync - mongodb > sync updates to global data 3`] = `
299380
"bucket": "mybucket[]",
300381
"checksum": -93886621,
301382
"count": 2,
383+
"priority": 3,
302384
},
303385
],
304386
"write_checkpoint": undefined,

modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 1`] = `
99
"bucket": "mybucket[]",
1010
"checksum": -93886621,
1111
"count": 2,
12+
"priority": 3,
1213
},
1314
],
1415
"last_op_id": "2",
@@ -44,6 +45,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = `
4445
"bucket": "mybucket[]",
4546
"checksum": 499012468,
4647
"count": 4,
48+
"priority": 3,
4749
},
4850
],
4951
"write_checkpoint": undefined,
@@ -102,6 +104,7 @@ exports[`sync - postgres > expiring token 1`] = `
102104
"bucket": "mybucket[]",
103105
"checksum": 0,
104106
"count": 0,
107+
"priority": 3,
105108
},
106109
],
107110
"last_op_id": "0",
@@ -124,6 +127,80 @@ exports[`sync - postgres > expiring token 2`] = `
124127
]
125128
`;
126129

130+
exports[`sync - postgres > sync buckets in order 1`] = `
131+
[
132+
{
133+
"checkpoint": {
134+
"buckets": [
135+
{
136+
"bucket": "b0[]",
137+
"checksum": 920318466,
138+
"count": 1,
139+
"priority": 2,
140+
},
141+
{
142+
"bucket": "b1[]",
143+
"checksum": -1382098757,
144+
"count": 1,
145+
"priority": 1,
146+
},
147+
],
148+
"last_op_id": "2",
149+
"write_checkpoint": undefined,
150+
},
151+
},
152+
{
153+
"data": {
154+
"after": "0",
155+
"bucket": "b1[]",
156+
"data": [
157+
{
158+
"checksum": 2912868539n,
159+
"data": "{"id":"earlier","description":"Test 2"}",
160+
"object_id": "earlier",
161+
"object_type": "test",
162+
"op": "PUT",
163+
"op_id": "2",
164+
"subkey": "243b0e26-87b2-578a-993c-5ac5b6f7fd64",
165+
},
166+
],
167+
"has_more": false,
168+
"next_after": "2",
169+
},
170+
},
171+
{
172+
"partial_checkpoint_complete": {
173+
"last_op_id": "2",
174+
"priority": 1,
175+
},
176+
},
177+
{
178+
"data": {
179+
"after": "0",
180+
"bucket": "b0[]",
181+
"data": [
182+
{
183+
"checksum": 920318466n,
184+
"data": "{"id":"t1","description":"Test 1"}",
185+
"object_id": "t1",
186+
"object_type": "test",
187+
"op": "PUT",
188+
"op_id": "1",
189+
"subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1",
190+
},
191+
],
192+
"has_more": false,
193+
"next_after": "1",
194+
},
195+
},
196+
{
197+
"checkpoint_complete": {
198+
"last_op_id": "2",
199+
},
200+
},
201+
]
202+
`;
203+
127204
exports[`sync - postgres > sync global data 1`] = `
128205
[
129206
{
@@ -133,6 +210,7 @@ exports[`sync - postgres > sync global data 1`] = `
133210
"bucket": "mybucket[]",
134211
"checksum": -93886621,
135212
"count": 2,
213+
"priority": 3,
136214
},
137215
],
138216
"last_op_id": "2",
@@ -184,6 +262,7 @@ exports[`sync - postgres > sync legacy non-raw data 1`] = `
184262
"bucket": "mybucket[]",
185263
"checksum": -852817836,
186264
"count": 1,
265+
"priority": 3,
187266
},
188267
],
189268
"last_op_id": "1",
@@ -231,6 +310,7 @@ exports[`sync - postgres > sync updates to global data 1`] = `
231310
"bucket": "mybucket[]",
232311
"checksum": 0,
233312
"count": 0,
313+
"priority": 3,
234314
},
235315
],
236316
"last_op_id": "0",
@@ -256,6 +336,7 @@ exports[`sync - postgres > sync updates to global data 2`] = `
256336
"bucket": "mybucket[]",
257337
"checksum": 920318466,
258338
"count": 1,
339+
"priority": 3,
259340
},
260341
],
261342
"write_checkpoint": undefined,
@@ -299,6 +380,7 @@ exports[`sync - postgres > sync updates to global data 3`] = `
299380
"bucket": "mybucket[]",
300381
"checksum": -93886621,
301382
"count": 2,
383+
"priority": 3,
302384
},
303385
],
304386
"write_checkpoint": undefined,

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
"start:service": "pnpm --filter @powersync/service-image watch",
1919
"clean": "pnpm run -r clean",
2020
"release": "pnpm build:production && pnpm changeset publish",
21-
"test": "pnpm run -r test"
21+
"test": "pnpm run -r test",
22+
"vitest": "vitest"
2223
},
2324
"devDependencies": {
2425
"@changesets/cli": "^2.27.8",

packages/service-core-tests/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
A small helper package which exposes common unit tests and test utility functions.
44

5-
This package is used in various modules for their unit tests.
5+
This package is used in various modules for their unit tests.

0 commit comments

Comments
 (0)