|
1 | 1 | import { Socket } from "socket.io";
|
2 | 2 | import { io } from "../server";
|
3 | 3 | import redisClient from "../config/redis";
|
| 4 | +import { Doc, applyUpdateV2, encodeStateAsUpdateV2 } from "yjs"; |
4 | 5 |
|
5 | 6 | enum CollabEvents {
|
6 | 7 | // Receive
|
7 | 8 | JOIN = "join",
|
8 |
| - CHANGE = "change", |
9 | 9 | LEAVE = "leave",
|
10 | 10 | DISCONNECT = "disconnect",
|
| 11 | + INIT_DOCUMENT = "init_document", |
| 12 | + UPDATE_REQUEST = "update_request", |
| 13 | + UPDATE_CURSOR_REQUEST = "update_cursor_request", |
| 14 | + RECONNECT_REQUEST = "reconnect_request", |
11 | 15 |
|
12 | 16 | // Send
|
13 |
| - ROOM_FULL = "room_full", |
14 |
| - CONNECTED = "connected", |
15 |
| - NEW_USER_CONNECTED = "new_user_connected", |
16 |
| - CODE_CHANGE = "code_change", |
17 |
| - PARTNER_LEFT = "partner_left", |
18 |
| - PARTNER_DISCONNECTED = "partner_disconnected", |
| 17 | + ROOM_READY = "room_ready", |
| 18 | + DOCUMENT_READY = "document_ready", |
| 19 | + UPDATE = "updateV2", |
| 20 | + UPDATE_CURSOR = "update_cursor", |
| 21 | + // PARTNER_LEFT = "partner_left", |
| 22 | + // PARTNER_DISCONNECTED = "partner_disconnected", |
19 | 23 | }
|
20 | 24 |
|
21 | 25 | const EXPIRY_TIME = 3600;
|
| 26 | +const CONNECTION_DELAY = 3000; // time window to allow for page re-renders / refresh |
| 27 | + |
| 28 | +const userConnections = new Map<string, NodeJS.Timeout | null>(); |
| 29 | +const collabSessions = new Map<string, Doc>(); |
| 30 | +const partnerReadiness = new Map<string, boolean>(); |
22 | 31 |
|
23 | 32 | export const handleWebsocketCollabEvents = (socket: Socket) => {
|
24 |
| - socket.on(CollabEvents.JOIN, async ({ roomId }) => { |
25 |
| - if (!roomId) { |
| 33 | + socket.on(CollabEvents.JOIN, async (uid: string, roomId: string) => { |
| 34 | + const connectionKey = `${uid}:${roomId}`; |
| 35 | + if (userConnections.has(connectionKey)) { |
| 36 | + clearTimeout(userConnections.get(connectionKey)!); |
26 | 37 | return;
|
27 | 38 | }
|
| 39 | + userConnections.set(connectionKey, null); |
28 | 40 |
|
29 | 41 | const room = io.sockets.adapter.rooms.get(roomId);
|
30 |
| - if (room && room.size >= 2) { |
31 |
| - socket.emit(CollabEvents.ROOM_FULL); |
| 42 | + if (room && room?.size >= 2) { |
| 43 | + socket.emit(CollabEvents.ROOM_READY, false); |
32 | 44 | return;
|
33 | 45 | }
|
34 | 46 |
|
35 | 47 | socket.join(roomId);
|
36 | 48 | socket.data.roomId = roomId;
|
37 | 49 |
|
38 |
| - // in case of disconnect, send the code to the user when he rejoins |
39 |
| - const code = await redisClient.get(`collaboration:${roomId}`); |
40 |
| - socket.emit(CollabEvents.CONNECTED, { code: code ? code : "" }); |
| 50 | + if ( |
| 51 | + io.sockets.adapter.rooms.get(roomId)?.size === 2 && |
| 52 | + !collabSessions.has(roomId) |
| 53 | + ) { |
| 54 | + createCollabSession(roomId); |
| 55 | + io.to(roomId).emit(CollabEvents.ROOM_READY, true); |
| 56 | + } |
| 57 | + }); |
| 58 | + |
| 59 | + socket.on(CollabEvents.INIT_DOCUMENT, (roomId: string, template: string) => { |
| 60 | + const doc = getDocument(roomId); |
| 61 | + const isPartnerReady = partnerReadiness.get(roomId); |
41 | 62 |
|
42 |
| - // inform the other user that a new user has joined |
43 |
| - socket.to(roomId).emit(CollabEvents.NEW_USER_CONNECTED); |
| 63 | + if (isPartnerReady && doc.getText().length === 0) { |
| 64 | + doc.transact(() => { |
| 65 | + doc.getText().insert(0, template); |
| 66 | + }); |
| 67 | + io.to(roomId).emit(CollabEvents.DOCUMENT_READY); |
| 68 | + } else { |
| 69 | + partnerReadiness.set(roomId, true); |
| 70 | + } |
44 | 71 | });
|
45 | 72 |
|
46 |
| - socket.on(CollabEvents.CHANGE, async ({ roomId, code }) => { |
47 |
| - if (!roomId || !code) { |
48 |
| - return; |
| 73 | + socket.on( |
| 74 | + CollabEvents.UPDATE_REQUEST, |
| 75 | + (roomId: string, update: Uint8Array) => { |
| 76 | + const doc = collabSessions.get(roomId); |
| 77 | + if (doc) { |
| 78 | + applyUpdateV2(doc, new Uint8Array(update)); |
| 79 | + } else { |
| 80 | + // TODO: error handling |
| 81 | + } |
49 | 82 | }
|
| 83 | + ); |
50 | 84 |
|
51 |
| - await redisClient.set(`collaboration:${roomId}`, code, { |
52 |
| - EX: EXPIRY_TIME, |
53 |
| - }); |
54 |
| - socket.to(roomId).emit(CollabEvents.CODE_CHANGE, { code }); |
55 |
| - }); |
| 85 | + socket.on( |
| 86 | + CollabEvents.UPDATE_CURSOR_REQUEST, |
| 87 | + ( |
| 88 | + roomId: string, |
| 89 | + cursor: { uid: string; username: string; from: number; to: number } |
| 90 | + ) => { |
| 91 | + socket.to(roomId).emit(CollabEvents.UPDATE_CURSOR, cursor); |
| 92 | + } |
| 93 | + ); |
56 | 94 |
|
57 |
| - socket.on(CollabEvents.LEAVE, ({ roomId }) => { |
58 |
| - if (!roomId) { |
| 95 | + socket.on(CollabEvents.LEAVE, (uid: string, roomId: string) => { |
| 96 | + const connectionKey = `${uid}:${roomId}`; |
| 97 | + if (!userConnections.has(connectionKey)) { |
59 | 98 | return;
|
60 | 99 | }
|
61 | 100 |
|
62 |
| - socket.leave(roomId); |
63 |
| - socket.to(roomId).emit(CollabEvents.PARTNER_LEFT); |
| 101 | + clearTimeout(userConnections.get(connectionKey)!); |
| 102 | + |
| 103 | + const connectionTimeout = setTimeout(() => { |
| 104 | + userConnections.delete(connectionKey); |
| 105 | + socket.leave(roomId); |
| 106 | + socket.disconnect(); |
| 107 | + |
| 108 | + const room = io.sockets.adapter.rooms.get(roomId); |
| 109 | + if (!room || room.size === 0) { |
| 110 | + removeCollabSession(roomId); |
| 111 | + } |
| 112 | + }, CONNECTION_DELAY); |
| 113 | + |
| 114 | + userConnections.set(connectionKey, connectionTimeout); |
64 | 115 | });
|
65 | 116 |
|
66 |
| - socket.on(CollabEvents.DISCONNECT, () => { |
67 |
| - const { roomId } = socket.data; |
68 |
| - if (roomId) { |
69 |
| - socket.to(roomId).emit(CollabEvents.PARTNER_DISCONNECTED); |
| 117 | + socket.on(CollabEvents.RECONNECT_REQUEST, async (roomId: string) => { |
| 118 | + // TODO: Handle recconnection |
| 119 | + socket.join(roomId); |
| 120 | + |
| 121 | + const doc = getDocument(roomId); |
| 122 | + const storeData = await redisClient.get(`collaboration:${roomId}`); |
| 123 | + |
| 124 | + if (storeData) { |
| 125 | + const tempDoc = new Doc(); |
| 126 | + const update = Buffer.from(storeData, "base64"); |
| 127 | + applyUpdateV2(tempDoc, new Uint8Array(update)); |
| 128 | + const tempText = tempDoc.getText().toString(); |
| 129 | + |
| 130 | + const text = doc.getText(); |
| 131 | + doc.transact(() => { |
| 132 | + text.delete(0, text.length); |
| 133 | + text.insert(0, tempText); |
| 134 | + }); |
70 | 135 | }
|
71 | 136 | });
|
72 | 137 | };
|
| 138 | + |
| 139 | +const createCollabSession = (roomId: string) => { |
| 140 | + getDocument(roomId); |
| 141 | + partnerReadiness.set(roomId, false); |
| 142 | +}; |
| 143 | + |
| 144 | +const removeCollabSession = (roomId: string) => { |
| 145 | + collabSessions.get(roomId)?.destroy(); |
| 146 | + collabSessions.delete(roomId); |
| 147 | + partnerReadiness.delete(roomId); |
| 148 | +}; |
| 149 | + |
| 150 | +const getDocument = (roomId: string) => { |
| 151 | + let doc = collabSessions.get(roomId); |
| 152 | + if (!doc) { |
| 153 | + doc = new Doc(); |
| 154 | + doc.on(CollabEvents.UPDATE, (_update) => { |
| 155 | + saveDocument(roomId, doc!); |
| 156 | + io.to(roomId).emit(CollabEvents.UPDATE, encodeStateAsUpdateV2(doc!)); |
| 157 | + }); |
| 158 | + collabSessions.set(roomId, doc); |
| 159 | + } |
| 160 | + |
| 161 | + return doc; |
| 162 | +}; |
| 163 | + |
| 164 | +const saveDocument = async (roomId: string, doc: Doc) => { |
| 165 | + const docState = encodeStateAsUpdateV2(doc); |
| 166 | + const docAsString = Buffer.from(docState).toString("base64"); |
| 167 | + await redisClient.set(`collaboration:${roomId}`, docAsString, { |
| 168 | + EX: EXPIRY_TIME, |
| 169 | + }); |
| 170 | +}; |
0 commit comments