Skip to content

Commit 756b7bd

Browse files
committed
PEER-214: Clean up utils code
Signed-off-by: SeeuSim <[email protected]>
1 parent e751ef1 commit 756b7bd

File tree

8 files changed

+125
-97
lines changed

8 files changed

+125
-97
lines changed

backend/collaboration/src/config.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,6 @@ export const dbConfig = {
1111
user: process.env.POSTGRES_USER,
1212
password: process.env.POSTGRES_PASSWORD,
1313
};
14+
15+
// disable gc when using snapshots!
16+
export const GC_ENABLED = process.env.GC !== 'false' && process.env.GC !== '0';
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
export const wsReadyStateConnecting = 0;
2+
export const wsReadyStateOpen = 1;
3+
export const wsReadyStateClosing = 2; // eslint-disable-line
4+
export const wsReadyStateClosed = 3; // eslint-disable-line
5+
6+
export const messageSync = 0;
7+
export const messageAwareness = 1;
8+
// const messageAuth = 2
9+
10+
export const pingTimeout = 30000;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './utils';
2+
export * from './persistence';

backend/collaboration/src/y-postgresql-util/persistence.ts renamed to backend/collaboration/src/lib/y-postgres/persistence.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import * as Y from 'yjs';
2-
import { setPersistence } from '@/y-postgresql-util/utils';
3-
import { IWSSharedDoc } from '@/types/interfaces';
42
import { PostgresqlPersistence } from 'y-postgresql';
3+
4+
import type { IWSSharedDoc } from '@/types/interfaces';
55
import { dbConfig } from '@/config';
66

