Skip to content

Commit 57bac18

Browse files
committed
Merge branch 'canary' of https://github.com/PFConnect/pfcontrol-2 into canary
2 parents 1e31cd3 + 00b1870 commit 57bac18

11 files changed

Lines changed: 463 additions & 229 deletions

File tree

server/websockets/sectorControllerWebsocket.ts

Lines changed: 156 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,88 @@ interface SectorController {
2121
}>;
2222
}
2323

24+
// User roles cache with TTL
25+
const userRolesCache = new Map<
26+
string,
27+
{ roles: SectorController['roles']; timestamp: number }
28+
>();
29+
const ROLES_CACHE_TTL = 5 * 60 * 1000;
30+
31+
// Rate limiting per socket
32+
const rateLimitMap = new Map<
33+
string,
34+
{ count: number; resetTime: number }
35+
>();
36+
const RATE_LIMIT_WINDOW = 10000;
37+
const RATE_LIMIT_MAX = 15;
38+
39+
// Redis key TTL for automatic cleanup
40+
const REDIS_CONTROLLER_TTL = 24 * 60 * 60;
41+
42+
function checkRateLimit(userId: string): boolean {
43+
const now = Date.now();
44+
const userLimit = rateLimitMap.get(userId);
45+
46+
if (!userLimit || now > userLimit.resetTime) {
47+
rateLimitMap.set(userId, { count: 1, resetTime: now + RATE_LIMIT_WINDOW });
48+
return true;
49+
}
50+
51+
if (userLimit.count >= RATE_LIMIT_MAX) {
52+
return false;
53+
}
54+
55+
userLimit.count++;
56+
return true;
57+
}
58+
59+
async function getUserRolesWithCache(
60+
userId: string
61+
): Promise<SectorController['roles']> {
62+
const now = Date.now();
63+
const cached = userRolesCache.get(userId);
64+
65+
if (cached && now - cached.timestamp < ROLES_CACHE_TTL) {
66+
return cached.roles;
67+
}
68+
69+
let roles: SectorController['roles'] = [];
70+
try {
71+
const dbRoles = await getUserRoles(userId);
72+
roles = dbRoles.map((role) => ({
73+
id: role.id,
74+
name: role.name,
75+
color: role.color ?? '#000000',
76+
icon: role.icon ?? '',
77+
priority: role.priority ?? 0,
78+
}));
79+
80+
if (isAdmin(userId)) {
81+
roles.unshift({
82+
id: -1,
83+
name: 'Developer',
84+
color: '#3B82F6',
85+
icon: 'Braces',
86+
priority: 999999,
87+
});
88+
}
89+
90+
userRolesCache.set(userId, { roles, timestamp: now });
91+
} catch (error) {
92+
console.error('Error fetching user roles:', error);
93+
}
94+
95+
return roles;
96+
}
97+
98+
export function invalidateUserRolesCache(userId?: string): void {
99+
if (userId) {
100+
userRolesCache.delete(userId);
101+
} else {
102+
userRolesCache.clear();
103+
}
104+
}
105+
24106
export const getActiveSectorControllers = async (): Promise<
25107
SectorController[]
26108
> => {
@@ -39,6 +121,8 @@ const addSectorController = async (
39121
userId,
40122
JSON.stringify(controllerData)
41123
);
124+
// Set TTL for auto-cleanup in case of ungraceful disconnect
125+
await redisConnection.expire('activeSectorControllers', REDIS_CONTROLLER_TTL);
42126
};
43127

44128
const removeSectorController = async (userId: string): Promise<void> => {
@@ -66,66 +150,67 @@ export function setupSectorControllerWebsocket(
66150
});
67151

