Skip to content

Commit 7377db7

Browse files
committed
PEER-232: Add queuing event schema
Signed-off-by: SeeuSim <[email protected]>
1 parent 16703c4 commit 7377db7

File tree

10 files changed

+55
-48
lines changed

10 files changed

+55
-48
lines changed

backend/matching/src/types/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { client } from '@/lib/db';
2+
import { MATCHING_EVENT } from '@/ws/events';
23

34
const DIFFICULTIES = ['Easy', 'Medium', 'Hard'] as const;
45

@@ -36,3 +37,10 @@ export type IStreamMessage = {
3637
};
3738
value?: Awaited<ReturnType<(typeof client)['ft']['search']>>['documents'][number]['value'];
3839
};
40+
41+
export type IMatchEvent = (typeof MATCHING_EVENT)[keyof typeof MATCHING_EVENT];
42+
export type IChildProcessMessage = {
43+
rooms: Array<string>;
44+
event: IMatchEvent;
45+
message?: unknown;
46+
};

backend/matching/src/workers/cleaner.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { client, logQueueStatus } from '@/lib/db';
22
import { STREAM_CLEANER, STREAM_GROUP, STREAM_NAME } from '@/lib/db/constants';
33
import { decodePoolTicket, getPoolKey } from '@/lib/utils';
4-
import { MATCH_SVC_EVENT } from '@/ws/main';
4+
import { MATCHING_EVENT } from '@/ws/events';
55

66
import { connectClient, sendNotif } from './common';
77