7+
import { setPersistence } from './utils';
8+
79
export const setUpPersistence = async () => {
810
const pgdb = await PostgresqlPersistence.build(dbConfig);
911
setPersistence({

backend/collaboration/src/y-postgresql-util/utils.ts renamed to backend/collaboration/src/lib/y-postgres/utils.ts

Lines changed: 13 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,20 @@
1-
/*
2-
Implementation from yjs-websocket/bin/utils.js
3-
Copied here for safe referencing
4-
*/
5-
import * as Y from 'yjs';
6-
import * as syncProtocol from 'y-protocols/sync';
71
import * as awarenessProtocol from 'y-protocols/awareness';
2+
import * as syncProtocol from 'y-protocols/sync';
83

9-
import * as encoding from 'lib0/encoding';
104
import * as decoding from 'lib0/decoding';
5+
import * as encoding from 'lib0/encoding';
116
import * as map from 'lib0/map';
127

13-
import { IPersistence, IWSSharedDoc } from '@/types/interfaces';
14-
15-
const wsReadyStateConnecting = 0;
16-
const wsReadyStateOpen = 1;
17-
const wsReadyStateClosing = 2; // eslint-disable-line
18-
const wsReadyStateClosed = 3; // eslint-disable-line
8+
import type { IPersistence, IWSSharedDoc } from '@/types/interfaces';
199

20-
// disable gc when using snapshots!
21-
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0';
10+
import {
11+
messageAwareness,
12+
messageSync,
13+
pingTimeout,
14+
wsReadyStateConnecting,
15+
wsReadyStateOpen,
16+
} from './constants';
17+
import { WSSharedDoc } from './ws-shared-doc';
2218

2319
let persistence: IPersistence | null = null;
2420

@@ -31,73 +27,6 @@ export const getPersistence = () => persistence;
3127
// exporting docs so that others can use it
3228
export const docs = new Map<string, IWSSharedDoc>();
3329

34-
const messageSync = 0;
35-
const messageAwareness = 1;
36-
// const messageAuth = 2
37-
38-
const updateHandler = (update: Uint8Array, origin: any, doc: Y.Doc) => {
39-
const sharedDoc = doc as IWSSharedDoc;
40-
41-
const encoder = encoding.createEncoder();
42-
encoding.writeVarUint(encoder, messageSync);
43-
syncProtocol.writeUpdate(encoder, update);
44-
const message = encoding.toUint8Array(encoder);
45-
sharedDoc.conns.forEach((_, conn) => send(sharedDoc, conn, message));
46-
};
47-
48-
class WSSharedDoc extends Y.Doc implements IWSSharedDoc {
49-
name: string;
50-
conns: Map<object, Set<number>>;
51-
awareness: awarenessProtocol.Awareness;
52-
53-
constructor(name: string) {
54-
super({ gc: gcEnabled });
55-
this.name = name;
56-
this.conns = new Map();
57-
this.awareness = new awarenessProtocol.Awareness(this);
58-
this.awareness.setLocalState(null);
59-
60-
const awarenessChangeHandler = (
61-
{
62-
added,
63-
updated,
64-
removed,
65-
}: {
66-
added: Array<number>;
67-
updated: Array<number>;
68-
removed: Array<number>;
69-
},
70-
conn: object | null
71-
) => {
72-
const changedClients = added.concat(updated, removed);
73-
if (conn !== null) {
74-
const connControlledIDs = /** @type {Set<number>} */ this.conns.get(conn);
75-
if (connControlledIDs !== undefined) {
76-
added.forEach((clientID) => {
77-
connControlledIDs.add(clientID);
78-
});
79-
removed.forEach((clientID) => {
80-
connControlledIDs.delete(clientID);
81-
});
82-
}
83-
}
84-
// broadcast awareness update
85-
const encoder = encoding.createEncoder();
86-
encoding.writeVarUint(encoder, messageAwareness);
87-
encoding.writeVarUint8Array(
88-
encoder,
89-
awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients)
90-
);
91-
const buff = encoding.toUint8Array(encoder);
92-
this.conns.forEach((_, c) => {
93-
send(this, c, buff);
94-
});
95-
};
96-
this.awareness.on('update', awarenessChangeHandler);
97-
this.on('update', updateHandler);
98-
}
99-
}
100-
10130
/**
10231
* Gets a Y.Doc by name, whether in memory or on disk
10332
*
@@ -165,13 +94,13 @@ const closeConn = (doc: IWSSharedDoc, conn: any) => {
16594
conn.close();
16695
};
16796

168-
const send = (doc: IWSSharedDoc, conn: any, m: Uint8Array) => {
97+
export const send = (doc: IWSSharedDoc, conn: any, m: Uint8Array) => {
16998
if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) {
17099
closeConn(doc, conn);
171100
}
172101
try {
173102
conn.send(m, (err: any) => {
174-
if (err != null) {
103+
if (err !== null) {
175104
closeConn(doc, conn);
176105
}
177106
});
@@ -180,8 +109,6 @@ const send = (doc: IWSSharedDoc, conn: any, m: Uint8Array) => {
180109
}
181110
};
182111

183-
const pingTimeout = 30000;
184-
185112
export const setupWSConnection = (
186113
conn: any,
187114
req: any,
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import * as Y from 'yjs';
2+
import * as syncProtocol from 'y-protocols/sync';
3+
import * as awarenessProtocol from 'y-protocols/awareness';
4+
5+
import * as encoding from 'lib0/encoding';
6+
7+
import type { IWSSharedDoc } from '@/types/interfaces';
8+
import { GC_ENABLED } from '@/config';
9+
import { messageAwareness, messageSync } from './constants';
10+
import { send } from './utils';
11+
12+
const updateHandler = (update: Uint8Array, _origin: any, doc: Y.Doc) => {
13+
const sharedDoc = doc as IWSSharedDoc;
14+
15+
const encoder = encoding.createEncoder();
16+
encoding.writeVarUint(encoder, messageSync);
17+
syncProtocol.writeUpdate(encoder, update);
18+
const message = encoding.toUint8Array(encoder);
19+
sharedDoc.conns.forEach((_, conn) => send(sharedDoc, conn, message));
20+
};
21+
22+
export class WSSharedDoc extends Y.Doc implements IWSSharedDoc {
23+
name: string;
24+
conns: Map<object, Set<number>>;
25+
awareness: awarenessProtocol.Awareness;
26+
27+
constructor(name: string) {
28+
super({ gc: GC_ENABLED });
29+
this.name = name;
30+
this.conns = new Map();
31+
this.awareness = new awarenessProtocol.Awareness(this);
32+
this.awareness.setLocalState(null);
33+
34+
const awarenessChangeHandler = (
35+
{
36+
added,
37+
updated,
38+
removed,
39+
}: {
40+
added: Array<number>;
41+
updated: Array<number>;
42+
removed: Array<number>;
43+
},
44+
conn: object | null
45+
) => {
46+
const changedClients = added.concat(updated, removed);
47+
if (conn !== null) {
48+
const connControlledIDs = /** @type {Set<number>} */ this.conns.get(conn);
49+
if (connControlledIDs !== undefined) {
50+
added.forEach((clientID) => {
51+
connControlledIDs.add(clientID);
52+
});
53+
removed.forEach((clientID) => {
54+
connControlledIDs.delete(clientID);
55+
});
56+
}
57+
}
58+
// broadcast awareness update
59+
const encoder = encoding.createEncoder();
60+
encoding.writeVarUint(encoder, messageAwareness);
61+
encoding.writeVarUint8Array(
62+
encoder,
63+
awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients)
64+
);
65+
const buff = encoding.toUint8Array(encoder);
66+
this.conns.forEach((_, c) => {
67+
send(this, c, buff);
68+
});
69+
};
70+
this.awareness.on('update', awarenessChangeHandler);
71+
this.on('update', updateHandler);
72+
}
73+
}

