Skip to content

Commit 7afab68

Browse files
Update YJS demo to use differential queries
1 parent cd5a5c8 commit 7afab68

File tree

9 files changed

+96
-52
lines changed

9 files changed

+96
-52
lines changed

demos/yjs-react-supabase-text-collab/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# yjs-react-supabase-text-collab
22

3+
## 0.2.0
4+
5+
- Added a local development option with local Supabase and PowerSync services.
6+
- Updated Sync rules to use client parameters. Each client now only syncs `document` and `document_updates` for the document being edited.
7+
- Updated `PowerSyncYjsProvider` to use an incremental watched query for `document_updates`.
8+
- Added a `editor_id` column to the `document_updates` table. This tracks which editor created the update and avoids reapplying updates in the source editor.
9+
- The incremental watched query now applies updates from external editors.
10+
311
## 0.1.16
412

513
### Patch Changes

demos/yjs-react-supabase-text-collab/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "yjs-react-supabase-text-collab",
3-
"version": "0.1.16",
3+
"version": "0.2.0",
44
"private": true,
55
"scripts": {
66
"dev": "vite",

demos/yjs-react-supabase-text-collab/src/app/editor/page.tsx

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
1-
import { usePowerSync, useQuery, useStatus } from '@powersync/react';
2-
import { Box, Container, FormControlLabel, Switch, Typography } from '@mui/material';
3-
import { useEffect, useMemo, useState } from 'react';
1+
import { connector, useSupabase } from '@/components/providers/SystemProvider';
42
import MenuBar from '@/components/widgets/MenuBar';
53
import { PowerSyncYjsProvider } from '@/library/powersync/PowerSyncYjsProvider';
4+
import { Box, Container, FormControlLabel, Switch, Typography } from '@mui/material';
5+
import { usePowerSync, useQuery, useStatus } from '@powersync/react';
66
import Collaboration from '@tiptap/extension-collaboration';
77
import Highlight from '@tiptap/extension-highlight';
88
import TaskItem from '@tiptap/extension-task-item';
99
import TaskList from '@tiptap/extension-task-list';
1010
import { EditorContent, useEditor } from '@tiptap/react';
1111
import StarterKit from '@tiptap/starter-kit';
12+
import { useEffect, useMemo, useState } from 'react';
13+
import { useParams } from 'react-router-dom';
1214
import * as Y from 'yjs';
1315
import './tiptap-styles.scss';
14-
import { useParams } from 'react-router-dom';
15-
import { connector } from '@/components/providers/SystemProvider';
1616

1717
export default function EditorPage() {
1818
const powerSync = usePowerSync();
1919
const status = useStatus();
20+
const supabase = useSupabase();
2021
const { id: documentId } = useParams();
2122

2223
// cache the last edited document ID in local storage
@@ -33,6 +34,12 @@ export default function EditorPage() {
3334

3435
useEffect(() => {
3536
const provider = new PowerSyncYjsProvider(ydoc, powerSync, documentId!);
37+
// Only sync changes for this document
38+
powerSync.connect(supabase!, {
39+
params: {
40+
document_id: documentId!
41+
}
42+
});
3643
return () => {
3744
provider.destroy();
3845
};

demos/yjs-react-supabase-text-collab/src/components/providers/SystemProvider.tsx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { AppSchema } from '@/library/powersync/AppSchema';
22
import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
3+
import { CircularProgress } from '@mui/material';
34
import { PowerSyncContext } from '@powersync/react';
45
import { createBaseLogger, LogLevel, PowerSyncDatabase } from '@powersync/web';
5-
import { CircularProgress } from '@mui/material';
66
import React, { Suspense } from 'react';
77

88
const SupabaseContext = React.createContext<SupabaseConnector | null>(null);
@@ -13,7 +13,6 @@ export const powerSync = new PowerSyncDatabase({
1313
schema: AppSchema
1414
});
1515
export const connector = new SupabaseConnector();
16-
powerSync.connect(connector);
1716

1817
const logger = createBaseLogger();
1918
logger.useDefaults();

demos/yjs-react-supabase-text-collab/src/library/powersync/AppSchema.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ const document_updates = new Table(
99
{
1010
document_id: column.text,
1111
created_at: column.text,
12-
update_b64: column.text
12+
update_b64: column.text,
13+
// Store an id of whom the update was created by.
14+
// This is only used to not reapply updates which were created by the local editor.
15+
editor_id: column.text
1316
},
1417
{ indexes: { by_document: ['document_id'] } }
1518
);
Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import * as Y from 'yjs';
22

33
import { b64ToUint8Array, Uint8ArrayTob64 } from '@/library/binary-utils';
4-
import { v4 as uuidv4 } from 'uuid';
5-
import { AbstractPowerSyncDatabase } from '@powersync/web';
4+
import { AbstractPowerSyncDatabase, GetAllQuery, IncrementalWatchMode } from '@powersync/web';
65
import { ObservableV2 } from 'lib0/observable';
6+
import { v4 as uuidv4 } from 'uuid';
7+
import { DocumentUpdates } from './AppSchema';
78

89
export interface PowerSyncYjsEvents {
910
/**
@@ -24,8 +25,9 @@ export interface PowerSyncYjsEvents {
2425
* @param documentId
2526
*/
2627
export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
27-
private seenDocUpdates = new Set<string>();
2828
private abortController = new AbortController();
29+
// This ID is updated on every new instance of the provider.
30+
private id = uuidv4();
2931

3032
constructor(
3133
public readonly doc: Y.Doc,
@@ -34,57 +36,72 @@ export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
3436
) {
3537
super();
3638

37-
const updates = db.watch('SELECT * FROM document_updates WHERE document_id = ?', [documentId], {
38-
signal: this.abortController.signal
39+
/**
40+
* Watch for changes to the `document_updates` table for this document.
41+
* This will be used to apply updates from other editors.
42+
* When we received an added item we apply the update to the Yjs document.
43+
*/
44+
const updateQuery = db.incrementalWatch({ mode: IncrementalWatchMode.DIFFERENTIAL }).build({
45+
watch: {
46+
query: new GetAllQuery<DocumentUpdates>({
47+
sql: /* sql */ `
48+
SELECT
49+
*
50+
FROM
51+
document_updates
52+
WHERE
53+
document_id = ?
54+
AND editor_id != ?
55+
`,
56+
parameters: [documentId, this.id]
57+
})
58+
}
3959
});
4060

61+
this.abortController.signal.addEventListener(
62+
'abort',
63+
() => {
64+
// Stop the watch query when the abort signal is triggered
65+
updateQuery.close();
66+
},
67+
{ once: true }
68+
);
69+
4170
this._storeUpdate = this._storeUpdate.bind(this);
4271
this.destroy = this.destroy.bind(this);
4372

4473
let synced = false;
4574

46-
const watchLoop = async () => {
47-
for await (const results of updates) {
48-
if (this.abortController.signal.aborted) {
49-
break;
75+
updateQuery.subscribe({
76+
onData: async (diff) => {
77+
for (const added of diff.added) {
78+
Y.applyUpdateV2(doc, b64ToUint8Array(added.update_b64));
5079
}
51-
52-
// New data detected in the database
53-
for (const update of results.rows!._array) {
54-
// Ignore any updates we've already seen
55-
if (!this.seenDocUpdates.has(update.id)) {
56-
this.seenDocUpdates.add(update.id);
57-
// apply the update from the database to the doc
58-
const origin = this;
59-
Y.applyUpdateV2(doc, b64ToUint8Array(update.update_b64), origin);
60-
}
61-
}
62-
6380
if (!synced) {
6481
synced = true;
6582
this.emit('synced', []);
6683
}
84+
},
85+
onError: (error) => {
86+
console.error('Error in PowerSyncYjsProvider update query:', error);
6787
}
68-
};
69-
watchLoop();
88+
});
7089

7190
doc.on('updateV2', this._storeUpdate);
7291
doc.on('destroy', this.destroy);
7392
}
7493

7594
private async _storeUpdate(update: Uint8Array, origin: any) {
76-
if (origin === this) {
77-
// update originated from the database / PowerSync - ignore
78-
return;
79-
}
8095
// update originated from elsewhere - save to the database
81-
const docUpdateId = uuidv4();
82-
this.seenDocUpdates.add(docUpdateId);
83-
await this.db.execute('INSERT INTO document_updates(id, document_id, update_b64) VALUES(?, ?, ?)', [
84-
docUpdateId,
85-
this.documentId,
86-
Uint8ArrayTob64(update)
87-
]);
96+
await this.db.execute(
97+
/* sql */ `
98+
INSERT INTO
99+
document_updates (id, document_id, update_b64, editor_id)
100+
VALUES
101+
(uuid (), ?, ?, ?)
102+
`,
103+
[this.documentId, Uint8ArrayTob64(update), this.id]
104+
);
88105
}
89106

90107
/**
@@ -102,6 +119,13 @@ export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
102119
* Also call `destroy()` to remove any event listeners and prevent future updates to the database.
103120
*/
104121
async deleteData() {
105-
await this.db.execute('DELETE FROM document_updates WHERE document_id = ?', [this.documentId]);
122+
await this.db.execute(
123+
/* sql */ `
124+
DELETE FROM document_updates
125+
WHERE
126+
document_id = ?
127+
`,
128+
[this.documentId]
129+
);
106130
}
107131
}

demos/yjs-react-supabase-text-collab/supabase/functions/merge-document-updates/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ Deno.serve(async (req) => {
5555
// insert the new merged update as new single update for the document
5656
const supabaseInsert = await supabase.from('document_updates').insert({
5757
document_id: document_id,
58-
update_data: Uint8ArrayToHex(docState)
58+
update_data: Uint8ArrayToHex(docState),
59+
editor_id: 'merged_update'
5960
});
6061
if (supabaseInsert.error) {
6162
throw new Error(supabaseInsert.error);

demos/yjs-react-supabase-text-collab/supabase/migrations/20250618064101_configure_powersync.sql

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ CREATE TABLE document_updates(
1010
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
1111
created_at timestamptz DEFAULT now(),
1212
document_id UUID,
13-
update_data BYTEA
13+
update_data BYTEA,
14+
editor_id UUID
1415
);
1516

1617

@@ -27,11 +28,12 @@ $$ LANGUAGE SQL;
2728
CREATE OR REPLACE FUNCTION insert_document_updates(batch TEXT)
2829
RETURNS VOID AS $$
2930
BEGIN
30-
INSERT INTO document_updates (id, document_id, update_data)
31+
INSERT INTO document_updates (id, document_id, update_data, editor_id)
3132
SELECT
3233
(elem->>'id')::UUID,
3334
(elem->>'document_id')::UUID,
34-
decode(elem->>'update_b64', 'base64')
35+
decode(elem->>'update_b64', 'base64'),
36+
(elem->>'editor_id')::UUID
3537
FROM json_array_elements(batch::json) AS elem
3638
ON CONFLICT (id) DO NOTHING;
3739
END;

demos/yjs-react-supabase-text-collab/sync-rules.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
# Sync-rule docs: https://docs.powersync.com/usage/sync-rules
44
bucket_definitions:
55
documents:
6-
data:
7-
- SELECT * FROM documents
8-
updates:
96
# Allow remote changes to be synchronized even while there are local changes
107
priority: 0
8+
parameters: SELECT (request.parameters() ->> 'document_id') as document_id
119
data:
12-
- SELECT id, document_id, base64(update_data) as update_b64 FROM document_updates
10+
- SELECT * FROM documents WHERE id = bucket.document_id
11+
- SELECT * FROM documents WHERE id = bucket.document_id
12+
- SELECT id, document_id, base64(update_data) as update_b64, editor_id FROM document_updates WHERE document_id = bucket.document_id

0 commit comments

Comments
 (0)