Skip to content

Commit d259255

Browse files
committed
handle removing user from quque
1 parent 2b6b27e commit d259255

File tree

4 files changed

+166
-2
lines changed

4 files changed

+166
-2
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import WebSocket from "ws";
2+
3+
export interface CustomWebSocket extends WebSocket {
4+
userId?: string;
5+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import WebSocket from "ws";
2+
import http from "http";
3+
import { CustomWebSocket } from "../model/CustomWebSocket";
4+
import redisClient from "../config/redisConfig";
5+
import { removeUserFromKey } from "../model/userModel";
6+
7+
class WebSocketService {
8+
private wss: WebSocket.Server;
9+
private clients: Map<string, CustomWebSocket> = new Map();
10+
private pendingNotifications: Map<string, any> = new Map();
11+
12+
constructor(server: http.Server) {
13+
this.wss = new WebSocket.Server({ server });
14+
this.init();
15+
}
16+
17+
private init() {
18+
this.wss.on("connection", (ws: CustomWebSocket) => {
19+
console.log("New WebSocket connection established");
20+
21+
ws.on("message", (message: WebSocket.Data) => {
22+
try {
23+
const data = JSON.parse(message.toString());
24+
if (data.type === "register" && typeof data.userId === "string") {
25+
ws.userId = data.userId;
26+
this.clients.set(data.userId, ws);
27+
console.log(`User ${data.userId} registered for match updates`);
28+
this.sendPendingNotification(data.userId, ws);
29+
} else if (
30+
data.type === "cancel" &&
31+
typeof data.userId === "string"
32+
) {
33+
this.handleCancelRequest(data.userId);
34+
}
35+
} catch (error) {
36+
console.error("Error processing message:", error);
37+
}
38+
});
39+
40+
ws.on("close", () => {
41+
if (ws.userId) {
42+
console.log(`WebSocket connection closed for user: ${ws.userId}`);
43+
this.clients.delete(ws.userId);
44+
} else {
45+
console.log("WebSocket connection closed for unregistered user");
46+
}
47+
});
48+
});
49+
}
50+
51+
private sendPendingNotification(userId: string, ws: CustomWebSocket) {
52+
const pendingNotification = this.pendingNotifications.get(userId);
53+
if (pendingNotification) {
54+
console.log(`Sending pending notification to user: ${userId}`);
55+
ws.send(
56+
JSON.stringify({
57+
type: "match",
58+
data: pendingNotification,
59+
}),
60+
);
61+
this.pendingNotifications.delete(userId);
62+
}
63+
}
64+
65+
public async notifyMatch(user1Id: string, user2Id: string, matchData: any) {
66+
console.log(`Attempting to notify users: ${user1Id} and ${user2Id}`);
67+
68+
const notifyUser = async (userId: string) => {
69+
const client = this.clients.get(userId);
70+
if (client) {
71+
console.log(`Sending match notification to user: ${userId}`);
72+
client.send(
73+
JSON.stringify({
74+
type: "match",
75+
data: matchData,
76+
}),
77+
);
78+
this.pendingNotifications.delete(userId);
79+
} else {
80+
console.log(`User ${userId} not connected, queueing notification`);
81+
this.pendingNotifications.set(userId, matchData);
82+
83+
for (let i = 0; i < 10; i++) {
84+
await new Promise((resolve) => setTimeout(resolve, 3000));
85+
const client = this.clients.get(userId);
86+
if (client) {
87+
console.log(
88+
`Sending delayed match notification to user: ${userId}`,
89+
);
90+
client.send(
91+
JSON.stringify({
92+
type: "match",
93+
data: matchData,
94+
}),
95+
);
96+
this.pendingNotifications.delete(userId);
97+
break;
98+
}
99+
}
100+
if (!this.clients.get(userId)) {
101+
console.log(
102+
`Failed to send notification to user: ${userId} after multiple attempts`,
103+
);
104+
}
105+
}
106+
};
107+
108+
await notifyUser(user1Id);
109+
await notifyUser(user2Id);
110+
}
111+
112+
private handleCancelRequest(userId: string) {
113+
console.log(`Cancellation request received for user: ${userId}`);
114+
// Remove the user from the clients map
115+
this.clients.delete(userId);
116+
// Remove any pending notifications for this user
117+
this.pendingNotifications.delete(userId);
118+
// TODO: Implement logic to remove the user from Redis queues
119+
this.removeUserFromQueues(userId);
120+
}
121+
122+
private async removeUserFromQueues(userId: string) {
123+
// This method should remove the user from all Redis queues
124+
// You'll need to implement this logic based on your Redis structure
125+
// For example:
126+
const allTopics = await redisClient.keys("topic:*");
127+
const allDifficulties = await redisClient.keys("difficulty:*");
128+
129+
const removeFromQueue = async (key: string) => {
130+
const users = await redisClient.zRange(key, 0, -1);
131+
for (const userString of users) {
132+
const user = JSON.parse(userString);
133+
if (user._id === userId) {
134+
await removeUserFromKey(key, userString);
135+
console.log(`Removed user ${userId} from queue ${key}`);
136+
break;
137+
}
138+
}
139+
};
140+
141+
for (const key of [...allTopics, ...allDifficulties]) {
142+
await removeFromQueue(key);
143+
}
144+
}
145+
}
146+
147+
export default WebSocketService;

peerprep-fe/src/app/(main)/match/page.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export default function LoadingPage() {
1515
const [matchData, setMatchData] = useState<MatchData | null>(null);
1616
const router = useRouter();
1717
const { user } = useAuthStore();
18-
const { isConnected, lastMessage, sendMessage } = useWebSocket(
18+
const { isConnected, lastMessage, sendMessage, disconnect } = useWebSocket(
1919
process.env.NEXT_PUBLIC_MATCHING_SERVICE_WS_URL || 'ws://localhost:5001/ws',
2020
);
2121

@@ -73,6 +73,10 @@ export default function LoadingPage() {
7373

7474
const handleCancel = () => {
7575
console.log('Matching cancelled');
76+
if (isConnected) {
77+
sendMessage({ type: 'cancel', userId: user?.id });
78+
disconnect();
79+
}
7680
router.push('/');
7781
};
7882

peerprep-fe/src/hooks/useWebSocket.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ export function useWebSocket(url: string) {
77
const [isConnected, setIsConnected] = useState(false);
88
const [lastMessage, setLastMessage] = useState<Message | null>(null);
99

10+
const disconnect = useCallback(() => {
11+
if (ws) {
12+
ws.close();
13+
setWs(null);
14+
setIsConnected(false);
15+
}
16+
}, [ws]);
17+
1018
useEffect(() => {
1119
const websocket = new WebSocket(url);
1220
setWs(websocket);
@@ -29,5 +37,5 @@ export function useWebSocket(url: string) {
2937
[ws, isConnected],
3038
);
3139

32-
return { isConnected, lastMessage, sendMessage };
40+
return { isConnected, lastMessage, sendMessage, disconnect };
3341
}

0 commit comments

Comments
 (0)