Skip to content

Commit 9a4a8ec

Browse files
authored
Merge pull request #75 from CS3219-AY2425S1/feat/fe_to_rabbitmq
Feat/fe to rabbitmq
2 parents c8c689e + 6fe25a7 commit 9a4a8ec

File tree

4 files changed

+69
-9
lines changed

4 files changed

+69
-9
lines changed

matching-service/src/services/matchingService.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,11 @@ export const processOldUsers = async (): Promise<void> => {
5656
// Send to user subscribed queues if there is a match
5757
await sendToQueue(match._id, {
5858
status: "matched",
59-
user1: match,
60-
user2: user,
59+
match: user,
6160
});
6261
await sendToQueue(user._id, {
6362
status: "matched",
64-
user1: match,
65-
user2: user,
63+
match: match,
6664
});
6765
continue;
6866
}
@@ -101,13 +99,11 @@ export const processNewUser = async (user: User): Promise<void> => {
10199
if (match) {
102100
await sendToQueue(match._id, {
103101
status: "matched",
104-
user1: match,
105-
user2: user,
102+
match: user,
106103
});
107104
await sendToQueue(user._id, {
108105
status: "matched",
109-
user1: match,
110-
user2: user,
106+
match: match,
111107
});
112108
} else {
113109
// Add to the topic queue if no match

matching-service/src/services/rabbitMqService.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ export const sendToQueue = async (
105105

106106
await channel.assertQueue(queue, {
107107
durable: true,
108-
expires: 300000, //expire after 5 minutes of idle
109108
});
110109

111110
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)));

peerprep-fe/src/app/(main)/match/page.tsx

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,49 @@
11
'use client';
22

33
import { useState, useEffect } from 'react';
4+
import { useRouter } from 'next/navigation';
45
import { User, Code } from 'lucide-react';
6+
import { consumeMessageFromQueue } from '@/lib/rabbitmq';
57

68
export default function LoadingPage() {
79
const [elapsedTime, setElapsedTime] = useState(0);
810
const [usersWaiting, setUsersWaiting] = useState(4);
11+
const router = useRouter();
12+
13+
const startConsumingMessages = async () => {
14+
try {
15+
await consumeMessageFromQueue().then((message) => {
16+
// This function is called when a message is consumed
17+
if (message.status == 'matched') {
18+
console.log('Match found, your partner is');
19+
router.push('/');
20+
} else {
21+
console.log('Match failed');
22+
router.push('/');
23+
}
24+
});
25+
} catch (error) {
26+
console.error('Error consuming message:', error);
27+
}
28+
};
929

1030
useEffect(() => {
31+
startConsumingMessages();
1132
const timer = setInterval(() => {
1233
setElapsedTime((prevTime) => prevTime + 1);
1334
}, 1000);
1435
setUsersWaiting(5);
1536
return () => clearInterval(timer);
1637
}, []);
1738

39+
useEffect(() => {
40+
if (elapsedTime >= 60) {
41+
// Execute your action here
42+
console.log('Elapsed time reached 60 seconds. Going back to main page');
43+
router.push('/');
44+
}
45+
}, [elapsedTime]);
46+
1847
return (
1948
<div className="flex min-h-screen flex-col bg-[#1a1f2e] text-gray-300">
2049
<header className="flex items-center justify-between border-b border-gray-700 p-4">

peerprep-fe/src/lib/rabbitmq.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,39 @@ export const sendMessageToQueue = async (message: Record<string, any>) => {
3434
throw err;
3535
}
3636
};
37+
38+
export const consumeMessageFromQueue = async () => {
39+
return new Promise<any>((resolve, reject) => {
40+
(async () => {
41+
try {
42+
// Connect to RabbitMQ server
43+
const connection = await connect(process.env.RABBITMQ_URL);
44+
const channel = await connection.createChannel();
45+
const queue = process.env.MATCHING_SERVICE_QUEUE;
46+
47+
// Ensure the queue exists
48+
await channel.assertQueue(queue, { durable: true });
49+
50+
// Consume messages from the queue
51+
console.log(`Waiting for messages in ${queue}...`);
52+
channel.consume(
53+
queue,
54+
(msg: any) => {
55+
if (msg !== null) {
56+
const messageContent = JSON.parse(msg.content.toString());
57+
console.log(`Received:`, messageContent);
58+
channel.ack(msg);
59+
resolve(messageContent); // Resolve the Promise with the message content
60+
}
61+
},
62+
{
63+
noAck: false,
64+
},
65+
);
66+
} catch (error) {
67+
console.error('Error in consuming messages:', error);
68+
reject(error); // Reject the Promise on error
69+
}
70+
})();
71+
});
72+
};

0 commit comments

Comments
 (0)