Skip to content

Commit 8c599a8

Browse files
committed
Refactor redis service
1 parent a210c17 commit 8c599a8

File tree

6 files changed

+77
-16
lines changed

6 files changed

+77
-16
lines changed

backend/gateway-service/src/modules/match/match.controller.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { ClientProxy } from '@nestjs/microservices';
1111
import { Inject } from '@nestjs/common';
1212
import { firstValueFrom } from 'rxjs';
1313
import { RedisService } from './redis.service';
14+
import { MatchRequestDto } from './dto';
15+
import { MATCH_SUCCESS, MATCH_TIMEOUT } from './match.event';
1416

1517
@WebSocketGateway({ namespace: '/match' })
1618
export class MatchGateway implements OnGatewayInit {
@@ -25,23 +27,25 @@ export class MatchGateway implements OnGatewayInit {
2527
afterInit() {
2628
// Subscribe to Redis Pub/Sub for match notifications
2729
this.redisService.subscribeToMatchEvents((matchedUsers) => {
28-
this.notifyUsers(matchedUsers);
30+
this.notifyUsersWithMatch(matchedUsers);
31+
});
32+
33+
this.redisService.subscribeToTimeoutEvents((timedOutUsers) => {
34+
this.notifyUsersWithTimeout(timedOutUsers);
2935
});
3036
}
3137

32-
@SubscribeMessage('requestMatch')
38+
@SubscribeMessage('matchRequest')
3339
async handleRequestMatch(
3440
@ConnectedSocket() client: Socket,
35-
@MessageBody() payload: any,
41+
@MessageBody() payload: MatchRequestDto,
3642
) {
3743
const matchPayload = {
3844
userId: payload.userId,
3945
selectedTopic: payload.selectedTopic,
4046
selectedDifficulty: payload.selectedDifficulty,
4147
};
4248

43-
let isMatched = false;
44-
4549
// Send the match request to the microservice
4650
try {
4751
await firstValueFrom(this.matchingClient.send('match.request', matchPayload));
@@ -52,26 +56,36 @@ export class MatchGateway implements OnGatewayInit {
5256
}
5357

5458
// Notify both matched users via WebSocket
55-
notifyUsers(matchedUsers: string[]) {
59+
notifyUsersWithMatch(matchedUsers: string[]) {
5660
const [user1, user2] = matchedUsers;
5761
const user1SocketId = this.getUserSocketId(user1);
5862
const user2SocketId = this.getUserSocketId(user2);
5963
if (user1SocketId) {
60-
this.server.to(user1SocketId).emit('matchNotification', {
64+
this.server.to(user1SocketId).emit(MATCH_SUCCESS, {
6165
message: `You have been matched with user ${user2}`,
6266
matchedUserId: user2,
6367
});
6468
}
6569

6670
if (user2SocketId) {
67-
this.server.to(user2SocketId).emit('matchNotification', {
71+
this.server.to(user2SocketId).emit(MATCH_SUCCESS, {
6872
message: `You have been matched with user ${user1}`,
6973
matchedUserId: user1,
7074
});
7175
}
7276
}
7377

74-
78+
notifyUsersWithTimeout(timedOutUsers: string[]) {
79+
timedOutUsers.forEach(user => {
80+
const socketId = this.getUserSocketId(user);
81+
if (socketId) {
82+
this.server.to(socketId).emit(MATCH_TIMEOUT, {
83+
message: `You have been timed out.`,
84+
timedOutUserId: user,
85+
});
86+
}
87+
});
88+
}
7589

7690
handleConnection(@ConnectedSocket() client: Socket) {
7791
const userId = client.handshake.query.userId;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export const MATCH_TIMEOUT = 'matchTimeout';
2+
export const MATCH_SUCCESS = 'matchSuccess';
3+

backend/gateway-service/src/modules/match/redis.service.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export class RedisService {
77

88
constructor() {
99
this.redisSubscriber = new Redis({
10-
host: 'backend-redis-1', // Your Redis host
10+
host: 'backend-redis-1',
1111
port: 6379,
1212
});
1313
}
@@ -30,4 +30,21 @@ export class RedisService {
3030
}
3131
});
3232
}
33+
34+
subscribeToTimeoutEvents(callback: (matchedUsers: any) => void): void {
35+
this.redisSubscriber.subscribe('timeoutChannel', (err, count) => {
36+
if (err) {
37+
console.error('Error subscribing to Redis channel:', err);
38+
return;
39+
}
40+
console.log('Subscribed to Redis channel timeoutChannel.');
41+
});
42+
43+
this.redisSubscriber.on('message', (channel, message) => {
44+
if (channel === 'timeoutChannel') {
45+
const timedOutUsers = JSON.parse(message);
46+
callback(timedOutUsers);
47+
}
48+
});
49+
}
3350
}

backend/matching-service/src/interfaces/match-job.interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export interface MatchJob {
33
userProficiency: string;
44
selectedTopic: string[];
55
selectedDifficulty: string;
6+
timestamp: number;
67
}

backend/matching-service/src/match-worker.service.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,20 @@ export class MatchWorkerService {
1111
async pollForMatches() {
1212
setInterval(async () => {
1313
const users = await this.redisService.getUsersFromPool();
14-
console.log('polling', users)
14+
const currentTime = Date.now();
15+
const timeout = 29000; // Slightly less than 30 seconds to account for processing time
16+
console.log('Polling', users);
17+
// Filter out users who have timed out
18+
const activeUsers = users.filter(user => currentTime - user.timestamp < timeout);
1519

16-
if (users.length >= 2) {
17-
const matches = this.rankUsers(users);
20+
if (activeUsers.length < users.length) {
21+
const timedOutUsers = users.filter(user => currentTime - user.timestamp >= timeout);
22+
await this.notifyGatewayTimeout(timedOutUsers.map(user => user.userId));
23+
await this.redisService.removeUsersFromPool(timedOutUsers.map(user => user.userId));
24+
}
25+
26+
if (activeUsers.length >= 2) {
27+
const matches = this.rankUsers(activeUsers);
1828
const bestMatch = matches[0];
1929

2030
// Notify the gateway via Redis Pub/Sub
@@ -23,7 +33,7 @@ export class MatchWorkerService {
2333
// Remove the matched users from the Redis pool
2434
await this.redisService.removeUsersFromPool([bestMatch.user1.userId, bestMatch.user2.userId]);
2535
}
26-
}, 5000);
36+
}, 20000);
2737
}
2838

2939
// Ranking logic for matches
@@ -52,5 +62,9 @@ export class MatchWorkerService {
5262
async notifyGateway(matchedUserIds: string[]) {
5363
await this.redisService.publishMatchedUsers(matchedUserIds);
5464
}
65+
66+
async notifyGatewayTimeout(userIds: string[]) {
67+
await this.redisService.publishTimedOutUsers(userIds);
68+
}
5569
}
5670

backend/matching-service/src/redis.service.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Injectable } from '@nestjs/common';
22
import Redis from 'ioredis';
33
import { MatchRequestDto } from './dto/match-request.dto';
4+
import { MatchJob } from './interfaces/match-job.interface';
45

56
@Injectable()
67
export class RedisService {
@@ -20,11 +21,18 @@ export class RedisService {
2021

2122
// Add user to Redis pool
2223
async addUserToPool(data: MatchRequestDto): Promise<void> {
23-
await this.redisPublisher.sadd('userPool', JSON.stringify(data));
24+
const payload: MatchJob = {
25+
userId: data.userId,
26+
userProficiency: 'beginner',
27+
selectedTopic: data.selectedTopic,
28+
selectedDifficulty: data.selectedDifficulty,
29+
timestamp: Date.now(),
30+
};
31+
await this.redisPublisher.sadd('userPool', JSON.stringify(payload));
2432
}
2533

2634
// Get users from Redis pool
27-
async getUsersFromPool(): Promise<MatchRequestDto[]> {
35+
async getUsersFromPool(): Promise<MatchJob[]> {
2836
const users = await this.redisPublisher.smembers('userPool');
2937
return users.map(user => JSON.parse(user));
3038
}
@@ -46,4 +54,8 @@ export class RedisService {
4654
async publishMatchedUsers(matchedUserIds: string[]): Promise<void> {
4755
await this.redisPublisher.publish('matchChannel', JSON.stringify(matchedUserIds));
4856
}
57+
58+
async publishTimedOutUsers(timedOutUserIds: string[]): Promise<void> {
59+
await this.redisPublisher.publish('timeoutChannel', JSON.stringify(timedOutUserIds));
60+
}
4961
}

0 commit comments

Comments
 (0)