@@ -2,6 +2,7 @@ import { Socket } from "socket.io";
2
2
import { io } from "../server" ;
3
3
import redisClient from "../config/redis" ;
4
4
import { Doc , applyUpdateV2 , encodeStateAsUpdateV2 } from "yjs" ;
5
+ import { createQuestionHistory } from "../api/questionHistoryService" ;
5
6
6
7
enum CollabEvents {
7
8
// Receive
@@ -12,28 +13,30 @@ enum CollabEvents {
12
13
UPDATE_REQUEST = "update_request" ,
13
14
UPDATE_CURSOR_REQUEST = "update_cursor_request" ,
14
15
RECONNECT_REQUEST = "reconnect_request" ,
16
+ END_SESSION_REQUEST = "end_session_request" ,
15
17
16
18
// Send
17
19
ROOM_READY = "room_ready" ,
18
20
DOCUMENT_READY = "document_ready" ,
21
+ DOCUMENT_NOT_FOUND = "document_not_found" ,
19
22
UPDATE = "updateV2" ,
20
23
UPDATE_CURSOR = "update_cursor" ,
21
- PARTNER_LEFT = "partner_left" ,
24
+ END_SESSION = "end_session" ,
25
+ PARTNER_DISCONNECTED = "partner_disconnected" ,
22
26
}
23
27
24
28
const EXPIRY_TIME = 3600 ;
25
- const CONNECTION_DELAY = 3000 ; // time window to allow for page re-renders / refresh
29
+ const CONNECTION_DELAY = 3000 ; // time window to allow for page re-renders
26
30
27
31
const userConnections = new Map < string , NodeJS . Timeout | null > ( ) ;
28
32
const collabSessions = new Map < string , Doc > ( ) ;
29
33
const partnerReadiness = new Map < string , boolean > ( ) ;
30
34
31
35
export const handleWebsocketCollabEvents = ( socket : Socket ) => {
32
- socket . on ( CollabEvents . JOIN , async ( uid : string , roomId : string ) => {
36
+ socket . on ( CollabEvents . JOIN , ( uid : string , roomId : string ) => {
33
37
const connectionKey = `${ uid } :${ roomId } ` ;
34
38
if ( userConnections . has ( connectionKey ) ) {
35
39
clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
36
- return ;
37
40
}
38
41
userConnections . set ( connectionKey , null ) ;
39
42
@@ -46,28 +49,57 @@ export const handleWebsocketCollabEvents = (socket: Socket) => {
46
49
socket . join ( roomId ) ;
47
50
socket . data . roomId = roomId ;
48
51
49
- if (
50
- io . sockets . adapter . rooms . get ( roomId ) ?. size === 2 &&
51
- ! collabSessions . has ( roomId )
52
- ) {
53
- createCollabSession ( roomId ) ;
52
+ if ( io . sockets . adapter . rooms . get ( roomId ) ?. size === 2 ) {
53
+ if ( ! collabSessions . has ( roomId ) ) {
54
+ createCollabSession ( roomId ) ;
55
+ }
54
56
io . to ( roomId ) . emit ( CollabEvents . ROOM_READY , true ) ;
55
57
}
56
58
} ) ;
57
59
58
- socket . on ( CollabEvents . INIT_DOCUMENT , ( roomId : string , template : string ) => {
59
- const doc = getDocument ( roomId ) ;
60
- const isPartnerReady = partnerReadiness . get ( roomId ) ;
61
-
62
- if ( isPartnerReady && doc . getText ( ) . length === 0 ) {
63
- doc . transact ( ( ) => {
64
- doc . getText ( ) . insert ( 0 , template ) ;
65
- } ) ;
66
- io . to ( roomId ) . emit ( CollabEvents . DOCUMENT_READY ) ;
67
- } else {
68
- partnerReadiness . set ( roomId , true ) ;
60
+ socket . on (
61
+ CollabEvents . INIT_DOCUMENT ,
62
+ (
63
+ roomId : string ,
64
+ template : string ,
65
+ uid1 : string ,
66
+ uid2 : string ,
67
+ language : string ,
68
+ qnId : string ,
69
+ qnTitle : string
70
+ ) => {
71
+ const doc = getDocument ( roomId ) ;
72
+ const isPartnerReady = partnerReadiness . get ( roomId ) ;
73
+
74
+ if ( isPartnerReady && doc . getText ( ) . length === 0 ) {
75
+ const token =
76
+ socket . handshake . headers . authorization || socket . handshake . auth . token ;
77
+ createQuestionHistory (
78
+ [ uid1 , uid2 ] ,
79
+ qnId ,
80
+ qnTitle ,
81
+ "Attempted" ,
82
+ template ,
83
+ language ,
84
+ token
85
+ )
86
+ . then ( ( res ) => {
87
+ doc . transact ( ( ) => {
88
+ doc . getText ( ) . insert ( 0 , template ) ;
89
+ } ) ;
90
+ io . to ( roomId ) . emit (
91
+ CollabEvents . DOCUMENT_READY ,
92
+ res . data . qnHistory . id
93
+ ) ;
94
+ } )
95
+ . catch ( ( err ) => {
96
+ console . log ( err ) ;
97
+ } ) ;
98
+ } else {
99
+ partnerReadiness . set ( roomId , true ) ;
100
+ }
69
101
}
70
- } ) ;
102
+ ) ;
71
103
72
104
socket . on (
73
105
CollabEvents . UPDATE_REQUEST ,
@@ -76,7 +108,8 @@ export const handleWebsocketCollabEvents = (socket: Socket) => {
76
108
if ( doc ) {
77
109
applyUpdateV2 ( doc , new Uint8Array ( update ) ) ;
78
110
} else {
79
- // TODO: error handling
111
+ io . to ( roomId ) . emit ( CollabEvents . DOCUMENT_NOT_FOUND ) ;
112
+ io . sockets . adapter . rooms . delete ( roomId ) ;
80
113
}
81
114
}
82
115
) ;
@@ -93,41 +126,45 @@ export const handleWebsocketCollabEvents = (socket: Socket) => {
93
126
94
127
socket . on (
95
128
CollabEvents . LEAVE ,
96
- ( uid : string , roomId : string , isImmediate : boolean ) => {
129
+ ( uid : string , roomId : string , isPartnerNotified : boolean ) => {
97
130
const connectionKey = `${ uid } :${ roomId } ` ;
98
- if ( isImmediate || ! userConnections . has ( connectionKey ) ) {
131
+ if ( userConnections . has ( connectionKey ) ) {
132
+ clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
133
+ }
134
+
135
+ if ( isPartnerNotified ) {
99
136
handleUserLeave ( uid , roomId , socket ) ;
100
137
return ;
101
138
}
102
139
103
- clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
104
-
105
140
const connectionTimeout = setTimeout ( ( ) => {
106
141
handleUserLeave ( uid , roomId , socket ) ;
142
+ io . to ( roomId ) . emit ( CollabEvents . PARTNER_DISCONNECTED ) ;
107
143
} , CONNECTION_DELAY ) ;
108
144
109
145
userConnections . set ( connectionKey , connectionTimeout ) ;
110
146
}
111
147
) ;
112
148
113
- socket . on ( CollabEvents . RECONNECT_REQUEST , async ( roomId : string ) => {
114
- // TODO: Handle recconnection
115
- socket . join ( roomId ) ;
116
-
117
- const doc = getDocument ( roomId ) ;
118
- const storeData = await redisClient . get ( `collaboration: ${ roomId } ` ) ;
149
+ socket . on (
150
+ CollabEvents . END_SESSION_REQUEST ,
151
+ ( roomId : string , sessionDuration : number ) => {
152
+ socket . to ( roomId ) . emit ( CollabEvents . END_SESSION , sessionDuration ) ;
153
+ }
154
+ ) ;
119
155
120
- if ( storeData ) {
121
- const tempDoc = new Doc ( ) ;
122
- const update = Buffer . from ( storeData , "base64" ) ;
123
- applyUpdateV2 ( tempDoc , new Uint8Array ( update ) ) ;
124
- const tempText = tempDoc . getText ( ) . toString ( ) ;
156
+ socket . on ( CollabEvents . RECONNECT_REQUEST , ( roomId : string ) => {
157
+ const room = io . sockets . adapter . rooms . get ( roomId ) ;
158
+ if ( ! room || room . size < 2 ) {
159
+ socket . join ( roomId ) ;
160
+ socket . data . roomId = roomId ;
161
+ }
125
162
126
- const text = doc . getText ( ) ;
127
- doc . transact ( ( ) => {
128
- text . delete ( 0 , text . length ) ;
129
- text . insert ( 0 , tempText ) ;
130
- } ) ;
163
+ if (
164
+ io . sockets . adapter . rooms . get ( roomId ) ?. size === 2 &&
165
+ ! collabSessions . has ( roomId )
166
+ ) {
167
+ restoreDocument ( roomId ) ;
131
168
}
132
169
} ) ;
133
170
} ;
@@ -141,6 +178,7 @@ const removeCollabSession = (roomId: string) => {
141
178
collabSessions . get ( roomId ) ?. destroy ( ) ;
142
179
collabSessions . delete ( roomId ) ;
143
180
partnerReadiness . delete ( roomId ) ;
181
+ redisClient . del ( roomId ) ;
144
182
} ;
145
183
146
184
const getDocument = ( roomId : string ) => {
@@ -157,28 +195,38 @@ const getDocument = (roomId: string) => {
157
195
return doc ;
158
196
} ;
159
197
160
- const saveDocument = async ( roomId : string , doc : Doc ) => {
198
+ const saveDocument = ( roomId : string , doc : Doc ) => {
161
199
const docState = encodeStateAsUpdateV2 ( doc ) ;
162
200
const docAsString = Buffer . from ( docState ) . toString ( "base64" ) ;
163
- await redisClient . set ( `collaboration:${ roomId } ` , docAsString , {
201
+ redisClient . set ( `collaboration:${ roomId } ` , docAsString , {
164
202
EX : EXPIRY_TIME ,
165
203
} ) ;
166
204
} ;
167
205
206
+ const restoreDocument = async ( roomId : string ) => {
207
+ const doc = getDocument ( roomId ) ;
208
+ const storeData = await redisClient . get ( `collaboration:${ roomId } ` ) ;
209
+
210
+ if ( storeData ) {
211
+ const tempDoc = new Doc ( ) ;
212
+ const update = Buffer . from ( storeData , "base64" ) ;
213
+ applyUpdateV2 ( tempDoc , new Uint8Array ( update ) ) ;
214
+ const tempText = tempDoc . getText ( ) . toString ( ) ;
215
+
216
+ const text = doc . getText ( ) ;
217
+ doc . transact ( ( ) => {
218
+ text . delete ( 0 , text . length ) ;
219
+ text . insert ( 0 , tempText ) ;
220
+ } ) ;
221
+ }
222
+ } ;
223
+
168
224
const handleUserLeave = ( uid : string , roomId : string , socket : Socket ) => {
169
225
const connectionKey = `${ uid } :${ roomId } ` ;
170
- if ( userConnections . has ( connectionKey ) ) {
171
- clearTimeout ( userConnections . get ( connectionKey ) ! ) ;
172
- userConnections . delete ( connectionKey ) ;
173
- }
226
+ userConnections . delete ( connectionKey ) ;
174
227
175
228
socket . leave ( roomId ) ;
176
229
socket . disconnect ( ) ;
177
230
178
- const room = io . sockets . adapter . rooms . get ( roomId ) ;
179
- if ( ! room || room . size === 0 ) {
180
- removeCollabSession ( roomId ) ;
181
- } else {
182
- io . to ( roomId ) . emit ( CollabEvents . PARTNER_LEFT ) ;
183
- }
231
+ removeCollabSession ( roomId ) ;
184
232
} ;
0 commit comments