Skip to content

Commit 16703c4

Browse files
committed
PEER-232: Add updated BE logic
Signed-off-by: SeeuSim <[email protected]>
1 parent a6ef1ff commit 16703c4

File tree

15 files changed

+292
-164
lines changed

15 files changed

+292
-164
lines changed
Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,24 @@
11
import type { Request, Response } from 'express';
22
import { StatusCodes } from 'http-status-codes';
33

4-
import { client, logQueueStatus } from '@/lib/db';
5-
import { logger } from '@/lib/utils';
6-
import { createNotifSocket, queueingService } from '@/services';
7-
import type { IRedisClient, IRequestMatchPayload } from '@/types';
8-
9-
let redisClient: IRedisClient;
4+
import { createNotifSocket } from '@/services';
5+
import type { IRequestMatchRESTPayload } from '@/types';
106

117
export const matchRequestController = async (req: Request, res: Response) => {
12-
const payload: Partial<IRequestMatchPayload> = req.body;
13-
const { userId, difficulty, topic } = payload;
8+
const payload: Partial<IRequestMatchRESTPayload> = req.body;
9+
const { userId } = payload;
1410

15-
if (!userId || (!difficulty && !topic)) {
11+
if (!userId) {
1612
return res.status(StatusCodes.UNPROCESSABLE_ENTITY).json('Malformed Request');
1713
}
1814

19-
if (!redisClient || !redisClient.isOpen || !redisClient.isReady) {
20-
redisClient = await client.connect();
21-
}
22-
23-
// TODO: Assign a proper socket to the user
2415
const socketRoom = createNotifSocket(userId);
25-
const timestamp = `${Date.now()}`;
2616

27-
// Send socket to user first for them to subscribe
28-
res
17+
// Send socket to user for subscription
18+
return res
2919
.status(StatusCodes.OK)
3020
.json({
3121
socketPort: socketRoom,
32-
requestId: timestamp, // Queue ID
3322
})
3423
.end();
35-
36-
// Added time buffer for matcher worker, but may be insufficient (especially if <1s is left)
37-
await queueingService(redisClient, {
38-
userId,
39-
difficulty,
40-
topic,
41-
socketPort: socketRoom,
42-
timestamp,
43-
});
44-
45-
logQueueStatus(logger, redisClient, `Queue Status Before Matching: <PLACEHOLDER>`);
4624
};

backend/matching/src/types/index.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,22 @@ const DIFFICULTIES = ['Easy', 'Medium', 'Hard'] as const;
44

55
export type ITopicDifficulty = (typeof DIFFICULTIES)[number];
66

7-
export type IRequestMatchPayload = {
7+
export type IRequestMatchRESTPayload = {
88
userId: string;
9+
};
10+
11+
export type IRequestMatchWSPayload = {
912
topic: string | string[];
1013
difficulty: string;
1114
};
1215

13-
export type IQueueRequest = Partial<Pick<IRequestMatchPayload, 'topic' | 'difficulty'>> &
14-
Pick<IRequestMatchPayload, 'userId'> & {
16+
export type IRequestMatchEvent = IRequestMatchWSPayload &
17+
IRequestMatchRESTPayload & {
18+
roomId: string;
19+
};
20+
21+
export type IQueueRequest = Partial<IRequestMatchWSPayload> &
22+
IRequestMatchRESTPayload & {
1523
socketPort: string;
1624
timestamp: string;
1725
};

backend/matching/src/workers/cleaner.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { client, logQueueStatus } from '@/lib/db';
22
import { STREAM_CLEANER, STREAM_GROUP, STREAM_NAME } from '@/lib/db/constants';
33
import { decodePoolTicket, getPoolKey } from '@/lib/utils';
4-
import { MATCH_SVC_EVENT } from '@/ws';
4+
import { MATCH_SVC_EVENT } from '@/ws/main';
55

66
import { connectClient, sendNotif } from './common';
77

backend/matching/src/workers/common.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import { client } from '@/lib/db';
44
import { logger } from '@/lib/utils';
5-
import type { IChildProcessMessage, IMatchEvent } from '@/ws';
5+
import type { IChildProcessMessage, IMatchEvent } from '@/ws/main';
66

77
export const sendNotif = (roomIds: Array<string>, event: IMatchEvent, message?: unknown) => {
88
if (process.send) {

backend/matching/src/workers/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import path from 'path';
55
import type { Server } from 'socket.io';
66

77
import { logger } from '@/lib/utils';
8-
import { type IChildProcessMessage, MATCH_SVC_EVENT } from '@/ws';
8+
import { type IChildProcessMessage, MATCH_SVC_EVENT } from '@/ws/main';
99

1010
let nWorkers = 0; // For tracking graceful exit of main process
1111

@@ -28,7 +28,7 @@ export const initWorker = (name: string, io: Server) => {
2828
const { rooms, event, message: payload } = messagePayload as IChildProcessMessage;
2929

3030
if (event === MATCH_SVC_EVENT.DISCONNECT) {
31-
io.sockets.in(rooms).disconnectSockets();
31+
io.sockets.in(rooms).socketsLeave(rooms);
3232
return;
3333
}
3434

backend/matching/src/workers/matcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { client, logQueueStatus } from '@/lib/db';
22
import { POOL_INDEX, STREAM_GROUP, STREAM_NAME, STREAM_WORKER } from '@/lib/db/constants';
33
import { decodePoolTicket, getPoolKey, getStreamId } from '@/lib/utils';
44
import { getMatchItems } from '@/services';
5-
import { MATCH_SVC_EVENT } from '@/ws';
5+
import { MATCH_SVC_EVENT } from '@/ws/main';
66

77
import { connectClient, sendNotif } from './common';
88

backend/matching/src/ws.ts

Lines changed: 0 additions & 64 deletions
This file was deleted.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import type { DefaultEventsMap, Server, Socket } from 'socket.io';
2+
3+
import { client, logQueueStatus } from '@/lib/db';
4+
import { logger } from '@/lib/utils';
5+
import { queueingService } from '@/services';
6+
import type { IRedisClient, IRequestMatchEvent } from '@/types';
7+
8+
import { MATCH_SVC_EVENT } from './main';
9+
10+
type ISocketIOServer<T> = Server<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, T>;
11+
type ISocketIOSocket<T> = Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, T>;
12+
13+
export const EVENTS = {
14+
ERROR: 'ERROR',
15+
JOIN_ROOM: 'joinRoom',
16+
CANCEL_ROOM: 'cancelRoom',
17+
LEAVE_ROOM: 'leave',
18+
START_QUEUE: 'startQueue',
19+
DISCONNECT: 'disconnect',
20+
} as const;
21+
22+
export const joinRoomHandler =
23+
<T>(socket: ISocketIOSocket<T>) =>
24+
(roomId?: string) => {
25+
if (!roomId) {
26+
logger.warn('joinRoom event received without a roomId');
27+
return;
28+
}
29+
30+
socket.join(roomId);
31+
logger.info(`Socket ${socket.id} joined room: ${roomId}`);
32+
socket.emit('joinedRoom', roomId);
33+
};
34+
35+
export const cancelRoomHandler =
36+
<S, T>(io: ISocketIOServer<S>, socket: ISocketIOSocket<T>) =>
37+
(roomId?: string) => {
38+
if (roomId) {
39+
io.in(roomId).socketsLeave(roomId);
40+
logger.info(`Room ${roomId} has been cancelled and closed.`);
41+
socket.emit('roomCancelled', roomId);
42+
} else {
43+
logger.warn('No room ID provided for cancellation');
44+
}
45+
};
46+
47+
let redisClient: IRedisClient;
48+
49+
export const queueEventHandler =
50+
<T>(socket: ISocketIOSocket<T>) =>
51+
async (payload: Partial<IRequestMatchEvent>) => {
52+
// 1. Invalid Room
53+
if (!payload.roomId) {
54+
const errorMessage = 'Queuing Event triggered without room.';
55+
logger.warn(errorMessage);
56+
socket.emit(EVENTS.ERROR, errorMessage);
57+
return;
58+
}
59+
60+
// 2. Invalid Request
61+
const { roomId } = payload;
62+
63+
if (
64+
!payload.userId ||
65+
(!payload.topic && !payload.difficulty) ||
66+
(payload.topic && !Array.isArray(payload.topic))
67+
) {
68+
socket.emit(EVENTS.ERROR, `Payload for ${EVENTS.START_QUEUE} is invalid.`);
69+
return;
70+
}
71+
72+
// 3. Start Queuing
73+
if (!redisClient || !redisClient.isOpen || !redisClient.isReady) {
74+
try {
75+
redisClient = await client.connect();
76+
const { userId, difficulty, topic } = payload;
77+
const timestamp = `${Date.now()}`;
78+
await queueingService(redisClient, {
79+
userId,
80+
difficulty,
81+
topic,
82+
socketPort: roomId,
83+
timestamp,
84+
});
85+
} catch (error) {
86+
const { name, message, stack, cause } = error as Error;
87+
logger.error({ name, message, stack, cause }, `An error occurred connecting to the client`);
88+
socket.emit(EVENTS.ERROR, 'Error connecting to client');
89+
return;
90+
}
91+
}
92+
93+
socket.emit(MATCH_SVC_EVENT.QUEUED);
94+
logQueueStatus(logger, redisClient, `Queue Status Before Matching: <PLACEHOLDER>`);
95+
};

backend/matching/src/ws/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './main';

backend/matching/src/ws/main.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { createServer } from 'http';
2+
3+
import { Server } from 'socket.io';
4+
5+
import { UI_HOST } from '@/config';
6+
import { logger } from '@/lib/utils';
7+
8+
import { cancelRoomHandler, EVENTS, joinRoomHandler, queueEventHandler } from './handlers';
9+
10+
export const createWs = (server: ReturnType<typeof createServer>) => {
11+
const io = new Server(server, {
12+
cors: {
13+
origin: [UI_HOST],
14+
credentials: true,
15+
},
16+
path: '/matching-socket',
17+
});
18+
io.on('connection', (socket) => {
19+
logger.info(`Socket ${socket.id} connected`);
20+
21+
socket.on(EVENTS.JOIN_ROOM, joinRoomHandler(socket));
22+
socket.on(EVENTS.CANCEL_ROOM, cancelRoomHandler(io, socket));
23+
socket.on(EVENTS.LEAVE_ROOM, (room) => {
24+
socket.leave(room);
25+
});
26+
socket.on(EVENTS.DISCONNECT, () => {
27+
logger.info(`Client disconnected: ${socket.id}`);
28+
});
29+
socket.on(EVENTS.START_QUEUE, queueEventHandler(socket));
30+
});
31+
return io;
32+
};
33+
34+
export const MATCH_SVC_EVENT = {
35+
QUEUED: 'QUEUED', // When match joins pool
36+
SUCCESS: 'SUCCESS', // When match successful
37+
FAILED: 'FAILED', // When match failed
38+
PENDING: 'PENDING', // When waiting for match
39+
MATCHING: 'MATCHING', // When matching in progress
40+
DISCONNECT: 'DISCONNECT', // To disconnect all sockets in room
41+
} as const;
42+
export type IMatchEvent = (typeof MATCH_SVC_EVENT)[keyof typeof MATCH_SVC_EVENT];
43+
export type IChildProcessMessage = {
44+
rooms: Array<string>;
45+
event: IMatchEvent;
46+
message?: unknown;
47+
};

0 commit comments

Comments
 (0)