@@ -17,12 +17,17 @@ import {
17
17
WebSocketPayload ,
18
18
WebSocketTopics ,
19
19
} from '../web-socket-channel' ;
20
+ import {
21
+ getWsMessageTimeoutMs ,
22
+ getWsPingIntervalMs ,
23
+ getWsPingTimeoutMs ,
24
+ } from '../web-socket-transmitter' ;
20
25
21
26
/**
22
27
* SocketIO channel for sending real time API updates.
23
28
*/
24
29
export class SocketIOChannel extends WebSocketChannel {
25
- private io ?: SocketIOServer < ClientToServerMessages , ServerToClientMessages > ;
30
+ private io ?: SocketIOServer < ClientToServerMessages , ServerToClientMessages < true > > ;
26
31
private adapter ?: Adapter ;
27
32
28
33
constructor ( server : http . Server ) {
@@ -33,9 +38,14 @@ export class SocketIOChannel extends WebSocketChannel {
33
38
}
34
39
35
40
connect ( ) : void {
36
- const io = new SocketIOServer < ClientToServerMessages , ServerToClientMessages > ( this . server , {
37
- cors : { origin : '*' } ,
38
- } ) ;
41
+ const io = new SocketIOServer < ClientToServerMessages , ServerToClientMessages < true > > (
42
+ this . server ,
43
+ {
44
+ cors : { origin : '*' } ,
45
+ pingInterval : getWsPingIntervalMs ( ) ,
46
+ pingTimeout : getWsPingTimeoutMs ( ) ,
47
+ }
48
+ ) ;
39
49
this . io = io ;
40
50
41
51
io . on ( 'connection' , async socket => {
@@ -153,74 +163,136 @@ export class SocketIOChannel extends WebSocketChannel {
153
163
return false ;
154
164
}
155
165
166
+ private async getTopicSockets ( room : Topic ) {
167
+ if ( ! this . io ) {
168
+ return ;
169
+ }
170
+ const sockets = [ ] ;
171
+ const socketIds = await this . io . to ( room ) . allSockets ( ) ;
172
+ for ( const id of socketIds ) {
173
+ const socket = this . io . sockets . sockets . get ( id ) ;
174
+ if ( socket ) {
175
+ sockets . push ( socket ) ;
176
+ }
177
+ }
178
+ return sockets ;
179
+ }
180
+
156
181
send < P extends keyof WebSocketPayload > (
157
182
payload : P ,
158
183
...args : ListenerType < WebSocketPayload [ P ] >
159
184
) : void {
160
185
if ( ! this . io ) {
161
186
return ;
162
187
}
188
+ // If a client takes more than this number of ms to respond to an event `emit`, it will be
189
+ // disconnected.
190
+ const timeout = getWsMessageTimeoutMs ( ) ;
163
191
switch ( payload ) {
164
192
case 'block' : {
165
193
const [ block ] = args as ListenerType < WebSocketPayload [ 'block' ] > ;
166
194
this . prometheus ?. sendEvent ( 'block' ) ;
167
- this . io . to ( 'block' ) . emit ( 'block' , block ) ;
195
+ void this . getTopicSockets ( 'block' ) . then ( sockets =>
196
+ sockets ?. forEach ( socket =>
197
+ socket . timeout ( timeout ) . emit ( 'block' , block , _ => socket . disconnect ( true ) )
198
+ )
199
+ ) ;
168
200
break ;
169
201
}
170
202
case 'microblock' : {
171
203
const [ microblock ] = args as ListenerType < WebSocketPayload [ 'microblock' ] > ;
172
204
this . prometheus ?. sendEvent ( 'microblock' ) ;
173
- this . io . to ( 'microblock' ) . emit ( 'microblock' , microblock ) ;
205
+ void this . getTopicSockets ( 'microblock' ) . then ( sockets =>
206
+ sockets ?. forEach ( socket =>
207
+ socket . timeout ( timeout ) . emit ( 'microblock' , microblock , _ => socket . disconnect ( true ) )
208
+ )
209
+ ) ;
174
210
break ;
175
211
}
176
212
case 'mempoolTransaction' : {
177
213
const [ tx ] = args as ListenerType < WebSocketPayload [ 'mempoolTransaction' ] > ;
178
214
this . prometheus ?. sendEvent ( 'mempool' ) ;
179
- this . io . to ( 'mempool' ) . emit ( 'mempool' , tx ) ;
215
+ void this . getTopicSockets ( 'mempool' ) . then ( sockets =>
216
+ sockets ?. forEach ( socket =>
217
+ socket . timeout ( timeout ) . emit ( 'mempool' , tx , _ => socket . disconnect ( true ) )
218
+ )
219
+ ) ;
180
220
break ;
181
221
}
182
222
case 'transaction' : {
183
223
const [ tx ] = args as ListenerType < WebSocketPayload [ 'transaction' ] > ;
184
224
this . prometheus ?. sendEvent ( 'transaction' ) ;
185
- this . io . to ( `transaction:${ tx . tx_id } ` ) . emit ( 'transaction' , tx ) ;
225
+ void this . getTopicSockets ( `transaction:${ tx . tx_id } ` ) . then ( sockets =>
226
+ sockets ?. forEach ( socket =>
227
+ socket . timeout ( timeout ) . emit ( 'transaction' , tx , _ => socket . disconnect ( true ) )
228
+ )
229
+ ) ;
186
230
break ;
187
231
}
188
232
case 'nftEvent' : {
189
233
const [ event ] = args as ListenerType < WebSocketPayload [ 'nftEvent' ] > ;
190
234
this . prometheus ?. sendEvent ( 'nft-event' ) ;
191
- this . io . to ( 'nft-event' ) . emit ( 'nft-event' , event ) ;
235
+ void this . getTopicSockets ( `nft-event` ) . then ( sockets =>
236
+ sockets ?. forEach ( socket =>
237
+ socket . timeout ( timeout ) . emit ( 'nft-event' , event , _ => socket . disconnect ( true ) )
238
+ )
239
+ ) ;
192
240
break ;
193
241
}
194
242
case 'nftAssetEvent' : {
195
243
const [ assetIdentifier , value , event ] = args as ListenerType <
196
244
WebSocketPayload [ 'nftAssetEvent' ]
197
245
> ;
198
246
this . prometheus ?. sendEvent ( 'nft-asset-event' ) ;
199
- this . io . to ( 'nft-event' ) . emit ( 'nft-asset-event' , assetIdentifier , value , event ) ;
247
+ void this . getTopicSockets ( `nft-event` ) . then ( sockets =>
248
+ sockets ?. forEach ( socket =>
249
+ socket
250
+ . timeout ( timeout )
251
+ . emit ( 'nft-asset-event' , assetIdentifier , value , event , _ => socket . disconnect ( true ) )
252
+ )
253
+ ) ;
200
254
break ;
201
255
}
202
256
case 'nftCollectionEvent' : {
203
257
const [ assetIdentifier , event ] = args as ListenerType <
204
258
WebSocketPayload [ 'nftCollectionEvent' ]
205
259
> ;
206
260
this . prometheus ?. sendEvent ( 'nft-collection-event' ) ;
207
- this . io . to ( 'nft-event' ) . emit ( 'nft-collection-event' , assetIdentifier , event ) ;
261
+ void this . getTopicSockets ( `nft-event` ) . then ( sockets =>
262
+ sockets ?. forEach ( socket =>
263
+ socket
264
+ . timeout ( timeout )
265
+ . emit ( 'nft-collection-event' , assetIdentifier , event , _ => socket . disconnect ( true ) )
266
+ )
267
+ ) ;
208
268
break ;
209
269
}
210
270
case 'principalTransaction' : {
211
271
const [ principal , tx ] = args as ListenerType < WebSocketPayload [ 'principalTransaction' ] > ;
212
272
const topic : AddressTransactionTopic = `address-transaction:${ principal } ` ;
213
273
this . prometheus ?. sendEvent ( 'address-transaction' ) ;
214
- this . io . to ( topic ) . emit ( 'address-transaction' , principal , tx ) ;
215
- this . io . to ( topic ) . emit ( topic , principal , tx ) ;
274
+ void this . getTopicSockets ( topic ) . then ( sockets =>
275
+ sockets ?. forEach ( socket => {
276
+ socket
277
+ . timeout ( timeout )
278
+ . emit ( 'address-transaction' , principal , tx , _ => socket . disconnect ( true ) ) ;
279
+ socket . timeout ( timeout ) . emit ( topic , principal , tx , _ => socket . disconnect ( true ) ) ;
280
+ } )
281
+ ) ;
216
282
break ;
217
283
}
218
284
case 'principalStxBalance' : {
219
285
const [ principal , balance ] = args as ListenerType < WebSocketPayload [ 'principalStxBalance' ] > ;
220
286
const topic : AddressStxBalanceTopic = `address-stx-balance:${ principal } ` ;
221
287
this . prometheus ?. sendEvent ( 'address-stx-balance' ) ;
222
- this . io . to ( topic ) . emit ( 'address-stx-balance' , principal , balance ) ;
223
- this . io . to ( topic ) . emit ( topic , principal , balance ) ;
288
+ void this . getTopicSockets ( topic ) . then ( sockets =>
289
+ sockets ?. forEach ( socket => {
290
+ socket
291
+ . timeout ( timeout )
292
+ . emit ( 'address-stx-balance' , principal , balance , _ => socket . disconnect ( true ) ) ;
293
+ socket . timeout ( timeout ) . emit ( topic , principal , balance , _ => socket . disconnect ( true ) ) ;
294
+ } )
295
+ ) ;
224
296
break ;
225
297
}
226
298
}
0 commit comments