Skip to content

Commit c0c3fc2

Browse files
authored
Merge pull request #70 from CS3219-AY2425S1/collab-backend
Add backend for collab: Inter-service communication (Kafka) and Collab service
2 parents 0ecbfce + ce695cc commit c0c3fc2

32 files changed

+3995
-216
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ DB_PORT=27017
1515
PEERPREP_QUESTION_INITDB_NAME=peerprepQuestionServiceDB # must match question service .env file and init-mongo.js
1616
PEERPREP_USER_INITDB_NAME=peerprepUserServiceDB # must match user service .env file and init-mongo.js
1717
PEERPREP_MATCHING_INITDB_NAME=peerprepMatchingServiceDB # must match user service .env file and init-mongo.js
18+
PEERPREP_COLLABORATION_INITDB_NAME=peerprepCollaborationServiceDB # must match collab service .env file and init-mongo.js

README.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ This project follows a microservices architecture with the following services:
77
2. **User Service** - Port `3001`
88
3. **Question Service** - Port `3002`
99
4. **Matching Service** - Port `3003`
10-
5. **MongoDB** - Port `27017` (Database)
11-
6. **Nginx API Gateway** - Port `80`
12-
7. **Redis** - Port `6379`
10+
5. **Collaboration Service** - Port `3004`
11+
6. **MongoDB** - Port `27017` (Database)
12+
7. **Nginx API Gateway** - Port `80`
13+
8. **Redis** - Port `6379`
14+
9. **Zookeeper** - Port `2181`
15+
10. **Kafka** - Port `9092`, Port `29092`
1316

