Skip to content

Commit 7e38172

Browse files
committed
setup y-websocket server with y-postgresql persistence layer
1 parent 99fe108 commit 7e38172

File tree

6 files changed

+258
-12
lines changed

6 files changed

+258
-12
lines changed

backend/collaboration/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"env-cmd": "^10.1.0",
2222
"express": "^4.21.1",
2323
"http-status-codes": "^2.3.0",
24+
"pg": "^8.13.0",
2425
"pino": "^9.4.0",
2526
"pino-http": "^10.3.0",
2627
"redis": "^4.7.0",

backend/collaboration/src/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ import 'dotenv/config';
33
export const UI_HOST = process.env.PEERPREP_UI_HOST!;
44

55
export const EXPRESS_PORT = process.env.EXPRESS_PORT;
6+
7+
export const WEBSOCKET_PORT = process.env.WEBSOCKET_PORT;

backend/collaboration/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import { dbHealthCheck } from '@/server';
44
import server from '@/websocket';
55

66
const port = Number.parseInt(EXPRESS_PORT || '8001');
7+
const wsPort = Number.parseInt(WEBSOCKET_PORT || '8002');
78

89
const listenMessage = `App listening on port: ${port}`;
910
server.listen(port, () => {
1011
void dbHealthCheck();
1112
logger.info(listenMessage);
1213
});
14+
server.listen(wsPort, () => logger.info(`WebSocket listening on port: ${wsPort}`));
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import * as awarenessProtocol from 'y-protocols/awareness.js';
2+
import * as Y from 'yjs';
3+
4+
export interface IWSSharedDoc extends Y.Doc {
5+
name: string;
6+
conns: Map<object, Set<number>>;
7+
awareness: awarenessProtocol.Awareness;
8+
}
9+
10+
export interface IPersistence {
11+
bindState: (arg1: string, arg2: IWSSharedDoc) => void;
12+
writeState: (arg1: string, arg2: IWSSharedDoc) => Promise<any>;
13+
provider?: any;
14+
}
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import * as Y from 'yjs';
2+
import * as syncProtocol from 'y-protocols/sync';
3+
import * as awarenessProtocol from 'y-protocols/awareness';
4+
5+
import * as encoding from 'lib0/encoding';
6+
import * as decoding from 'lib0/decoding';
7+
import * as map from 'lib0/map';
8+
9+
import { IPersistence, IWSSharedDoc } from '@/y-postgresql/interfaces';
10+
11+
const wsReadyStateConnecting = 0;
12+
const wsReadyStateOpen = 1;
13+
const wsReadyStateClosing = 2; // eslint-disable-line
14+
const wsReadyStateClosed = 3; // eslint-disable-line
15+
16+
// disable gc when using snapshots!
17+
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0';
18+
19+
let persistence: IPersistence | null = null;
20+
21+
export const setPersistence = (persistence_: IPersistence) => {
22+
persistence = persistence_;
23+
};
24+
25+
export const getPersistence = () => persistence;
26+
27+
// exporting docs so that others can use it
28+
export const docs = new Map<string, IWSSharedDoc>();
29+
30+
const messageSync = 0;
31+
const messageAwareness = 1;
32+
// const messageAuth = 2
33+
34+
const updateHandler = (update: Uint8Array, origin: any, doc: Y.Doc) => {
35+
const sharedDoc = doc as IWSSharedDoc;
36+
37+
const encoder = encoding.createEncoder();
38+
encoding.writeVarUint(encoder, messageSync);
39+
syncProtocol.writeUpdate(encoder, update);
40+
const message = encoding.toUint8Array(encoder);
41+
sharedDoc.conns.forEach((_, conn) => send(sharedDoc, conn, message));
42+
};
43+
44+
class WSSharedDoc extends Y.Doc implements IWSSharedDoc {
45+
name: string;
46+
conns: Map<object, Set<number>>;
47+
awareness: awarenessProtocol.Awareness;
48+
49+
constructor(name: string) {
50+
super({ gc: gcEnabled });
51+
this.name = name;
52+
this.conns = new Map();
53+
this.awareness = new awarenessProtocol.Awareness(this);
54+
this.awareness.setLocalState(null);
55+
56+
const awarenessChangeHandler = (
57+
{
58+
added,
59+
updated,
60+
removed,
61+
}: {
62+
added: Array<number>;
63+
updated: Array<number>;
64+
removed: Array<number>;
65+
},
66+
conn: object | null
67+
) => {
68+
const changedClients = added.concat(updated, removed);
69+
if (conn !== null) {
70+
const connControlledIDs = /** @type {Set<number>} */ this.conns.get(conn);
71+
if (connControlledIDs !== undefined) {
72+
added.forEach((clientID) => {
73+
connControlledIDs.add(clientID);
74+
});
75+
removed.forEach((clientID) => {
76+
connControlledIDs.delete(clientID);
77+
});
78+
}
79+
}
80+
// broadcast awareness update
81+
const encoder = encoding.createEncoder();
82+
encoding.writeVarUint(encoder, messageAwareness);
83+
encoding.writeVarUint8Array(
84+
encoder,
85+
awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients)
86+
);
87+
const buff = encoding.toUint8Array(encoder);
88+
this.conns.forEach((_, c) => {
89+
send(this, c, buff);
90+
});
91+
};
92+
this.awareness.on('update', awarenessChangeHandler);
93+
this.on('update', updateHandler);
94+
}
95+
}
96+
97+
/**
98+
* Gets a Y.Doc by name, whether in memory or on disk
99+
*
100+
* @param {string} docname - the name of the Y.Doc to find or create
101+
* @param {boolean} gc - whether to allow gc on the doc (applies only when created)
102+
* @return {WSSharedDoc}
103+
*/
104+
export const getYDoc = (docname: string, gc = true) =>
105+
map.setIfUndefined(docs, docname, () => {
106+
const doc = new WSSharedDoc(docname);
107+
doc.gc = gc;
108+
if (persistence !== null) {
109+
persistence.bindState(docname, doc);
110+
}
111+
docs.set(docname, doc);
112+
return doc;
113+
});
114+
115+
const messageListener = (conn: any, doc: IWSSharedDoc, message: Uint8Array) => {
116+
try {
117+
const encoder = encoding.createEncoder();
118+
const decoder = decoding.createDecoder(message);
119+
const messageType = decoding.readVarUint(decoder);
120+
switch (messageType) {
121+
case messageSync:
122+
encoding.writeVarUint(encoder, messageSync);
123+
syncProtocol.readSyncMessage(decoder, encoder, doc, conn);
124+
125+
// If the `encoder` only contains the type of reply message and no
126+
// message, there is no need to send the message. When `encoder` only
127+
// contains the type of reply, its length is 1.
128+
if (encoding.length(encoder) > 1) {
129+
send(doc, conn, encoding.toUint8Array(encoder));
130+
}
131+
break;
132+
case messageAwareness: {
133+
awarenessProtocol.applyAwarenessUpdate(
134+
doc.awareness,
135+
decoding.readVarUint8Array(decoder),
136+
conn
137+
);
138+
break;
139+
}
140+
}
141+
} catch (err) {
142+
console.error(err);
143+
}
144+
};
145+
146+
const closeConn = (doc: IWSSharedDoc, conn: any) => {
147+
if (doc.conns.has(conn)) {
148+
const controlledIds = doc.conns.get(conn);
149+
doc.conns.delete(conn);
150+
if (controlledIds) {
151+
awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null);
152+
}
153+
if (doc.conns.size === 0 && persistence !== null) {
154+
// if persisted, we store state and destroy ydocument
155+
persistence.writeState(doc.name, doc).then(() => {
156+
doc.destroy();
157+
});
158+
docs.delete(doc.name);
159+
}
160+
}
161+
conn.close();
162+
};
163+
164+
const send = (doc: IWSSharedDoc, conn: any, m: Uint8Array) => {
165+
if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) {
166+
closeConn(doc, conn);
167+
}
168+
try {
169+
conn.send(m, (err: any) => {
170+
if (err != null) {
171+
closeConn(doc, conn);
172+
}
173+
});
174+
} catch (e) {
175+
closeConn(doc, conn);
176+
}
177+
};
178+
179+
const pingTimeout = 30000;
180+
181+
export const setupWSConnection = (
182+
conn: any,
183+
req: any,
184+
{ docName = req.url.slice(1).split('?')[0], gc = true } = {}
185+
) => {
186+
conn.binaryType = 'arraybuffer';
187+
// get doc, initialize if it does not exist yet
188+
const doc = getYDoc(docName, gc);
189+
doc.conns.set(conn, new Set());
190+
// listen and reply to events
191+
conn.on('message', (message: ArrayBuffer) => messageListener(conn, doc, new Uint8Array(message)));
192+
193+
// Check if connection is still alive
194+
let pongReceived = true;
195+
const pingInterval = setInterval(() => {
196+
if (!pongReceived) {
197+
if (doc.conns.has(conn)) {
198+
closeConn(doc, conn);
199+
}
200+
clearInterval(pingInterval);
201+
} else if (doc.conns.has(conn)) {
202+
pongReceived = false;
203+
try {
204+
conn.ping();
205+
} catch (e) {
206+
closeConn(doc, conn);
207+
clearInterval(pingInterval);
208+
}
209+
}
210+
}, pingTimeout);
211+
conn.on('close', () => {
212+
closeConn(doc, conn);
213+
clearInterval(pingInterval);
214+
});
215+
conn.on('pong', () => {
216+
pongReceived = true;
217+
});
218+
// put the following in a variables in a block so the interval handlers don't keep in in
219+
// scope
220+
{
221+
// send sync step 1
222+
const encoder = encoding.createEncoder();
223+
encoding.writeVarUint(encoder, messageSync);
224+
syncProtocol.writeSyncStep1(encoder, doc);
225+
send(doc, conn, encoding.toUint8Array(encoder));
226+
const awarenessStates = doc.awareness.getStates();
227+
if (awarenessStates.size > 0) {
228+
const encoder = encoding.createEncoder();
229+
encoding.writeVarUint(encoder, messageAwareness);
230+
encoding.writeVarUint8Array(
231+
encoder,
232+
awarenessProtocol.encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys()))
233+
);
234+
send(doc, conn, encoding.toUint8Array(encoder));
235+
}
236+
}
237+
};