backend/collaboration/src/server.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { exit } from 'process';
22

33
import cors from 'cors';
4+
import http from 'http';
45
import express, { json } from 'express';
56
import { StatusCodes } from 'http-status-codes';
67
import pino from 'pino-http';
@@ -9,13 +10,9 @@ import { UI_HOST } from '@/config';
910
import { config, db } from '@/lib/db';
1011
import { logger } from '@/lib/utils';
1112
import roomRoutes from '@/routes/room';
13+
import { setUpWSServer } from './ws';
1214

13-
import { WebSocketServer } from 'ws';
14-
import { createServer } from 'http';
15-
import { setupWSConnection } from './y-postgresql-util/utils';
16-
import { setUpPersistence } from './y-postgresql-util/persistence';
1715
const app = express();
18-
const server = createServer(app);
1916

2017
app.use(pino());
2118
app.use(json());
@@ -31,10 +28,6 @@ app.use('/room', roomRoutes);
3128
// Health Check for Docker
3229
app.get('/health', (_req, res) => res.status(StatusCodes.OK).send('OK'));
3330

34-
// y-websocket server
35-
const wss = new WebSocketServer({ server });
36-
wss.on('connection', setupWSConnection);
37-
setUpPersistence();
3831
export const dbHealthCheck = async () => {
3932
try {
4033
await db`SELECT 1`;
@@ -53,4 +46,8 @@ app.get('/test-db', async (_req, res) => {
5346
res.json({ message: 'OK ' });
5447
});
5548

49+
const server = http.createServer(app);
50+
51+
export const wss = setUpWSServer(server);
52+
5653
export default server;

backend/collaboration/src/ws.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import http from 'http';
2+
import { WebSocketServer } from 'ws';
3+
4+
import { setUpPersistence, setupWSConnection } from '@/lib/y-postgres';
5+
6+
export const setUpWSServer = (server: ReturnType<(typeof http)['createServer']>) => {
7+
const wss = new WebSocketServer({ server });
8+
9+
wss.on('connection', setupWSConnection);
10+
11+
setUpPersistence();
12+
13+
return wss;
14+
};

0 commit comments

Comments
 (0)