Skip to content

Commit a8d9c7f

Browse files
committed
fix/matching: Add graceful process exit for main process
Signed-off-by: SeeuSim <[email protected]>
1 parent 450b3da commit a8d9c7f

File tree

7 files changed

+63
-52
lines changed

7 files changed

+63
-52
lines changed

backend/matching/src/index.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { logger } from '@/lib/utils';
55
import server, { io } from '@/server';
66
import { initWorker } from '@/workers';
77

8-
const workers: { controller: AbortController; worker: ChildProcess }[] = [];
8+
const workers: ChildProcess[] = [];
99

1010
const port = Number.parseInt(EXPRESS_PORT || '8001');
1111

@@ -18,15 +18,17 @@ server.listen(port, () => {
1818
});
1919

2020
const shutdown = () => {
21-
workers.forEach(({ controller, worker }) => {
22-
controller.abort('Main process shutdown'); // Force an abort error to kill child processes
23-
worker.kill();
24-
});
25-
io.close(() => {
26-
logger.info('WS Server shut down');
27-
});
2821
server.close(() => {
29-
logger.info('App shut down');
22+
workers.forEach((worker) => {
23+
worker.kill();
24+
});
25+
void io
26+
.close(() => {
27+
logger.info('WS Server shut down');
28+
})
29+
.then(() => {
30+
logger.info('App shut down');
31+
});
3032
});
3133
};
3234

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
21
const path = require('path');
32

43
require('ts-node').register();
5-
require(path.resolve(__dirname, './cleaner.ts'));
4+
require(path.resolve(__dirname, './cleaner.ts'));

backend/matching/src/workers/cleaner.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ const cancel = () => {
1919
};
2020
const shutdown = () => {
2121
cancel();
22-
client
23-
.disconnect()
24-
.then(() => client.quit())
25-
.then(process.exit(0));
22+
client.disconnect().then(() => {
23+
process.exit(0);
24+
});
2625
};
2726

28-
process.on('exit', shutdown);
2927
process.on('SIGINT', shutdown);
3028
process.on('SIGTERM', shutdown);
29+
process.on('exit', shutdown);
3130

3231
async function clean() {
3332
const redisClient = await connectClient(client);

backend/matching/src/workers/common.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// CHILD PROCESS UTIL LIB
2+
13
import { client } from '@/lib/db';
24
import { logger } from '@/lib/utils';
35
import type { IChildProcessMessage, IMatchEvent } from '@/ws';

backend/matching/src/workers/index.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// MAIN PROCESS
12
import { fork } from 'child_process';
23
import path from 'path';
34

@@ -6,10 +7,11 @@ import type { Server } from 'socket.io';
67
import { logger } from '@/lib/utils';
78
import { MATCH_SVC_EVENT, type IChildProcessMessage } from '@/ws';
89

10+
let nWorkers = 0; // For tracking graceful exit of main process
911
export const initWorker = (name: string, io: Server) => {
10-
const controller = new AbortController();
1112
const lCaseName = name.toLowerCase();
12-
const worker = fork(path.join(__dirname, `${lCaseName}.js`), { signal: controller.signal });
13+
const worker = fork(path.join(__dirname, `${lCaseName}.js`));
14+
nWorkers += 1;
1315
const upperCaseName = name.replace(/^[A-Za-z]/, (c) => c.toUpperCase());
1416
worker.on('message', (message) => {
1517
if (typeof message.valueOf() === 'string') {
@@ -27,6 +29,11 @@ export const initWorker = (name: string, io: Server) => {
2729
});
2830
worker.on('exit', (code) => {
2931
logger.error(`${upperCaseName} exited with code ${code}.`);
32+
nWorkers -= 1;
33+
if (nWorkers === 0) {
34+
logger.info('Main Process exiting.');
35+
process.exit(0);
36+
}
3037
});
31-
return { worker, controller };
38+
return worker;
3239
};
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
21
const path = require('path');
32

43
require('ts-node').register();
5-
require(path.resolve(__dirname, './matcher.ts'));
4+
require(path.resolve(__dirname, './matcher.ts'));

backend/matching/src/workers/matcher.ts

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ const cancel = () => {
1919
};
2020
const shutdown = () => {
2121
cancel();
22-
client
23-
.disconnect()
24-
.then(() => client.quit())
25-
.then(process.exit(0));
22+
client.disconnect().then(() => {
23+
process.exit(0);
24+
});
2625
};
2726

28-
process.on('exit', shutdown);
2927
process.on('SIGINT', shutdown);
3028
process.on('SIGTERM', shutdown);
29+
process.on('exit', shutdown);
3130

3231
type RequestorParams = {
3332
requestorUserId: string;
@@ -42,32 +41,36 @@ async function processMatch(
4241
searchIdentifier?: string
4342
) {
4443
if (matches.total > 0) {
45-
const matched = matches.documents[0];
46-
const {
47-
userId: matchedUserId,
48-
timestamp, // We use timestamp as the Stream ID
49-
socketPort: matchedSocketPort,
50-
} = decodePoolTicket(matched);
51-
52-
// To block cancellation
53-
sendNotif([matchedSocketPort], MATCH_SVC_EVENT.MATCHING);
54-
55-
const matchedStreamId = getStreamId(timestamp);
56-
57-
logger.info(`Found match: ${JSON.stringify(matched)}`);
58-
59-
await Promise.all([
60-
// Remove other from pool
61-
redisClient.del([getPoolKey(requestorUserId), getPoolKey(matchedUserId)]),
62-
// Remove other from queue
63-
redisClient.xDel(STREAM_NAME, [requestorStreamId, matchedStreamId]),
64-
]);
65-
66-
// Notify both sockets
67-
const { ...matchItems } = getMatchItems();
68-
sendNotif([requestorSocketPort, matchedSocketPort], MATCH_SVC_EVENT.SUCCESS, matchItems);
69-
sendNotif([requestorSocketPort, matchedSocketPort], MATCH_SVC_EVENT.DISCONNECT);
70-
return true;
44+
for (const matched of matches.documents) {
45+
const {
46+
userId: matchedUserId,
47+
timestamp, // We use timestamp as the Stream ID
48+
socketPort: matchedSocketPort,
49+
} = decodePoolTicket(matched);
50+
if (matchedUserId === requestorUserId) {
51+
continue;
52+
}
53+
54+
// To block cancellation
55+
sendNotif([matchedSocketPort], MATCH_SVC_EVENT.MATCHING);
56+
57+
const matchedStreamId = getStreamId(timestamp);
58+
59+
logger.info(`Found match: ${JSON.stringify(matched)}`);
60+
61+
await Promise.all([
62+
// Remove other from pool
63+
redisClient.del([getPoolKey(requestorUserId), getPoolKey(matchedUserId)]),
64+
// Remove other from queue
65+
redisClient.xDel(STREAM_NAME, [requestorStreamId, matchedStreamId]),
66+
]);
67+
68+
// Notify both sockets
69+
const { ...matchItems } = getMatchItems();
70+
sendNotif([requestorSocketPort, matchedSocketPort], MATCH_SVC_EVENT.SUCCESS, matchItems);
71+
sendNotif([requestorSocketPort, matchedSocketPort], MATCH_SVC_EVENT.DISCONNECT);
72+
return true;
73+
}
7174
}
7275

7376
logger.info(`Found no matches` + (searchIdentifier ? ` for ${searchIdentifier}` : ''));

0 commit comments

Comments
 (0)