1417
### Setting up the Project
1518
Copy and paste the .env.example files in each service. Rename them as .env files.
@@ -19,6 +22,7 @@ Files to do this in:
1922
3. /backend/user-service
2023
4. /backend/question-service
2124
5. /backend/matching-service
25+
6. /backend/collaboration-service
2226
Then, run `node -e "console.log(require('crypto').randomBytes(32).toString('hex'))"` twice to generate
2327
your 2 JWT token secrets. For the first one, paste it into the JWT_ACCESS_TOKEN_SECRET variable of
2428
the .env files in question-service and user-service. Then, copy the second into the
@@ -39,11 +43,14 @@ Once the containers are up:
3943
- User Service: [http://localhost:3001](http://localhost:3001)
4044
- Question Service: [http://localhost:3002](http://localhost:3002)
4145
- Matching Service: [http://localhost:3003](http://localhost:3003)
46+
- Collaboration Service: [http://localhost:3004](http://localhost:3004)
4247
- MongoDB: [http://localhost:27017](http://localhost:27017)
4348
- Nginx API Gateway: [http://localhost:80](http://localhost:80)
4449
- Redis: [http://localhost:6379](http://localhost:6379)
50+
- Zookeeper: [http://localhost:2181](http://localhost:2181)
51+
- Kafka: [http://localhost:9092](http://localhost:9092)
4552

46-
Note that even after docker says that everything is up and running, there is a risk that they aren't when you load the frontend.
53+
Note that even after docker says that everything is up and running, there is a risk that they aren't when you load the frontend. Wait for the frontend logs to show up in the docker logs.
4754
In this event, wait for about a minute before trying again. If that still doesn't work and there are network errors, try
4855
rebuilding the services by running `docker-compose up --build` again.
4956

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Database Connection String Details
2+
DATABASE_NAME=peerprepCollaborationServiceDB
3+
4+
# Port to run service on
5+
PORT=3004
6+
7+
# Kafka configuration
8+
KAFKA_HOST=localhost
9+
KAFKA_PORT=9092
10+
11+
# Copy root .env
12+
# If using mongoDB containerization, set to true. Else set to false (i.e local testing)
13+
DB_REQUIRE_AUTH=true
14+
15+
# mongoDB auth variables
16+
MONGO_USER=user
17+
MONGO_PASSWORD=password
18+
MONGO_INITDB_ROOT_USERNAME=admin # must match docker-compose.yml and init-mongo.js
19+
MONGO_INITDB_ROOT_PASSWORD=password # must match docker-compose.yml and init-mongo.js
20+
21+
# mongoDB connection string variables
22+
DB_HOST=localhost
23+
DB_PORT=27017
24+
25+
# Service database names
26+
PEERPREP_QUESTION_INITDB_NAME=peerprepQuestionServiceDB # must match question service .env file and init-mongo.js
27+
PEERPREP_USER_INITDB_NAME=peerprepUserServiceDB # must match user service .env file and init-mongo.js
28+
PEERPREP_MATCHING_INITDB_NAME=peerprepMatchingServiceDB # must match user service .env file and init-mongo.js
29+
PEERPREP_COLLABORATION_INITDB_NAME=peerprepCollaborationServiceDB # must match collab service .env file and init-mongo.js
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
FROM node:20-alpine AS base
2+
WORKDIR /app
3+
COPY package*.json ./
4+
RUN npm install
5+
COPY . .
6+
7+
# For development environment
8+
FROM base AS dev
9+
EXPOSE 3004
10+
CMD ["npm", "run", "dev"]
11+
12+
# For production environment
13+
FROM base AS build
14+
RUN npm run build
15+
16+
FROM node:20-alpine AS prod
17+
WORKDIR /app
18+
# Copy only the built files and necessary files for production
19+
COPY --from=build /app/dist ./dist
20+
COPY package*.json ./
21+
# Install only production dependencies
22+
RUN npm install --only=production
23+
EXPOSE 3004
24+
CMD ["npm", "start"]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import cors from "cors";
2+
import express, { Express } from "express";
3+
import dotenv from "dotenv";
4+
import path from "path";
5+
import http from "http";
6+
import { WebSocket } from "ws";
7+
import collaborationRoutes from "./routes/collaborationRoutes";
8+
import { errorHandler } from "./middlewares/errorHandler";
9+
import { connectToDatabase } from "./utils/database";
10+
import { setUpKafkaSubscribers } from "./utils/kafkaClient";
11+
const setupWSConnection = require("y-websocket/bin/utils").setupWSConnection;
12+
13+
14+
dotenv.config({ path: path.resolve(__dirname, "./.env") });
15+
16+
connectToDatabase();
17+
18+
const port = process.env.PORT || 3004;
19+
const app: Express = express();
20+
21+
const server = http.createServer(app);
22+
23+
app.use(express.json());
24+
25+
app.use("/api/collab", collaborationRoutes);
26+
27+
app.use(
28+
cors({
29+
origin: "*",
30+
credentials: true,
31+
})
32+
);
33+
34+
app.use(errorHandler);
35+
36+
const wss = new WebSocket.Server({ server });
37+
38+
wss.on("connection", (ws, req) => {
39+
// // Extract sessionId from the URL for connecting to the Yjs document
40+
// const sessionId = req.url?.split('/').pop();
41+
42+
// console.log(`A user has connected to session: ${sessionId}!`)
43+
44+
// if (sessionId) {
45+
// // Initialize or get the Yjs document from YDocManager
46+
// const ydoc = YDocManager.getDoc(sessionId) || YDocManager.initializeDoc(sessionId);
47+
48+
// // Set up the document synchronization logic
49+
// const encoder = Y.encodeStateAsUpdate(ydoc);
50+
// ws.send(encoder);
51+
52+
// ws.on('message', (message) => {
53+
// const update = new Uint8Array(message instanceof ArrayBuffer ? message : (message as Buffer).buffer);
54+
// Y.applyUpdate(ydoc, update);
55+
// wss.clients.forEach((client) => {
56+
// if (client !== ws && client.readyState === WebSocket.OPEN) {
57+
// client.send(update);
58+
// }
59+
// });
60+
// });
61+
62+
// ws.on('close', () => {
63+
// console.log(`Connection closed for session: ${sessionId}`);
64+
// });
65+
66+
// } else {
67+
// console.error("No session ID found in WebSocket connection URL.");
68+
// ws.close();
69+
// }
70+
setupWSConnection(ws, req);
71+
});
72+
73+
// Start the server
74+
server.listen(port, async () => {
75+
console.log(`Collaboration service running on http://localhost:${port}`);
76+
77+
await setUpKafkaSubscribers();
78+
});
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// controllers/collaborationController.js
2+
import { EachMessagePayload } from 'kafkajs';
3+
import { COLLAB_TOPIC, producer } from '../utils/kafkaClient';
4+
import Session from '../models/Session';
5+
6+
7+
export const createSession = async ( matchId: string, userIds: string[]) => {
8+
// Create unique sessionId:
9+
const sessionId = `session-${Date.now()}-${userIds.join('-')}`;
10+
11+
// Create new Session document in mongodb
12+
const newSession = new Session({ sessionId, matchId, userIds, codeContent: '' });
13+
await newSession.save();
14+
15+
return sessionId;
16+
};
17+
18+
/**
19+
* Process the message from the Kafka topic and create a new session.
20+
* Emits a Kafka message back to the matching service with the session ID.
21+
* @param message - Kafka message payload
22+
*/
23+
export async function handleMatchNotification(message: EachMessagePayload) {
24+
/**
25+
* message contains all the info from the kafka message from matching-service.
26+
* message looks like this:
27+
* {
28+
* topic: 'match_topic',
29+
* message: {
30+
* key: matchId,
31+
* value: {
32+
* user1: { userId: user1.userId, socketId: user1.socketId },
33+
user2: { userId: user2.userId, socketId: user2.socketId },
34+
category: 'algo',
35+
difficulty: 'easy',
36+
matchId: 123 (mongoDB Id for match),
37+
* }
38+
* }
39+
* }
40+
*/
41+
console.log("Collab service creating session");
42+
43+
// Validation for message format
44+
const matchId = message.message.key?.toString();
45+
if (!matchId || !message.message.value) {
46+
console.error("No match ID/value found in message.");
47+
return;
48+
}
49+
50+
// Create session and get sessionId
51+
const messageValue = JSON.parse(message.message.value.toString());
52+
53+
const user1Id = messageValue.user1?.userId;
54+
const user2Id = messageValue.user2?.userId;
55+
56+
if (!user1Id || !user2Id) {
57+
console.error("User IDs not found in message.");
58+
return;
59+
}
60+
61+
const sessionId = await createSession(matchId, [user1Id, user2Id]);
62+
63+
// Send the session ID back to the matching service
64+
const messageBody = JSON.stringify({ sessionId });
65+
await producer.send({
66+
topic: COLLAB_TOPIC,
67+
messages: [
68+
{
69+
key: matchId,
70+
value: messageBody,
71+
},
72+
],
73+
});
74+
75+
console.log(`Sent Session ID ${sessionId} to collab service.`); // modify
76+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { Request, Response, NextFunction } from "express";
2+
3+
export function errorHandler(err: any, req: Request, res: Response, next: NextFunction) {
4+
if (err instanceof Error) {
5+
console.error("Error:", err.message);
6+
return res.status(400).json({ message: err.message });
7+
} else {
8+
console.error("Unexpected error:", err);
9+
return res.status(500).json({ message: "An unexpected error occurred." });
10+
}
11+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import mongoose, { Types } from "mongoose";
2+
3+
const sessionSchema = new mongoose.Schema({
4+
matchId: { type: String, required: true },
5+
sessionId: { type: String, required: true },
6+
codeContent: { type: String, default: "" },
7+
lastUpdated: { type: Date, default: Date.now }
8+
});
9+
10+
const Session = mongoose.model('session', sessionSchema);
11+
12+
export default Session;

0 commit comments

Comments
 (0)