@@ -65,8 +65,8 @@ async function clean() {
6565

6666
if (socketRoom) {
6767
// Notify client
68-
sendNotif([socketRoom], MATCH_SVC_EVENT.FAILED);
69-
sendNotif([socketRoom], MATCH_SVC_EVENT.DISCONNECT);
68+
sendNotif([socketRoom], MATCHING_EVENT.FAILED);
69+
sendNotif([socketRoom], MATCHING_EVENT.DISCONNECT);
7070
}
7171

7272
await logQueueStatus(logger, redisClient, `Queue Status after Expiring Request: <PLACEHOLDER>`);

backend/matching/src/workers/common.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import { client } from '@/lib/db';
44
import { logger } from '@/lib/utils';
5-
import type { IChildProcessMessage, IMatchEvent } from '@/ws/main';
5+
import type { IChildProcessMessage, IMatchEvent } from '@/types';
66

77
export const sendNotif = (roomIds: Array<string>, event: IMatchEvent, message?: unknown) => {
88
if (process.send) {

backend/matching/src/workers/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import path from 'path';
55
import type { Server } from 'socket.io';
66

77
import { logger } from '@/lib/utils';
8-
import { type IChildProcessMessage, MATCH_SVC_EVENT } from '@/ws/main';
8+
import type { IChildProcessMessage } from '@/types';
9+
import { MATCHING_EVENT } from '@/ws/events';
910

1011
let nWorkers = 0; // For tracking graceful exit of main process
1112

@@ -27,7 +28,7 @@ export const initWorker = (name: string, io: Server) => {
2728
);
2829
const { rooms, event, message: payload } = messagePayload as IChildProcessMessage;
2930

30-
if (event === MATCH_SVC_EVENT.DISCONNECT) {
31+
if (event === MATCHING_EVENT.DISCONNECT) {
3132
io.sockets.in(rooms).socketsLeave(rooms);
3233
return;
3334
}

backend/matching/src/workers/matcher.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { client, logQueueStatus } from '@/lib/db';
22
import { POOL_INDEX, STREAM_GROUP, STREAM_NAME, STREAM_WORKER } from '@/lib/db/constants';
33
import { decodePoolTicket, getPoolKey, getStreamId } from '@/lib/utils';
44
import { getMatchItems } from '@/services';
5-
import { MATCH_SVC_EVENT } from '@/ws/main';
5+
import { MATCHING_EVENT } from '@/ws/events';
66

77
import { connectClient, sendNotif } from './common';
88

@@ -56,7 +56,7 @@ async function processMatch(
5656
}
5757

5858
// To block cancellation
59-
sendNotif([matchedSocketPort], MATCH_SVC_EVENT.MATCHING);
59+
sendNotif([matchedSocketPort], MATCHING_EVENT.MATCHING);
6060

6161
const matchedStreamId = getStreamId(timestamp);
6262

@@ -73,8 +73,8 @@ async function processMatch(
7373
const { ...matchItems } = getMatchItems({ userId1: requestorUserId, userId2: matchedUserId });
7474

7575
const sendMatchLogic = (socketPort: string) => {
76-
sendNotif([socketPort], MATCH_SVC_EVENT.SUCCESS, matchItems);
77-
sendNotif([socketPort], MATCH_SVC_EVENT.DISCONNECT);
76+
sendNotif([socketPort], MATCHING_EVENT.SUCCESS, matchItems);
77+
sendNotif([socketPort], MATCHING_EVENT.DISCONNECT);
7878
};
7979

8080
// TODO: If client disconnected, stop sending to requestor
@@ -127,7 +127,7 @@ async function match() {
127127
} = decodePoolTicket(matchRequest);
128128

129129
// To Block Cancellation
130-
sendNotif([requestorSocketPort], MATCH_SVC_EVENT.MATCHING);
130+
sendNotif([requestorSocketPort], MATCHING_EVENT.MATCHING);
131131

132132
const clause = [`-@userId:(${requestorUserId})`];
133133

@@ -190,7 +190,7 @@ async function match() {
190190

191191
if (!hasDifficultyMatch) {
192192
// To allow cancellation
193-
sendNotif([requestorSocketPort], MATCH_SVC_EVENT.PENDING);
193+
sendNotif([requestorSocketPort], MATCHING_EVENT.PENDING);
194194
}
195195
}
196196
}

backend/matching/src/ws/events.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
export const WS_EVENT = {
2+
JOIN_ROOM: 'joinRoom',
3+
CANCEL_ROOM: 'cancelRoom',
4+
LEAVE_ROOM: 'leave',
5+
START_QUEUING: 'startQueuing',
6+
DISCONNECT: 'disconnect',
7+
} as const;
8+
9+
export const MATCHING_EVENT = {
10+
ERROR: 'ERROR', // When match encounters error
11+
QUEUED: 'QUEUED', // When match joins pool
12+
MATCHING: 'MATCHING', // When matching in progress
13+
PENDING: 'PENDING', // When waiting for match
14+
SUCCESS: 'SUCCESS', // When match successful
15+
FAILED: 'FAILED', // When match failed
16+
DISCONNECT: 'DISCONNECT', // To disconnect all sockets in room
17+
} as const;

backend/matching/src/ws/handlers.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,11 @@ import { logger } from '@/lib/utils';
55
import { queueingService } from '@/services';
66
import type { IRedisClient, IRequestMatchEvent } from '@/types';
77

8-
import { MATCH_SVC_EVENT } from './main';
8+
import { MATCHING_EVENT,WS_EVENT } from './events';
99

1010
type ISocketIOServer<T> = Server<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, T>;
1111
type ISocketIOSocket<T> = Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, T>;
1212

13-
export const EVENTS = {
14-
ERROR: 'ERROR',
15-
JOIN_ROOM: 'joinRoom',
16-
CANCEL_ROOM: 'cancelRoom',
17-
LEAVE_ROOM: 'leave',
18-
START_QUEUE: 'startQueue',
19-
DISCONNECT: 'disconnect',
20-
} as const;
21-
2213
export const joinRoomHandler =
2314
<T>(socket: ISocketIOSocket<T>) =>
2415
(roomId?: string) => {
@@ -53,7 +44,7 @@ export const queueEventHandler =
5344
if (!payload.roomId) {
5445
const errorMessage = 'Queuing Event triggered without room.';
5546
logger.warn(errorMessage);
56-
socket.emit(EVENTS.ERROR, errorMessage);
47+
socket.emit(MATCHING_EVENT.ERROR, errorMessage);
5748
return;
5849
}
5950

@@ -65,7 +56,7 @@ export const queueEventHandler =
6556
(!payload.topic && !payload.difficulty) ||
6657
(payload.topic && !Array.isArray(payload.topic))
6758
) {
68-
socket.emit(EVENTS.ERROR, `Payload for ${EVENTS.START_QUEUE} is invalid.`);
59+
socket.emit(MATCHING_EVENT.ERROR, `Payload for ${WS_EVENT.START_QUEUING} is invalid.`);
6960
return;
7061
}
7162

@@ -85,11 +76,11 @@ export const queueEventHandler =
8576
} catch (error) {
8677
const { name, message, stack, cause } = error as Error;
8778
logger.error({ name, message, stack, cause }, `An error occurred connecting to the client`);
88-
socket.emit(EVENTS.ERROR, 'Error connecting to client');
79+
socket.emit(MATCHING_EVENT.ERROR, 'Error connecting to client');
8980
return;
9081
}
9182
}
9283

93-
socket.emit(MATCH_SVC_EVENT.QUEUED);
84+
socket.emit(MATCHING_EVENT.QUEUED);
9485
logQueueStatus(logger, redisClient, `Queue Status Before Matching: <PLACEHOLDER>`);
9586
};

backend/matching/src/ws/main.ts

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { Server } from 'socket.io';
55
import { UI_HOST } from '@/config';
66
import { logger } from '@/lib/utils';
77

8-
import { cancelRoomHandler, EVENTS, joinRoomHandler, queueEventHandler } from './handlers';
8+
import { WS_EVENT } from './events';
9+
import { cancelRoomHandler, joinRoomHandler, queueEventHandler } from './handlers';
910

1011
export const createWs = (server: ReturnType<typeof createServer>) => {
1112
const io = new Server(server, {
@@ -18,30 +19,18 @@ export const createWs = (server: ReturnType<typeof createServer>) => {
1819
io.on('connection', (socket) => {
1920
logger.info(`Socket ${socket.id} connected`);
2021

21-
socket.on(EVENTS.JOIN_ROOM, joinRoomHandler(socket));
22-
socket.on(EVENTS.CANCEL_ROOM, cancelRoomHandler(io, socket));
23-
socket.on(EVENTS.LEAVE_ROOM, (room) => {
24-
socket.leave(room);
22+
socket.on(WS_EVENT.JOIN_ROOM, joinRoomHandler(socket));
23+
socket.on(WS_EVENT.CANCEL_ROOM, cancelRoomHandler(io, socket));
24+
socket.on(WS_EVENT.LEAVE_ROOM, (room?: string) => {
25+
if (room) {
26+
socket.leave(room);
27+
}
2528
});
26-
socket.on(EVENTS.DISCONNECT, () => {
29+
socket.on(WS_EVENT.START_QUEUING, queueEventHandler(socket));
30+
socket.on(WS_EVENT.DISCONNECT, () => {
2731
logger.info(`Client disconnected: ${socket.id}`);
32+
socket.disconnect();
2833
});
29-
socket.on(EVENTS.START_QUEUE, queueEventHandler(socket));
3034
});
3135
return io;
3236
};
33-
34-
export const MATCH_SVC_EVENT = {
35-
QUEUED: 'QUEUED', // When match joins pool
36-
SUCCESS: 'SUCCESS', // When match successful
37-
FAILED: 'FAILED', // When match failed
38-
PENDING: 'PENDING', // When waiting for match
39-
MATCHING: 'MATCHING', // When matching in progress
40-
DISCONNECT: 'DISCONNECT', // To disconnect all sockets in room
41-
} as const;
42-
export type IMatchEvent = (typeof MATCH_SVC_EVENT)[keyof typeof MATCH_SVC_EVENT];
43-
export type IChildProcessMessage = {
44-
rooms: Array<string>;
45-
event: IMatchEvent;
46-
message?: unknown;
47-
};

frontend/src/lib/ws/events.ts

Whitespace-only changes.

frontend/src/lib/ws/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './events';

0 commit comments

Comments
 (0)