Skip to content

Commit d1dd12b

Browse files
yacinebsamwillis
andauthored
fix: pg-sync (#435)
* fix: pg-sync * fix(lint): exception for lint rule in hook * fix(doc): update pglite-sync example * fix(lint): * fix(doc): docs and docker-compose example * fix(doc): * fix(doc): * Update docs/docs/sync.md Co-authored-by: Sam Willis <[email protected]> * Update docs/docs/sync.md Co-authored-by: Sam Willis <[email protected]> * Update packages/pglite-sync/example/index.html Co-authored-by: Sam Willis <[email protected]> * fix: electric extention parameters in tests * Update docs/docs/sync.md Co-authored-by: Sam Willis <[email protected]> * fix: duplicate in doc * Changeset * Tweak example readme --------- Co-authored-by: Sam Willis <[email protected]>
1 parent 328ed3a commit d1dd12b

File tree

11 files changed

+77
-78
lines changed

11 files changed

+77
-78
lines changed

.changeset/poor-zebras-cheer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/pglite-sync': patch
3+
---
4+
5+
Bump the supported version of the ElectricSQL sync server to the latest version

docs/docs/sync.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ You can then use the `syncShapeToTable` method to sync a table from Electric:
3838

3939
```ts
4040
const shape = await pg.electric.syncShapeToTable({
41-
shape: { url: 'http://localhost:3000/v1/shape/todo' },
41+
shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' },
4242
table: 'todo',
4343
primaryKey: ['id'],
4444
})
@@ -97,12 +97,9 @@ The returned `shape` object from the `syncShapeToTable` call has the following m
9797
- `shapeId: string`<br>
9898
The server side `shapeId`
9999

100-
- `subscribeOnceToUpToDate(cb: () => void, error: (err: FetchError | Error) => void)`<br>
100+
- `subscribe(cb: () => void, error: (err: FetchError | Error) => void)`<br>
101101
A callback to indicate that the shape caught up to the main Postgres.
102102

103-
- `unsubscribeAllUpToDateSubscribers()`<br>
104-
Unsubscribe all `subscribeOnceToUpToDate` listeners.
105-
106103
- `subscribeMustRefresh(cb: () => void)`<br>
107104
A callback that is called when the stream emits a `must-refresh` message.
108105

@@ -115,7 +112,10 @@ The returned `shape` object from the `syncShapeToTable` call has the following m
115112
### `ShapeStreamOptions`
116113

117114
- `url: string`<br>
118-
The full URL to where the Shape is hosted. This can either be the Electric server directly, or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape/foo`
115+
The full URL to where the Shape is hosted. This can either be the Electric server directly, or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape`
116+
117+
- `table: string`<br>
118+
The name of the table in the remote database to sync from
119119

120120
- `where?: string`<br>
121121
Where clauses for the shape.

packages/pglite-react/src/hooks.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { useEffect, useState, useRef } from 'react'
21
import type { LiveQuery, LiveQueryResults } from '@electric-sql/pglite/live'
3-
import { usePGlite } from './provider'
42
import { query as buildQuery } from '@electric-sql/pglite/template'
3+
import { useEffect, useRef, useState } from 'react'
4+
import { usePGlite } from './provider'
55

66
function paramsEqual(
77
a1: unknown[] | undefined | null,
@@ -42,6 +42,7 @@ function useLiveQueryImpl<T = { [key: string]: unknown }>(
4242
currentParams = params
4343
}
4444

45+
/* eslint-disable @eslint-react/hooks-extra/no-direct-set-state-in-use-effect */
4546
useEffect(() => {
4647
let cancelled = false
4748
const cb = (results: LiveQueryResults<T>) => {
@@ -80,6 +81,7 @@ function useLiveQueryImpl<T = { [key: string]: unknown }>(
8081
throw new Error('Should never happen')
8182
}
8283
}, [db, key, query, currentParams, liveQuery])
84+
/* eslint-enable @eslint-react/hooks-extra/no-direct-set-state-in-use-effect */
8385

8486
if (liveQueryChanged && liveQuery) {
8587
return liveQuery.initialResults

packages/pglite-sync/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ You can then use the syncShapeToTable method to sync a table from Electric:
3232

3333
```ts
3434
const shape = await pg.electric.syncShapeToTable({
35-
url: 'http://localhost:3000/v1/shape/todo',
35+
shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' },
3636
table: 'todo',
3737
primaryKey: ['id'],
3838
})

packages/pglite-sync/example/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Then connect with `psql` and insert, update, or delete rows in
1818
the `test` table.
1919

2020
```sh
21-
psql -h localhost -p 5432 -U postgres -d postgres
21+
psql postgresql://postgres:password@localhost:54321/electric
2222
```
2323

2424
```sql

packages/pglite-sync/example/docker-compose.yaml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: "electric_quickstart"
33

44
services:
55
postgres:
6-
image: postgres:16-alpine
6+
image: postgres:16
77
environment:
88
POSTGRES_DB: electric
99
POSTGRES_USER: postgres
@@ -22,12 +22,10 @@ services:
2222
- wal_level=logical
2323

2424
electric:
25-
image: electricsql/electric:0.2.8
25+
image: electricsql/electric
2626
environment:
27-
DATABASE_URL: postgresql://postgres:password@postgres:5432/electric
27+
DATABASE_URL: postgresql://postgres:password@postgres:5432/electric?sslmode=disable
2828
ports:
2929
- "3000:3000"
30-
build:
31-
context: ../packages/sync-service/
3230
depends_on:
33-
- postgres
31+
- postgres

packages/pglite-sync/example/index.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ <h1>PGlite Electric Sync Example</h1>
5656
window.pg = pg;
5757

5858
await pg.electric.syncShapeToTable({
59-
url: "http://localhost:3000/v1/shape/test",
59+
shape: { url: "http://localhost:3000/v1/shape", table: "test" },
6060
table: "test",
6161
primaryKey: ["id"],
6262
});

packages/pglite-sync/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@electric-sql/pglite-sync",
3-
"version": "0.2.14",
3+
"version": "0.2.15",
44
"description": "ElectricSQL Sync for PGlite",
55
"type": "module",
66
"private": false,
@@ -45,7 +45,7 @@
4545
"dist"
4646
],
4747
"dependencies": {
48-
"@electric-sql/client": "^0.6.2"
48+
"@electric-sql/client": "~0.8.0"
4949
},
5050
"devDependencies": {
5151
"@electric-sql/pglite": "workspace:*",

packages/pglite-sync/src/index.ts

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import type {
2-
Extension,
3-
PGliteInterface,
4-
Transaction,
5-
} from '@electric-sql/pglite'
1+
import type { Offset, ShapeStreamOptions } from '@electric-sql/client'
62
import {
7-
ShapeStream,
83
ChangeMessage,
4+
ShapeStream,
95
isChangeMessage,
106
isControlMessage,
117
} from '@electric-sql/client'
12-
import type { Offset, ShapeStreamOptions } from '@electric-sql/client'
8+
import type {
9+
Extension,
10+
PGliteInterface,
11+
Transaction,
12+
} from '@electric-sql/pglite'
1313

1414
export type MapColumnsMap = Record<string, string>
1515
export type MapColumnsFn = (message: ChangeMessage<any>) => Record<string, any>
@@ -205,13 +205,13 @@ async function createPlugin(
205205
if (
206206
options.shapeKey &&
207207
messageAggregator.length > 0 &&
208-
stream.shapeId !== undefined
208+
stream.shapeHandle !== undefined
209209
) {
210210
await updateShapeSubscriptionState({
211211
pg: tx,
212212
metadataSchema,
213213
shapeKey: options.shapeKey,
214-
shapeId: stream.shapeId,
214+
shapeId: stream.shapeHandle,
215215
lastOffset:
216216
messageAggregator[messageAggregator.length - 1].offset,
217217
})
@@ -238,16 +238,14 @@ async function createPlugin(
238238
return stream.isUpToDate
239239
},
240240
get shapeId() {
241-
return stream.shapeId
241+
return stream.shapeHandle
242242
},
243-
subscribeOnceToUpToDate: (
244-
cb: () => void,
245-
error: (err: Error) => void,
246-
) => {
247-
return stream.subscribeOnceToUpToDate(cb, error)
248-
},
249-
unsubscribeAllUpToDateSubscribers: () => {
250-
stream.unsubscribeAllUpToDateSubscribers()
243+
subscribe: (cb: () => void, error: (err: Error) => void) => {
244+
return stream.subscribe(() => {
245+
if (stream.isUpToDate) {
246+
cb()
247+
}
248+
}, error)
251249
},
252250
}
253251
},
@@ -446,12 +444,12 @@ async function applyMessagesToTableWithCopy({
446444
}
447445

448446
interface GetShapeSubscriptionStateOptions {
449-
pg: PGliteInterface | Transaction
450-
metadataSchema: string
451-
shapeKey: ShapeKey
447+
readonly pg: PGliteInterface | Transaction
448+
readonly metadataSchema: string
449+
readonly shapeKey: ShapeKey
452450
}
453451

454-
type ShapeSubscriptionState = Pick<ShapeStreamOptions, 'shapeId' | 'offset'>
452+
type ShapeSubscriptionState = Pick<ShapeStreamOptions, 'handle' | 'offset'>
455453

456454
async function getShapeSubscriptionState({
457455
pg,
@@ -469,9 +467,9 @@ async function getShapeSubscriptionState({
469467

470468
if (result.rows.length === 0) return null
471469

472-
const { shape_id: shapeId, last_offset: offset } = result.rows[0]
470+
const { shape_id: handle, last_offset: offset } = result.rows[0]
473471
return {
474-
shapeId,
472+
handle,
475473
offset: offset as Offset,
476474
}
477475
}

0 commit comments

Comments
 (0)