Skip to content

Commit a210c17

Browse files
committed
Refactor redis service
1 parent ebfd967 commit a210c17

File tree

8 files changed

+98
-131
lines changed

8 files changed

+98
-131
lines changed
Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,34 @@
11
import {
22
WebSocketGateway,
33
WebSocketServer,
4-
OnGatewayInit,
5-
ConnectedSocket,
64
SubscribeMessage,
5+
ConnectedSocket,
76
MessageBody,
7+
OnGatewayInit,
88
} from '@nestjs/websockets';
9-
import { Server, Socket } from 'socket.io';
10-
import { RedisService } from './redis.service';
9+
import { Socket, Server } from 'socket.io';
1110
import { ClientProxy } from '@nestjs/microservices';
1211
import { Inject } from '@nestjs/common';
12+
import { firstValueFrom } from 'rxjs';
13+
import { RedisService } from './redis.service';
1314

1415
@WebSocketGateway({ namespace: '/match' })
1516
export class MatchGateway implements OnGatewayInit {
1617
@WebSocketServer() server: Server;
18+
private userSockets: Map<string, string> = new Map();
1719

1820
constructor(
19-
private redisService: RedisService,
2021
@Inject('MATCHING_SERVICE') private matchingClient: ClientProxy,
22+
private redisService: RedisService,
2123
) {}
2224

2325
afterInit() {
24-
// Subscribe to Redis Pub/Sub for user match notifications
25-
this.redisService.subscribeToUserMatchEvents((payload) => {
26-
this.notifyUsers(payload);
26+
// Subscribe to Redis Pub/Sub for match notifications
27+
this.redisService.subscribeToMatchEvents((matchedUsers) => {
28+
this.notifyUsers(matchedUsers);
2729
});
2830
}
2931

30-
// Handle user connection and add them to a room based on their userId
31-
handleConnection(@ConnectedSocket() client: Socket) {
32-
const userId = client.handshake.query.userId; // Extract userId from query
33-
if (userId) {
34-
client.join(userId); // Make the client join a room based on userId
35-
console.log(`User ${userId} connected and joined room ${userId}`);
36-
} else {
37-
console.log('User connected without userId');
38-
}
39-
}
40-
4132
@SubscribeMessage('requestMatch')
4233
async handleRequestMatch(
4334
@ConnectedSocket() client: Socket,
@@ -49,33 +40,59 @@ export class MatchGateway implements OnGatewayInit {
4940
selectedDifficulty: payload.selectedDifficulty,
5041
};
5142

52-
// Emit match request event to the microservice
53-
const timeoutRef = setTimeout(() => {
54-
client.emit('matchTimeout', 'No match found within 30 seconds');
55-
}, 30000); // Timeout after 30 seconds if no match is found
43+
let isMatched = false;
5644

57-
this.matchingClient.emit('match.request', matchPayload).subscribe({
58-
next: () => {
59-
// If a match is found, clear the timeout
60-
clearTimeout(timeoutRef);
61-
client.emit('matchInProgress', 'Finding a match...');
62-
},
63-
error: (err) => {
64-
clearTimeout(timeoutRef); // Also clear the timeout in case of an error
65-
client.emit('matchError', 'Error finding match');
66-
},
67-
});
45+
// Send the match request to the microservice
46+
try {
47+
await firstValueFrom(this.matchingClient.send('match.request', matchPayload));
48+
} catch (error) {
49+
client.emit('matchError', `Error requesting match: ${error.message}`);
50+
return;
51+
}
6852
}
6953

70-
// Notify matched users via WebSocket
71-
notifyUsers(userIds: string[]) {
72-
userIds.forEach((userId, index) => {
73-
const matchedUserId = userIds[index === 0 ? 1 : 0]; // Get the other matched user's ID
74-
console.log(`Notifying user ${userId} about match with ${matchedUserId}`);
75-
this.server.to(userId).emit('matchNotification', {
76-
message: `You have been matched with user ${matchedUserId}`,
77-
matchedUserId: matchedUserId,
54+
// Notify both matched users via WebSocket
55+
notifyUsers(matchedUsers: string[]) {
56+
const [user1, user2] = matchedUsers;
57+
const user1SocketId = this.getUserSocketId(user1);
58+
const user2SocketId = this.getUserSocketId(user2);
59+
if (user1SocketId) {
60+
this.server.to(user1SocketId).emit('matchNotification', {
61+
message: `You have been matched with user ${user2}`,
62+
matchedUserId: user2,
7863
});
79-
});
64+
}
65+
66+
if (user2SocketId) {
67+
this.server.to(user2SocketId).emit('matchNotification', {
68+
message: `You have been matched with user ${user1}`,
69+
matchedUserId: user1,
70+
});
71+
}
72+
}
73+
74+
75+
76+
handleConnection(@ConnectedSocket() client: Socket) {
77+
const userId = client.handshake.query.userId;
78+
if (userId) {
79+
this.userSockets.set(userId as string, client.id);
80+
console.log(`User ${userId} connected with socket ID ${client.id}`);
81+
}
82+
}
83+
84+
handleDisconnect(@ConnectedSocket() client: Socket) {
85+
const userId = [...this.userSockets.entries()].find(
86+
([, socketId]) => socketId === client.id,
87+
)?.[0];
88+
89+
if (userId) {
90+
this.userSockets.delete(userId);
91+
console.log(`User ${userId} disconnected`);
92+
}
93+
}
94+
95+
getUserSocketId(userId: string): string | undefined {
96+
return this.userSockets.get(userId);
8097
}
8198
}

backend/gateway-service/src/modules/match/redis.service.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,14 @@ export class RedisService {
66
private redisSubscriber: Redis;
77

88
constructor() {
9-
// Initialize Redis for subscribing to match notifications
109
this.redisSubscriber = new Redis({
11-
host: 'backend-redis-1',
10+
host: 'backend-redis-1', // Your Redis host
1211
port: 6379,
1312
});
1413
}
1514

16-
// Subscribe to Redis Pub/Sub events for match notifications
17-
subscribeToUserMatchEvents(matchCallback: (users: any) => void) {
18-
// Subscribe to the Redis Pub/Sub channel 'matchChannel'
15+
// Subscribe to the Redis Pub/Sub channel for match events
16+
subscribeToMatchEvents(callback: (matchedUsers: any) => void): void {
1917
this.redisSubscriber.subscribe('matchChannel', (err, count) => {
2018
if (err) {
2119
console.error('Error subscribing to Redis channel:', err);
@@ -24,11 +22,11 @@ export class RedisService {
2422
console.log('Subscribed to Redis channel matchChannel.');
2523
});
2624

27-
// Listen for messages on the 'matchChannel' and trigger the callback
25+
// Listen for published messages on the channel
2826
this.redisSubscriber.on('message', (channel, message) => {
2927
if (channel === 'matchChannel') {
30-
const matchedUsers = JSON.parse(message); // Parse the JSON message
31-
matchCallback(matchedUsers); // Call the provided callback with the matched users
28+
const matchedUsers = JSON.parse(message);
29+
callback(matchedUsers);
3230
}
3331
});
3432
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import { Controller, Get } from '@nestjs/common';
22
import { AppService } from './app.service';
3-
import { EventPattern, MessagePattern, Payload } from '@nestjs/microservices';
3+
import { MessagePattern, Payload } from '@nestjs/microservices';
44
import { MatchRequestDto } from './dto';
55

66
@Controller()
77
export class AppController {
88
constructor(private readonly appService: AppService) {}
99

10-
@EventPattern('match.request')
10+
@MessagePattern('match.request')
1111
async handleMatchRequest(@Payload() data: MatchRequestDto) {
12-
await this.appService.requestMatch(data);
12+
await this.appService.requestMatch(data);
1313
}
1414
}

backend/matching-service/src/app.module.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { AppService } from './app.service';
44
import { BullModule } from '@nestjs/bull';
55
import { ClientsModule, Transport } from '@nestjs/microservices';
66
import { MatchWorkerService } from './match-worker.service';
7-
import { NotificationService } from './notification.service';
87
import { RedisService } from './redis.service';
98

109
@Module({
@@ -33,7 +32,6 @@ import { RedisService } from './redis.service';
3332
providers: [
3433
AppService,
3534
MatchWorkerService,
36-
NotificationService,
3735
RedisService,
3836
],
3937
})
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
import { Inject, Injectable } from '@nestjs/common';
2-
import { ClientProxy, RpcException } from '@nestjs/microservices';
3-
import { MatchRequestDto } from './dto';
1+
import { Injectable } from '@nestjs/common';
2+
import Redis from 'ioredis';
3+
import { MatchRequestDto } from './dto/match-request.dto';
44
import { RedisService } from './redis.service';
55

66
@Injectable()
77
export class AppService {
8-
constructor(
9-
@Inject('USER_SERVICE') private readonly userClient: ClientProxy,
10-
private redisService: RedisService,
11-
) {}
8+
constructor(private redisService: RedisService) {}
129

13-
async requestMatch(data: MatchRequestDto) {
10+
// Add user to the Redis pool
11+
async requestMatch(data: MatchRequestDto): Promise<void> {
1412
await this.redisService.addUserToPool(data);
1513
}
1614
}
15+
16+

backend/matching-service/src/match-worker.service.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Injectable } from '@nestjs/common';
2-
import { RedisService } from './redis.service';
32
import { MatchRequestDto } from './dto';
3+
import { RedisService } from './redis.service';
4+
import { PriorityQueue } from './helper/priority-queue';
45

56
@Injectable()
67
export class MatchWorkerService {
@@ -9,22 +10,20 @@ export class MatchWorkerService {
910
// Poll for matches at a regular interval
1011
async pollForMatches() {
1112
setInterval(async () => {
12-
const users = await this.redisService.getUsersFromPool(); // Get users from Redis pool
13+
const users = await this.redisService.getUsersFromPool();
1314
console.log('polling', users)
1415

1516
if (users.length >= 2) {
1617
const matches = this.rankUsers(users);
17-
const bestMatch = matches[0]; // Find the best match
18-
19-
console.log(`Matched users: ${bestMatch.user1.userId} and ${bestMatch.user2.userId}`);
18+
const bestMatch = matches[0];
2019

2120
// Notify the gateway via Redis Pub/Sub
2221
await this.notifyGateway([bestMatch.user1.userId, bestMatch.user2.userId]);
2322

2423
// Remove the matched users from the Redis pool
2524
await this.redisService.removeUsersFromPool([bestMatch.user1.userId, bestMatch.user2.userId]);
2625
}
27-
}, 5000); // Poll every 5 seconds
26+
}, 5000);
2827
}
2928

3029
// Ranking logic for matches
@@ -36,7 +35,7 @@ export class MatchWorkerService {
3635
matches.push({ user1: users[i], user2: users[j], score });
3736
}
3837
}
39-
return matches.sort((a, b) => b.score - a.score); // Sort matches by score
38+
return matches.sort((a, b) => b.score - a.score)
4039
}
4140

4241
private calculateScore(user1: MatchRequestDto, user2: MatchRequestDto): number {
@@ -51,7 +50,6 @@ export class MatchWorkerService {
5150

5251
// Notify the gateway service about the match via Redis Pub/Sub
5352
async notifyGateway(matchedUserIds: string[]) {
54-
// Publish matched users to the Redis Pub/Sub channel
5553
await this.redisService.publishMatchedUsers(matchedUserIds);
5654
}
5755
}

backend/matching-service/src/notification.service.ts

Lines changed: 0 additions & 24 deletions
This file was deleted.
Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,49 @@
11
import { Injectable } from '@nestjs/common';
22
import Redis from 'ioredis';
3-
import { MatchRequestDto } from 'src/dto';
3+
import { MatchRequestDto } from './dto/match-request.dto';
44

55
@Injectable()
66
export class RedisService {
7-
private redisPublisher: Redis;
7+
private redisPublisher: Redis;
88
private redisSubscriber: Redis;
99

1010
constructor() {
1111
this.redisPublisher = new Redis({
12-
host: 'backend-redis-1', // Use your Redis host (or Docker network host if in Docker)
12+
host: 'backend-redis-1',
1313
port: 6379,
1414
});
15-
1615
this.redisSubscriber = new Redis({
1716
host: 'backend-redis-1',
1817
port: 6379,
1918
});
2019
}
2120

22-
// Add user to Redis set (matching pool)
23-
async addUserToPool(data: MatchRequestDto) {
21+
// Add user to Redis pool
22+
async addUserToPool(data: MatchRequestDto): Promise<void> {
2423
await this.redisPublisher.sadd('userPool', JSON.stringify(data));
2524
}
2625

27-
// Get users from Redis pool
28-
async getUsersFromPool(): Promise<MatchRequestDto[]> {
26+
// Get users from Redis pool
27+
async getUsersFromPool(): Promise<MatchRequestDto[]> {
2928
const users = await this.redisPublisher.smembers('userPool');
30-
return users.map(user => JSON.parse(user) as MatchRequestDto);
29+
return users.map(user => JSON.parse(user));
3130
}
3231

33-
// Remove users from the Redis pool by userId
34-
async removeUsersFromPool(userIds: string[]) {
35-
const users = await this.getUsersFromPool(); // Get all users in the pool
32+
// Remove users from Redis pool
33+
async removeUsersFromPool(userIds: string[]) {
34+
const users = await this.getUsersFromPool();
3635

3736
// Find and remove users whose userId matches the provided userIds
3837
userIds.forEach(async (userId) => {
3938
const userToRemove = users.find(user => user.userId === userId);
4039
if (userToRemove) {
41-
await this.redisPublisher.srem('userPool', JSON.stringify(userToRemove)); // Remove user from pool
42-
console.log(`Removed user ${userId} from the pool`);
40+
await this.redisPublisher.srem('userPool', JSON.stringify(userToRemove));
4341
}
4442
});
4543
}
4644

47-
// Publish matched users via Redis Pub/Sub
48-
async publishMatchedUsers(matchedUsers: any) {
49-
await this.redisPublisher.publish('matchChannel', JSON.stringify(matchedUsers));
50-
}
51-
52-
// Subscribe to Redis Pub/Sub for user match notifications
53-
subscribeToUserMatchEvents(matchCallback: (users: any) => void) {
54-
this.redisSubscriber.subscribe('matchChannel', (err, count) => {
55-
if (err) {
56-
console.error('Error subscribing to Redis channel:', err);
57-
return;
58-
}
59-
console.log('Subscribed to Redis channel matchChannel.');
60-
});
61-
62-
this.redisSubscriber.on('message', (channel, message) => {
63-
if (channel === 'matchChannel') {
64-
const matchedUsers = JSON.parse(message);
65-
matchCallback(matchedUsers);
66-
}
67-
});
45+
// Publish matched users to Redis Pub/Sub channel
46+
async publishMatchedUsers(matchedUserIds: string[]): Promise<void> {
47+
await this.redisPublisher.publish('matchChannel', JSON.stringify(matchedUserIds));
6848
}
6949
}

0 commit comments

Comments
 (0)