docker-compose.local.yaml

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ services:
3030
- "${QUESTION_EXPRESS_DB_PORT}:5432"
3131
restart: unless-stopped
3232

33-
<<<<<<< HEAD
3433
collab-db:
3534
hostname: "collab-db"
3635
image: postgres:16.4
@@ -44,8 +43,6 @@ services:
4443
- "${COLLAB_EXPRESS_DB_PORT}:5432"
4544
restart: unless-stopped
4645

47-
=======
48-
>>>>>>> ba41a4a2223602ec532fc1d08a3b632e0ca698cd
4946
match-db:
5047
hostname: "match-db"
5148
image: redis/redis-stack
@@ -58,29 +55,22 @@ services:
5855
- "${MATCHING_DB_HOST_MGMT_PORT}:8001"
5956
- "${MATCHING_DB_HOST_PORT}:6379"
6057
restart: unless-stopped
61-
<<<<<<< HEAD
62-
=======
63-
58+
6459
# match-db-ui:
6560
# hostname: "match-db-ui"
6661
# image: redis/redisinsight
6762
# container_name: "match-db-ui"
6863
# ports:
6964
# - "${MATCHING_DB_HOST_MGMT_PORT}:5540"
7065
# restart: unless-stopped
71-
>>>>>>> ba41a4a2223602ec532fc1d08a3b632e0ca698cd
66+
7267

7368
volumes:
7469
user-db-docker:
7570
external: true
7671
question-db-docker:
7772
external: true
78-
<<<<<<< HEAD
7973
collab-db-docker:
8074
external: true
8175
match-db-docker:
8276
external: true
83-
=======
84-
match-db-docker:
85-
external: true
86-
>>>>>>> ba41a4a2223602ec532fc1d08a3b632e0ca698cd

0 commit comments

Comments
 (0)