@@ -13,28 +13,30 @@ enum CollabEvents {
13
13
UPDATE_REQUEST = "update_request" ,
14
14
UPDATE_CURSOR_REQUEST = "update_cursor_request" ,
15
15
RECONNECT_REQUEST = "reconnect_request" ,
16
+ END_SESSION_REQUEST = "end_session_request" ,
16
17
17
18
// Send
18
19
ROOM_READY = "room_ready" ,
19
20
DOCUMENT_READY = "document_ready" ,
21
+ DOCUMENT_NOT_FOUND = "document_not_found" ,
20
22
UPDATE = "updateV2" ,
21
23
UPDATE_CURSOR = "update_cursor" ,
22
- PARTNER_LEFT = "partner_left" ,
24
+ END_SESSION = "end_session" ,
25
+ PARTNER_DISCONNECTED = "partner_disconnected" ,
23
26
}
24
27
25
28
const EXPIRY_TIME = 3600 ;
26
- const CONNECTION_DELAY = 3000 ; // time window to allow for page re-renders / refresh
29
+ const CONNECTION_DELAY = 3000 ; // time window to allow for page re-renders
27
30
28
31
const userConnections = new Map < string , NodeJS . Timeout | null > ( ) ;
29
32
const collabSessions = new Map < string , Doc > ( ) ;
30
33
const partnerReadiness = new Map < string , boolean > ( ) ;
31
34
32
35
export const handleWebsocketCollabEvents = ( socket : Socket ) => {
33
- socket . on ( CollabEvents . JOIN , async ( uid : string , roomId : string ) => {
36
+ socket . on ( CollabEvents . JOIN , ( uid : string , roomId : string ) => {
34
37
const connectionKey = `${ uid } :${ roomId } ` ;
35
38
if ( userConnections . has ( connectionKey ) ) {
36
39
clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
37
- return ;
38
40
}
39
41
userConnections . set ( connectionKey , null ) ;
40
42
@@ -47,11 +49,10 @@ export const handleWebsocketCollabEvents = (socket: Socket) => {
47
49
socket . join ( roomId ) ;
48
50
socket . data . roomId = roomId ;
49
51
50
- if (
51
- io . sockets . adapter . rooms . get ( roomId ) ?. size === 2 &&
52
- ! collabSessions . has ( roomId )
53
- ) {
54
- createCollabSession ( roomId ) ;
52
+ if ( io . sockets . adapter . rooms . get ( roomId ) ?. size === 2 ) {
53
+ if ( ! collabSessions . has ( roomId ) ) {
54
+ createCollabSession ( roomId ) ;
55
+ }
55
56
io . to ( roomId ) . emit ( CollabEvents . ROOM_READY , true ) ;
56
57
}
57
58
} ) ;
@@ -107,7 +108,8 @@ export const handleWebsocketCollabEvents = (socket: Socket) => {
107
108
if ( doc ) {
108
109
applyUpdateV2 ( doc , new Uint8Array ( update ) ) ;
109
110
} else {
110
- // TODO: error handling
111
+ io . to ( roomId ) . emit ( CollabEvents . DOCUMENT_NOT_FOUND ) ;
112
+ io . sockets . adapter . rooms . delete ( roomId ) ;
111
113
}
112
114
}
113
115
) ;
@@ -124,41 +126,45 @@ export const handleWebsocketCollabEvents = (socket: Socket) => {
124
126
125
127
socket . on (
126
128
CollabEvents . LEAVE ,
127
- ( uid : string , roomId : string , isImmediate : boolean ) => {
129
+ ( uid : string , roomId : string , isPartnerNotified : boolean ) => {
128
130
const connectionKey = `${ uid } :${ roomId } ` ;
129
- if ( isImmediate || ! userConnections . has ( connectionKey ) ) {
131
+ if ( userConnections . has ( connectionKey ) ) {
132
+ clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
133
+ }
134
+
135
+ if ( isPartnerNotified ) {
130
136
handleUserLeave ( uid , roomId , socket ) ;
131
137
return ;
132
138
}
133
139
134
- clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
135
-
136
140
const connectionTimeout = setTimeout ( ( ) => {
137
141
handleUserLeave ( uid , roomId , socket ) ;
142
+ io . to ( roomId ) . emit ( CollabEvents . PARTNER_DISCONNECTED ) ;
138
143
} , CONNECTION_DELAY ) ;
139
144
140
145
userConnections . set ( connectionKey , connectionTimeout ) ;
141
146
}
142
147
) ;
143
148
144
- socket . on ( CollabEvents . RECONNECT_REQUEST , async ( roomId : string ) => {
145
- // TODO: Handle recconnection
146
- socket . join ( roomId ) ;
147
-
148
- const doc = getDocument ( roomId ) ;
149
- const storeData = await redisClient . get ( `collaboration: ${ roomId } ` ) ;
149
+ socket . on (
150
+ CollabEvents . END_SESSION_REQUEST ,
151
+ ( roomId : string , sessionDuration : number ) => {
152
+ socket . to ( roomId ) . emit ( CollabEvents . END_SESSION , sessionDuration ) ;
153
+ }
154
+ ) ;
150
155
151
- if ( storeData ) {
152
- const tempDoc = new Doc ( ) ;
153
- const update = Buffer . from ( storeData , "base64" ) ;
154
- applyUpdateV2 ( tempDoc , new Uint8Array ( update ) ) ;
155
- const tempText = tempDoc . getText ( ) . toString ( ) ;
156
+ socket . on ( CollabEvents . RECONNECT_REQUEST , ( roomId : string ) => {
157
+ const room = io . sockets . adapter . rooms . get ( roomId ) ;
158
+ if ( ! room || room . size < 2 ) {
159
+ socket . join ( roomId ) ;
160
+ socket . data . roomId = roomId ;
161
+ }
156
162
157
- const text = doc . getText ( ) ;
158
- doc . transact ( ( ) => {
159
- text . delete ( 0 , text . length ) ;
160
- text . insert ( 0 , tempText ) ;
161
- } ) ;
163
+ if (
164
+ io . sockets . adapter . rooms . get ( roomId ) ?. size === 2 &&
165
+ ! collabSessions . has ( roomId )
166
+ ) {
167
+ restoreDocument ( roomId ) ;
162
168
}
163
169
} ) ;
164
170
} ;
@@ -172,6 +178,7 @@ const removeCollabSession = (roomId: string) => {
172
178
collabSessions . get ( roomId ) ?. destroy ( ) ;
173
179
collabSessions . delete ( roomId ) ;
174
180
partnerReadiness . delete ( roomId ) ;
181
+ redisClient . del ( roomId ) ;
175
182
} ;
176
183
177
184
const getDocument = ( roomId : string ) => {
@@ -188,28 +195,38 @@ const getDocument = (roomId: string) => {
188
195
return doc ;
189
196
} ;
190
197
191
- const saveDocument = async ( roomId : string , doc : Doc ) => {
198
+ const saveDocument = ( roomId : string , doc : Doc ) => {
192
199
const docState = encodeStateAsUpdateV2 ( doc ) ;
193
200
const docAsString = Buffer . from ( docState ) . toString ( "base64" ) ;
194
- await redisClient . set ( `collaboration:${ roomId } ` , docAsString , {
201
+ redisClient . set ( `collaboration:${ roomId } ` , docAsString , {
195
202
EX : EXPIRY_TIME ,
196
203
} ) ;
197
204
} ;
198
205
206
+ const restoreDocument = async ( roomId : string ) => {
207
+ const doc = getDocument ( roomId ) ;
208
+ const storeData = await redisClient . get ( `collaboration:${ roomId } ` ) ;
209
+
210
+ if ( storeData ) {
211
+ const tempDoc = new Doc ( ) ;
212
+ const update = Buffer . from ( storeData , "base64" ) ;
213
+ applyUpdateV2 ( tempDoc , new Uint8Array ( update ) ) ;
214
+ const tempText = tempDoc . getText ( ) . toString ( ) ;
215
+
216
+ const text = doc . getText ( ) ;
217
+ doc . transact ( ( ) => {
218
+ text . delete ( 0 , text . length ) ;
219
+ text . insert ( 0 , tempText ) ;
220
+ } ) ;
221
+ }
222
+ } ;
223
+
199
224
const handleUserLeave = ( uid : string , roomId : string , socket : Socket ) => {
200
225
const connectionKey = `${ uid } :${ roomId } ` ;
201
- if ( userConnections . has ( connectionKey ) ) {
202
- clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
203
- userConnections . delete ( connectionKey ) ;
204
- }
226
+ userConnections . delete ( connectionKey ) ;
205
227
206
228
socket . leave ( roomId ) ;
207
229
socket . disconnect ( ) ;
208
230
209
- const room = io . sockets . adapter . rooms . get ( roomId ) ;
210
- if ( ! room || room . size === 0 ) {
211
- removeCollabSession ( roomId ) ;
212
- } else {
213
- io . to ( roomId ) . emit ( CollabEvents . PARTNER_LEFT ) ;
214
- }
231
+ removeCollabSession ( roomId ) ;
215
232
} ;
0 commit comments