1+ import WebSocket , { WebSocketServer } from 'ws' ;
2+ import { sessionStorage } from "../app/auth/authentication.server" ;
3+ import type { IncomingMessage } from 'http' ;
4+
5+ function initializeWebsocketServer ( wss : WebSocketServer ) {
6+ // keep track of socket subscriptions
7+ const websockets = new Map < WebSocket , string > ( ) ;
8+ const subscriptions = new Map < string , Set < WebSocket > > ( ) ;
9+ const lastMessage = new Map < string , string > ( ) ;
10+ initializeWebsocketClient ( subscriptions , lastMessage ) ;
11+
12+ // authenticate new websocket sessions
13+ wss . on ( "connection" , async ( ws : WebSocket , request : IncomingMessage ) => {
14+ const cookieHeader = request . headers . cookie ;
15+ if ( cookieHeader ) {
16+ try {
17+ const session = await sessionStorage . getSession ( cookieHeader ) ;
18+ const user = session . get ( "user" ) ;
19+ if ( ! user ) {
20+ console . warn ( "Websocket authentication failed. Sign in required." ) ;
21+ ws . close ( 1008 , "Unauthorized" ) ;
22+ return ;
23+ }
24+
25+ // handle topic subscription
26+ ws . onmessage = ( event : WebSocket . MessageEvent ) => {
27+ var topic = event . data . toString ( ) ;
28+ websockets . set ( ws , topic ) ;
29+ var topicSubscriptions = subscriptions . get ( topic ) ;
30+ if ( topicSubscriptions ) topicSubscriptions . add ( ws ) ;
31+ else subscriptions . set ( topic , new Set < WebSocket > ( [ ws ] ) ) ;
32+ var messageToSend = lastMessage . get ( topic ) ;
33+ if ( messageToSend ) ws . send ( messageToSend ) ;
34+ } ;
35+
36+ // unsubscribe from topics
37+ ws . onclose = ( ) => {
38+ var topic = websockets . get ( ws ) ;
39+ if ( topic ) {
40+ websockets . delete ( ws ) ;
41+ var topicSubscriptions = subscriptions . get ( topic ) ;
42+ if ( topicSubscriptions ) topicSubscriptions . delete ( ws ) ;
43+ }
44+ } ;
45+ } catch ( error ) {
46+ console . error ( "Error authenticating websocket session:" , error ) ;
47+ ws . close ( 1011 , "Internal server error" ) ;
48+ return ;
49+ }
50+ } else {
51+ console . warn ( "Websocket authentication failed. Sign in required." ) ;
52+ ws . close ( 1008 , "Unauthorized" ) ;
53+ return ;
54+ }
55+ } ) ;
56+ }
57+
58+ export function initializeWebsocketClient ( subscriptions : Map < string , Set < WebSocket > > , lastMessage : Map < string , string > ) {
59+ let reconnectRetryDelay = 1000 ;
60+ let reconnectRetryMaxDelay = 30000 ;
61+ let reconnectTimeout : NodeJS . Timeout | null = null ;
62+ const url = getBackendWebsocketUrl ( ) ;
63+
64+ function connect ( ) {
65+ const socket = new WebSocket ( url ) ;
66+
67+ socket . onopen = ( ) => {
68+ reconnectRetryDelay = 1000 ;
69+ if ( reconnectTimeout ) {
70+ clearTimeout ( reconnectTimeout ) ;
71+ reconnectTimeout = null ;
72+ }
73+
74+ socket . send ( Buffer . from ( process . env . FRONTEND_BACKEND_API_KEY ! , "utf-8" ) , { binary : false } ) ;
75+ } ;
76+
77+ socket . onmessage = ( event : WebSocket . MessageEvent ) => {
78+ var rawMessage = event . data . toString ( ) ;
79+ var topicMessage = JSON . parse ( rawMessage ) ;
80+ var [ topic , message ] = [ topicMessage . Topic , topicMessage . Message ] ;
81+ if ( ! topic || ! message ) return ;
82+ lastMessage . set ( topic , message ) ;
83+ var subscribed = subscriptions . get ( topic ) || [ ] ;
84+ subscribed . forEach ( client => {
85+ if ( client . readyState === client . OPEN ) {
86+ client . send ( message ) ;
87+ }
88+ } ) ;
89+ } ;
90+
91+ socket . onerror = ( event : WebSocket . ErrorEvent ) => {
92+ console . error ( 'WebSocket error:' , event . message ) ;
93+ } ;
94+
95+ socket . onclose = ( event : WebSocket . CloseEvent ) => {
96+ console . info ( `WebSocket closed (code: ${ event . code } , reason: ${ event . reason } ) — retrying in ${ reconnectRetryDelay / 1000 } s` ) ;
97+ scheduleReconnect ( ) ;
98+ } ;
99+ }
100+
101+ function scheduleReconnect ( ) {
102+ if ( reconnectTimeout ) return ;
103+ reconnectTimeout = setTimeout ( ( ) => {
104+ reconnectRetryDelay = Math . min ( reconnectRetryDelay * 2 , reconnectRetryMaxDelay ) ;
105+ connect ( ) ;
106+ } , reconnectRetryDelay ) ;
107+ }
108+
109+ connect ( ) ;
110+ }
111+
112+ function getBackendWebsocketUrl ( ) {
113+ const host = process . env . BACKEND_URL ! ;
114+ return `${ host . replace ( / \/ $ / , '' ) } /ws` . replace ( / ^ h t t p / , 'ws' ) ;
115+ }
116+
117+ export const websocketServer = {
118+ initialize : initializeWebsocketServer
119+ }
0 commit comments