Skip to content

Commit aadabcc

Browse files
committed
PEER-222: Add IPC for ws message sending
Signed-off-by: SeeuSim <[email protected]>
1 parent 40f573e commit aadabcc

File tree

6 files changed

+63
-13
lines changed

6 files changed

+63
-13
lines changed

backend/matching/src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ const port = Number.parseInt(EXPRESS_PORT || '8001');
1212
const listenMessage = `App listening on port: ${port}`;
1313
server.listen(port, () => {
1414
logger.info(listenMessage);
15-
['Cleaner', 'Matcher'].map(initWorker).forEach((process) => workers.push(process));
15+
['Cleaner', 'Matcher']
16+
.map((name) => initWorker(name, io))
17+
.forEach((process) => workers.push(process));
1618
});
1719

1820
const shutdown = () => {

backend/matching/src/workers/cleaner.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { client } from '@/lib/db';
22
import { STREAM_CLEANER, STREAM_GROUP, STREAM_NAME } from '@/lib/db/constants';
33
import { decodePoolTicket, getPoolKey } from '@/lib/utils';
4-
import { io } from '@/server';
4+
import { MATCH_SVC_EVENT } from '@/ws';
5+
import { sendNotif } from './common';
56

67
const logger = {
78
info: (message: unknown) => process.send && process.send(message),
@@ -60,8 +61,8 @@ async function clean() {
6061

6162
if (socketRoom) {
6263
// Notify client
63-
io.sockets.in(socketRoom).emit('FAILED');
64-
io.sockets.in(socketRoom).disconnectSockets();
64+
sendNotif([socketRoom], MATCH_SVC_EVENT.FAILED);
65+
sendNotif([socketRoom], MATCH_SVC_EVENT.DISCONNECT);
6566
}
6667
}
6768
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import type { IChildProcessMessage, IMatchEvent } from '@/ws';
2+
3+
export const sendNotif = (roomIds: Array<string>, event: IMatchEvent, message?: unknown) => {
4+
if (process.send) {
5+
const payload: IChildProcessMessage = {
6+
rooms: roomIds,
7+
event,
8+
message,
9+
};
10+
process.send(payload);
11+
}
12+
};

backend/matching/src/workers/index.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
import { fork } from 'child_process';
22
import path from 'path';
3+
4+
import type { Server } from 'socket.io';
5+
36
import { logger } from '@/lib/utils';
7+
import { MATCH_SVC_EVENT, type IChildProcessMessage } from '@/ws';
48

5-
export const initWorker = (name: string) => {
9+
export const initWorker = (name: string, io: Server) => {
610
const lCaseName = name.toLowerCase();
711
const worker = fork(path.join(__dirname, `${lCaseName}.js`));
812
const upperCaseName = name.replace(/^[A-Za-z]/, (c) => c.toUpperCase());
913
worker.on('message', (message) => {
10-
logger.info(`[${upperCaseName}]: ${message}`);
14+
logger.info(`Received message from '${upperCaseName}': ${JSON.stringify(message)}`);
15+
16+
const { rooms, event, message: payload } = message as IChildProcessMessage;
17+
if (event === MATCH_SVC_EVENT.DISCONNECT) {
18+
io.sockets.in(rooms).disconnectSockets();
19+
return;
20+
}
21+
io.sockets.in(rooms).emit(event, payload);
1122
});
1223
worker.on('exit', (code) => {
1324
logger.error(`${upperCaseName} exited with code ${code}.`);

backend/matching/src/workers/matcher.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { client } from '@/lib/db';
22
import { POOL_INDEX, STREAM_GROUP, STREAM_NAME, STREAM_WORKER } from '@/lib/db/constants';
33
import { decodePoolTicket, getPoolKey, getStreamId } from '@/lib/utils';
4-
import { io } from '@/server';
54
import { getMatchItems } from '@/services';
5+
import { MATCH_SVC_EVENT } from '@/ws';
6+
import { sendNotif } from './common';
67

78
const logger = {
89
info: (message: unknown) => process.send && process.send(message),
@@ -48,7 +49,7 @@ async function processMatch(
4849
} = decodePoolTicket(matched);
4950

5051
// To block cancellation
51-
io.sockets.in([matchedSocketPort]).emit('Matching');
52+
sendNotif([matchedSocketPort], MATCH_SVC_EVENT.MATCHING);
5253

5354
const matchedStreamId = getStreamId(timestamp);
5455

@@ -63,7 +64,8 @@ async function processMatch(
6364

6465
// Notify both sockets
6566
const { ...matchItems } = getMatchItems();
66-
io.sockets.in([requestorSocketPort, matchedSocketPort]).emit(JSON.stringify(matchItems));
67+
sendNotif([requestorSocketPort, matchedSocketPort], MATCH_SVC_EVENT.SUCCESS, matchItems);
68+
sendNotif([requestorSocketPort, matchedSocketPort], MATCH_SVC_EVENT.DISCONNECT);
6769
return true;
6870
}
6971

@@ -105,7 +107,7 @@ async function match() {
105107
} = decodePoolTicket(matchRequest);
106108

107109
// To Block Cancellation
108-
io.sockets.in([requestorSocketPort]).emit('Matching');
110+
sendNotif([requestorSocketPort], MATCH_SVC_EVENT.MATCHING);
109111

110112
const clause = [`-@userId:(${requestorUserId})`];
111113
if (difficulty) {
@@ -164,7 +166,7 @@ async function match() {
164166

165167
if (!hasDifficultyMatch) {
166168
// To allow cancellation
167-
io.sockets.in(requestorSocketPort).emit('Waiting');
169+
sendNotif([requestorSocketPort], MATCH_SVC_EVENT.PENDING);
168170
}
169171
}
170172
}

backend/matching/src/ws.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,37 @@
11
import http from 'http';
22
import { Server } from 'socket.io';
3+
import { logger } from './lib/utils';
34

45
export const createWs = (server: ReturnType<(typeof http)['createServer']>) => {
56
const io = new Server(server);
6-
io.sockets.on('connection', (socket) => {
7-
socket.on('create', (room) => {
7+
io.on('connection', (socket) => {
8+
logger.info(`${socket.id} connected`);
9+
10+
socket.on('joinRoom', (room) => {
811
socket.join(room);
12+
logger.info(`Client joined room: ${room}`);
13+
socket.to(room).emit('message', `A new user has joined room: ${room}`);
914
});
15+
// socket.on('create', (room) => {
16+
// socket.join(room);
17+
// });
1018
socket.on('leave', (room) => {
1119
socket.leave(room);
1220
});
1321
});
1422
return io;
1523
};
24+
25+
export const MATCH_SVC_EVENT = {
26+
SUCCESS: 'SUCCESS', // When match successful
27+
FAILED: 'FAILED', // When match failed
28+
PENDING: 'PENDING', // When waiting for match
29+
MATCHING: 'MATCHING', // When matching in progress
30+
DISCONNECT: 'DISCONNECT', // To disconnect all sockets in room
31+
} as const;
32+
export type IMatchEvent = (typeof MATCH_SVC_EVENT)[keyof typeof MATCH_SVC_EVENT];
33+
export type IChildProcessMessage = {
34+
rooms: Array<string>;
35+
event: IMatchEvent;
36+
message?: unknown;
37+
};

0 commit comments

Comments
 (0)