5 changes: 5 additions & 0 deletions .changeset/odd-snails-end.md
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Stream changes in priority order.
5 changes: 5 additions & 0 deletions .changeset/tall-peas-cough.md
@@ -0,0 +1,5 @@
---
'@powersync/service-sync-rules': minor
---

Replace bucket IDs from queries with a description that also contains a priority.
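
The snapshots below show what this looks like on the wire: every checkpoint bucket entry now carries a `priority` field. A rough sketch of the resulting description shape (the interface and field names here are assumptions inferred from this diff, not the package's actual API):

```ts
// Hypothetical sketch only -- inferred from the checkpoint snapshots in this PR,
// where each bucket entry now carries a priority alongside its id.
interface ResolvedBucket {
  bucket: string; // bucket id, e.g. "mybucket[]"
  priority: number; // lower values stream first; 3 appears to be the default
}
```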
6 changes: 6 additions & 0 deletions .github/workflows/test.yml
@@ -151,6 +151,9 @@ jobs:
- name: Test
run: pnpm test --filter='./modules/module-postgres'

- name: Test
run: pnpm test --filter='./modules/module-postgres-storage'

run-mysql-tests:
name: MySQL Test
runs-on: ubuntu-latest
@@ -287,3 +290,6 @@ jobs:

- name: Test
run: pnpm test --filter='./modules/module-mongodb'

- name: Test Storage
run: pnpm test --filter='./modules/module-mongodb-storage'
12 changes: 9 additions & 3 deletions docs/compacting-operations.md
@@ -75,8 +75,10 @@ The second part is compacting to CLEAR operations. For each bucket, we keep trac
For an initial workaround, defragmenting can be performed outside PowerSync by touching all rows in a bucket:

```sql
update mytable set id = id
-- Repeat the above for other tables in the same bucket if relevant
UPDATE mytable
SET
id = id
-- Repeat the above for other tables in the same bucket if relevant
```

After this, the normal MOVE + CLEAR compacting will compact the bucket to only have a single operation per active row.
@@ -86,7 +88,11 @@ This would cause existing clients to re-sync every row, while reducing the numbe
If an updated_at column or similar is present, we can use this to defragment more incrementally:

```sql
update mytable set id = id where updated_at < now() - interval '1 week'
UPDATE mytable
SET
id = id
WHERE
updated_at < now() - interval '1 week'
```

This version avoids unnecessary defragmentation of rows modified recently.
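
If this should happen continuously, a small scheduled job can run the query. A minimal sketch using node-postgres (the table, column, and interval come from the example above; the scheduling approach and connection handling are assumptions, not part of the documented workaround):

```ts
import { Client } from 'pg';

// Periodically touch rows that have not been modified for a week, so that
// MOVE + CLEAR compacting can defragment the bucket in small increments.
async function defragmentOnce(connectionString: string): Promise<void> {
  const client = new Client({ connectionString });
  await client.connect();
  try {
    await client.query(
      `UPDATE mytable SET id = id WHERE updated_at < now() - interval '1 week'`
    );
  } finally {
    await client.end();
  }
}

// Run once a day; a real deployment would likely use a proper job scheduler.
setInterval(() => {
  defragmentOnce(process.env.DATABASE_URL!).catch(console.error);
}, 24 * 60 * 60 * 1000);
```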
2 changes: 1 addition & 1 deletion docs/postgres-initial-replication.md
@@ -11,7 +11,7 @@ This is our first approach.
We start by creating a logical replication slot, exporting a snapshot:

```sql
CREATE_REPLICATION_SLOT <slot> LOGICAL pgoutput EXPORT_SNAPSHOT
CREATE_REPLICATION_SLOT < slot > LOGICAL pgoutput EXPORT_SNAPSHOT
```

While that connection stays open, we create another connection with a transaction, and read each table:
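
The diff truncates the document here, but the step it describes follows Postgres's exported-snapshot pattern. A minimal sketch of such a read using node-postgres (the snapshot id and table name are placeholders; this is not the service's actual implementation):

```ts
import { Client } from 'pg';

// Read a table using the snapshot exported by CREATE_REPLICATION_SLOT, so the
// initial copy is consistent with the replication slot's starting point.
async function readTableAtSnapshot(connectionString: string, snapshotId: string) {
  const client = new Client({ connectionString });
  await client.connect();
  try {
    await client.query('BEGIN ISOLATION LEVEL REPEATABLE READ READ ONLY');
    // SET TRANSACTION SNAPSHOT must be the first statement in the transaction.
    await client.query(`SET TRANSACTION SNAPSHOT '${snapshotId}'`);
    const { rows } = await client.query('SELECT * FROM mytable');
    return rows;
  } finally {
    await client.query('COMMIT').catch(() => {});
    await client.end();
  }
}
```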
@@ -9,6 +9,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 1`] = `
"bucket": "mybucket[]",
"checksum": -93886621,
"count": 2,
"priority": 3,
},
],
"last_op_id": "2",
@@ -44,6 +45,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = `
"bucket": "mybucket[]",
"checksum": 499012468,
"count": 4,
"priority": 3,
},
],
"write_checkpoint": undefined,
@@ -102,6 +104,7 @@ exports[`sync - mongodb > expiring token 1`] = `
"bucket": "mybucket[]",
"checksum": 0,
"count": 0,
"priority": 3,
},
],
"last_op_id": "0",
@@ -124,6 +127,80 @@ exports[`sync - mongodb > expiring token 2`] = `
]
`;

exports[`sync - mongodb > sync buckets in order 1`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "b0[]",
"checksum": 920318466,
"count": 1,
"priority": 2,
},
{
"bucket": "b1[]",
"checksum": -1382098757,
"count": 1,
"priority": 1,
},
],
"last_op_id": "2",
"write_checkpoint": undefined,
},
},
{
"data": {
"after": "0",
"bucket": "b1[]",
"data": [
{
"checksum": 2912868539n,
"data": "{"id":"earlier","description":"Test 2"}",
"object_id": "earlier",
"object_type": "test",
"op": "PUT",
"op_id": "2",
"subkey": "0dfe86bd-d15b-5fd0-9c7b-a31693030ee0",
},
],
"has_more": false,
"next_after": "2",
},
},
{
"partial_checkpoint_complete": {
"last_op_id": "2",
"priority": 1,
},
},
{
"data": {
"after": "0",
"bucket": "b0[]",
"data": [
{
"checksum": 920318466n,
"data": "{"id":"t1","description":"Test 1"}",
"object_id": "t1",
"object_type": "test",
"op": "PUT",
"op_id": "1",
"subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
},
],
"has_more": false,
"next_after": "1",
},
},
{
"checkpoint_complete": {
"last_op_id": "2",
},
},
]
`;
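
This new snapshot captures the priority-ordered stream: the checkpoint lists each bucket with its priority, the priority-1 bucket `b1[]` streams first, a `partial_checkpoint_complete` line marks that tier as consistent, and only then do the priority-2 data and the final `checkpoint_complete` follow. A rough sketch of how a client might consume these lines (the types mirror the snapshot above; the handling itself is an assumption, not an actual SDK's logic):

```ts
// Hypothetical client-side handling of the message order shown in the snapshot.
type SyncLine =
  | { checkpoint: { last_op_id: string; buckets: { bucket: string; priority: number }[] } }
  | { data: { bucket: string; data: unknown[]; has_more: boolean } }
  | { partial_checkpoint_complete: { last_op_id: string; priority: number } }
  | { checkpoint_complete: { last_op_id: string } };

function handleSyncLine(line: SyncLine): void {
  if ('partial_checkpoint_complete' in line) {
    // Buckets at this priority (and any more urgent ones) are now consistent;
    // a client could already surface their data while the rest keeps syncing.
  } else if ('checkpoint_complete' in line) {
    // All buckets in the checkpoint have been fully synced.
  }
}
```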

exports[`sync - mongodb > sync global data 1`] = `
[
{
@@ -133,6 +210,7 @@ exports[`sync - mongodb > sync global data 1`] = `
"bucket": "mybucket[]",
"checksum": -93886621,
"count": 2,
"priority": 3,
},
],
"last_op_id": "2",
@@ -184,6 +262,7 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = `
"bucket": "mybucket[]",
"checksum": -852817836,
"count": 1,
"priority": 3,
},
],
"last_op_id": "1",
@@ -231,6 +310,7 @@ exports[`sync - mongodb > sync updates to global data 1`] = `
"bucket": "mybucket[]",
"checksum": 0,
"count": 0,
"priority": 3,
},
],
"last_op_id": "0",
@@ -256,6 +336,7 @@ exports[`sync - mongodb > sync updates to global data 2`] = `
"bucket": "mybucket[]",
"checksum": 920318466,
"count": 1,
"priority": 3,
},
],
"write_checkpoint": undefined,
@@ -299,6 +380,7 @@ exports[`sync - mongodb > sync updates to global data 3`] = `
"bucket": "mybucket[]",
"checksum": -93886621,
"count": 2,
"priority": 3,
},
],
"write_checkpoint": undefined,
@@ -9,6 +9,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 1`] = `
"bucket": "mybucket[]",
"checksum": -93886621,
"count": 2,
"priority": 3,
},
],
"last_op_id": "2",
@@ -44,6 +45,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = `
"bucket": "mybucket[]",
"checksum": 499012468,
"count": 4,
"priority": 3,
},
],
"write_checkpoint": undefined,
@@ -102,6 +104,7 @@ exports[`sync - postgres > expiring token 1`] = `
"bucket": "mybucket[]",
"checksum": 0,
"count": 0,
"priority": 3,
},
],
"last_op_id": "0",
@@ -124,6 +127,80 @@ exports[`sync - postgres > expiring token 2`] = `
]
`;

exports[`sync - postgres > sync buckets in order 1`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "b0[]",
"checksum": 920318466,
"count": 1,
"priority": 2,
},
{
"bucket": "b1[]",
"checksum": -1382098757,
"count": 1,
"priority": 1,
},
],
"last_op_id": "2",
"write_checkpoint": undefined,
},
},
{
"data": {
"after": "0",
"bucket": "b1[]",
"data": [
{
"checksum": 2912868539n,
"data": "{"id":"earlier","description":"Test 2"}",
"object_id": "earlier",
"object_type": "test",
"op": "PUT",
"op_id": "2",
"subkey": "243b0e26-87b2-578a-993c-5ac5b6f7fd64",
},
],
"has_more": false,
"next_after": "2",
},
},
{
"partial_checkpoint_complete": {
"last_op_id": "2",
"priority": 1,
},
},
{
"data": {
"after": "0",
"bucket": "b0[]",
"data": [
{
"checksum": 920318466n,
"data": "{"id":"t1","description":"Test 1"}",
"object_id": "t1",
"object_type": "test",
"op": "PUT",
"op_id": "1",
"subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1",
},
],
"has_more": false,
"next_after": "1",
},
},
{
"checkpoint_complete": {
"last_op_id": "2",
},
},
]
`;

exports[`sync - postgres > sync global data 1`] = `
[
{
@@ -133,6 +210,7 @@ exports[`sync - postgres > sync global data 1`] = `
"bucket": "mybucket[]",
"checksum": -93886621,
"count": 2,
"priority": 3,
},
],
"last_op_id": "2",
@@ -184,6 +262,7 @@ exports[`sync - postgres > sync legacy non-raw data 1`] = `
"bucket": "mybucket[]",
"checksum": -852817836,
"count": 1,
"priority": 3,
},
],
"last_op_id": "1",
@@ -231,6 +310,7 @@ exports[`sync - postgres > sync updates to global data 1`] = `
"bucket": "mybucket[]",
"checksum": 0,
"count": 0,
"priority": 3,
},
],
"last_op_id": "0",
@@ -256,6 +336,7 @@ exports[`sync - postgres > sync updates to global data 2`] = `
"bucket": "mybucket[]",
"checksum": 920318466,
"count": 1,
"priority": 3,
},
],
"write_checkpoint": undefined,
@@ -299,6 +380,7 @@ exports[`sync - postgres > sync updates to global data 3`] = `
"bucket": "mybucket[]",
"checksum": -93886621,
"count": 2,
"priority": 3,
},
],
"write_checkpoint": undefined,
3 changes: 2 additions & 1 deletion package.json
@@ -18,7 +18,8 @@
"start:service": "pnpm --filter @powersync/service-image watch",
"clean": "pnpm run -r clean",
"release": "pnpm build:production && pnpm changeset publish",
"test": "pnpm run -r test"
"test": "pnpm run -r test",
"vitest": "vitest"
},
"devDependencies": {
"@changesets/cli": "^2.27.8",
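The new root-level `vitest` script presumably makes it possible to run a single test file directly from the workspace root (for example `pnpm vitest run <path-to-test-file>`, where the path is a placeholder) instead of invoking each package's `test` script.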
2 changes: 1 addition & 1 deletion packages/service-core-tests/README.md
@@ -2,4 +2,4 @@

A small helper package which exposes common unit tests and test utility functions.

This package is used in various modules for their unit tests.
This package is used in various modules for their unit tests.