68152
io.on('connection', async (socket) => {
153+
let user: { userId?: string; username?: string; avatar?: string | null };
154+
69155
try {
70-
const user = JSON.parse(
71-
Array.isArray(socket.handshake.query.user)
72-
? socket.handshake.query.user[0]
73-
: socket.handshake.query.user || '{}'
74-
);
156+
// Parse and validate user data
157+
const userQuery = Array.isArray(socket.handshake.query.user)
158+
? socket.handshake.query.user[0]
159+
: socket.handshake.query.user;
75160

76-
if (!user.userId) {
161+
if (!userQuery) {
77162
socket.disconnect(true);
78163
return;
79164
}
80165

81-
// Get user roles
82-
let userRoles: Array<{
83-
id: number;
84-
name: string;
85-
color: string;
86-
icon: string;
87-
priority: number;
88-
}> = [];
89166
try {
90-
userRoles = (await getUserRoles(user.userId)).map((role) => ({
91-
id: role.id,
92-
name: role.name,
93-
color: role.color ?? '#000000',
94-
icon: role.icon ?? '',
95-
priority: role.priority ?? 0,
96-
}));
97-
} catch (error) {
98-
console.error('Error fetching user roles:', error);
167+
user = JSON.parse(userQuery);
168+
} catch (parseError) {
169+
console.error('Invalid user data:', parseError);
170+
socket.disconnect(true);
171+
return;
99172
}
100173

101-
if (isAdmin(user.userId)) {
102-
userRoles.unshift({
103-
id: -1,
104-
name: 'Developer',
105-
color: '#3B82F6',
106-
icon: 'Braces',
107-
priority: 999999,
108-
});
174+
if (!user.userId || !user.username) {
175+
socket.disconnect(true);
176+
return;
109177
}
110178

179+
// Get user roles with caching
180+
const userRoles = await getUserRolesWithCache(user.userId);
181+
111182
socket.join(`sector-${user.userId}`);
112183

113184
// Handle station selection
114185
socket.on('selectStation', async ({ station }) => {
186+
if (!user.userId) return;
187+
188+
if (!checkRateLimit(user.userId)) {
189+
socket.emit('error', { message: 'Too many requests. Please slow down.' });
190+
return;
191+
}
192+
193+
if (!station || typeof station !== 'string' || station.length > 10) {
194+
socket.emit('error', { message: 'Invalid station format' });
195+
return;
196+
}
197+
115198
try {
116199
const sectorController: SectorController = {
117200
id: user.userId,
118-
username: user.username,
201+
username: user.username!,
119202
avatar: user.avatar || null,
120-
station,
203+
station: station.trim().toUpperCase(),
121204
joinedAt: Date.now(),
122205
roles: userRoles,
123206
};
124207

125208
await addSectorController(user.userId, sectorController);
126209

210+
// Broadcast to all connected clients
211+
io.emit('controllerAdded', sectorController);
127212

128-
socket.emit('stationSelected', { station });
213+
socket.emit('stationSelected', { station: sectorController.station });
129214
} catch (error) {
130215
console.error('Error selecting station:', error);
131216
socket.emit('error', { message: 'Failed to select station' });
@@ -134,9 +219,18 @@ export function setupSectorControllerWebsocket(
134219

135220
// Handle station deselection
136221
socket.on('deselectStation', async () => {
222+
if (!user.userId) return;
223+
224+
if (!checkRateLimit(user.userId)) {
225+
socket.emit('error', { message: 'Too many requests. Please slow down.' });
226+
return;
227+
}
228+
137229
try {
138230
await removeSectorController(user.userId);
139231

232+
// Broadcast to all connected clients
233+
io.emit('controllerRemoved', { id: user.userId });
140234

141235
socket.emit('stationDeselected');
142236
} catch (error) {
@@ -145,13 +239,41 @@ export function setupSectorControllerWebsocket(
145239
});
146240

147241
socket.on('disconnect', async () => {
242+
if (!user.userId) return;
243+
148244
await removeSectorController(user.userId);
245+
246+
// Broadcast to all connected clients
247+
io.emit('controllerRemoved', { id: user.userId });
248+
249+
rateLimitMap.delete(user.userId);
149250
});
150251
} catch (error) {
151252
console.error('Error in sector controller websocket connection:', error);
152253
socket.disconnect(true);
153254
}
154255
});
155256

257+
// Periodic cleanup of expired cache entries
258+
const cacheCleanupInterval = setInterval(() => {
259+
const now = Date.now();
260+
for (const [userId, data] of userRolesCache.entries()) {
261+
if (now - data.timestamp > ROLES_CACHE_TTL) {
262+
userRolesCache.delete(userId);
263+
}
264+
}
265+
for (const [userId, data] of rateLimitMap.entries()) {
266+
if (now > data.resetTime + 60000) {
267+
rateLimitMap.delete(userId);
268+
}
269+
}
270+
}, 60000);
271+
272+
io.on('close', () => {
273+
clearInterval(cacheCleanupInterval);
274+
userRolesCache.clear();
275+
rateLimitMap.clear();
276+
});
277+
156278
return io;
157279
}

0 commit comments

Comments
 (0)