Skip to content

Commit c807af8

Browse files
Add shared and priority queues logic
1 parent fcafa72 commit c807af8

File tree

1 file changed

+201
-34
lines changed

1 file changed

+201
-34
lines changed

backend-nestjs/src/socket/socket.gateway.ts

Lines changed: 201 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ export class SocketGateway
5757
*/
5858
private sharedQueue: Queue<QueueMember> = new Queue<QueueMember>();
5959

60+
/**
61+
* The priority queue that will be used to store the clients waiting for a book with priority
62+
* (e.g. Premium users)
63+
* @type {Queue<QueueMember>}
64+
*/
65+
private priorityQueue: Queue<QueueMember> = new Queue<QueueMember>();
66+
6067
/**
6168
* Creates an instance of SocketGateway.
6269
*
@@ -82,8 +89,8 @@ export class SocketGateway
8289
* @returns {void}
8390
*/
8491
public onModuleInit(): void {
85-
// Start processing the queue
86-
this.processQueue();
92+
// Start processing the queues.
93+
this.processQueues();
8794
}
8895

8996
/**
@@ -139,18 +146,28 @@ export class SocketGateway
139146
// Remove the client from the queue
140147
this.removeFromQueue(client);
141148

149+
// Send the updated list of users to all clients
142150
this.server.emit('users', this.socketService.getUsers());
143151
}
144152

153+
/**
154+
* Event called when a client joins the shared queue
155+
*
156+
* @SubscribeMessage 'joinSharedQueue'
157+
* @param {Socket} client - The client socket
158+
* @returns {Promise<void>}
159+
*/
145160
@SubscribeMessage('joinSharedQueue')
146-
public async joinQueue(@ConnectedSocket() client: Socket): Promise<void> {
161+
public async joinSharedQueue(
162+
@ConnectedSocket() client: Socket,
163+
): Promise<void> {
147164
// Get the user ID from the client
148165
const userId: string = this.socketService.getUserId(client.id);
149166
if (!userId) return;
150167

151168
if (this.sharedQueue.getItems().some((item) => item.userId === userId)) {
152169
// Notify the client that the user is already in the queue
153-
client.emit('queueStatus', {
170+
client.emit('sharedQueueStatus', {
154171
status: 'alreadyInQueue',
155172
userId,
156173
});
@@ -167,54 +184,189 @@ export class SocketGateway
167184
this.sharedQueue.enqueue({ instanceId, client, userId });
168185

169186
console.log(
170-
`Client added to queue. ID #${instanceId}. Queue size: ${this.sharedQueue.size()}`,
187+
`Client added to shared queue. ID #${instanceId}. Queue size: ${this.sharedQueue.size()}`,
171188
);
172189

173190
// Notify the clients with the updated queue size
174-
this.server.emit('queue', {
191+
this.server.emit('sharedQueue', {
175192
size: this.sharedQueue.size(),
176193
});
194+
195+
// Get the user's socket IDs
196+
const userSocketIds: string[] = this.socketService.getUserSocketIds(userId);
197+
if (!userSocketIds.length) return;
198+
199+
// Notify the client with their position in the queue
200+
userSocketIds.forEach((socketId: string) => {
201+
this.server.to(socketId).emit('sharedQueueStatus', {
202+
status: 'queued',
203+
userId,
204+
position: this.sharedQueue.size(),
205+
});
206+
});
177207
}
178208

179-
public async processQueue(): Promise<void> {
180-
while (true) {
181-
if (!this.sharedQueue.isEmpty()) {
182-
// Get the first client in the queue
183-
const instance: QueueMember = this.sharedQueue.peek();
209+
/**
210+
* Event called when a client joins the priority queue
211+
*
212+
* @SubscribeMessage 'joinPriorityQueue'
213+
* @param {Socket} client - The client socket
214+
* @returns {Promise<void>}
215+
*/
216+
@SubscribeMessage('joinPriorityQueue')
217+
public async joinPriorityQueue(
218+
@ConnectedSocket() client: Socket,
219+
): Promise<void> {
220+
// Get the user ID from the client
221+
const userId: string = this.socketService.getUserId(client.id);
222+
if (!userId) return;
223+
224+
if (this.priorityQueue.getItems().some((item) => item.userId === userId)) {
225+
// Notify the client that the user is already in the queue
226+
client.emit('priorityQueueStatus', {
227+
status: 'alreadyInQueue',
228+
userId,
229+
});
230+
231+
console.log(`User ${userId} is already in the priority queue.`);
232+
233+
return;
234+
}
235+
236+
// Generate a unique instance ID for the client
237+
const instanceId: string = uuidv4();
238+
239+
// Add the client to the priority queue
240+
this.priorityQueue.enqueue({ instanceId, client, userId });
241+
242+
console.log(
243+
`Client added to priority queue. ID #${instanceId}. Queue size: ${this.priorityQueue.size()}`,
244+
);
184245

185-
if (instance.client) {
186-
console.log(
187-
`Processing user ${instance.userId} from queue. ID #${instance.instanceId}. Queue size: ${this.sharedQueue.size()}`,
188-
);
246+
// Notify the clients with the updated queue size
247+
this.server.emit('priorityQueue', {
248+
size: this.priorityQueue.size(),
249+
});
189250

190-
// Call the generate method to simulate processing
191-
await this.generate(instance);
251+
const userSocketIds: string[] = this.socketService.getUserSocketIds(userId);
252+
if (!userSocketIds.length) return;
192253

193-
// Remove the client from the queue
194-
this.sharedQueue.dequeue();
254+
// Notify the client with their position in the queue
255+
userSocketIds.forEach((socketId: string) => {
256+
this.server.to(socketId).emit('priorityQueueStatus', {
257+
status: 'queued',
258+
userId,
259+
position: this.priorityQueue.size(),
260+
});
261+
});
262+
}
195263

196-
// Notify all clients of the updated queue size
197-
this.server.emit('queue', {
198-
size: this.sharedQueue.size(),
199-
});
200-
}
201-
} else await this.sleep(1000);
264+
/**
265+
* Checks if any clients are in the queues and processes them.
266+
* This method is called when the module is initialized.
267+
* It will keep checking the queues for clients and process them when they are available.
268+
* Priority clients will be processed first before shared clients.
269+
*
270+
* @uses processPriorityQueue
271+
* @uses processSharedQueue
272+
* @returns {Promise<void>}
273+
*/
274+
public async processQueues(): Promise<void> {
275+
while (true) {
276+
// Process the priority queue
277+
if (!this.priorityQueue.isEmpty()) await this.processPriorityQueue();
278+
// Process the shared queue if the priority queue is empty
279+
else if (!this.sharedQueue.isEmpty()) await this.processSharedQueue();
280+
// If both queues are empty, wait for a second before checking again
281+
else await this.sleep(1000);
202282

203283
// Wait for the next iteration of the loop, this is useful for preventing the event loop from blocking
204284
await new Promise(setImmediate);
205285
}
206286
}
207287

208-
public async generate(instance: QueueMember): Promise<void> {
209-
// Simulate processing with a delay, replace this with actual processing logic
210-
return new Promise((resolve) => {
211-
setTimeout(() => {
212-
// Notify the client that the user has been processed
213-
instance.client.emit('queueStatus', {
288+
/**
289+
* Processes the clients in the priority queue.
290+
*
291+
* @uses generate
292+
* @returns {Promise<void>}
293+
*/
294+
public async processPriorityQueue(): Promise<void> {
295+
if (!this.priorityQueue.isEmpty()) {
296+
// Get the first client in the priority queue
297+
const instance: QueueMember = this.priorityQueue.peek();
298+
299+
if (instance.client) {
300+
console.log(
301+
`Processing user ${instance.userId} from priority queue. ID #${instance.instanceId}. Queue size: ${this.priorityQueue.size()}`,
302+
);
303+
304+
// Call the generate method to simulate processing
305+
await this.generate(instance);
306+
307+
// Remove the client from the queue
308+
this.priorityQueue.dequeue();
309+
310+
// Notify all clients of the updated priority queue size
311+
this.server.emit('priorityQueue', {
312+
size: this.priorityQueue.size(),
313+
});
314+
315+
// Notify the priority clients that a user has been processed
316+
this.server.emit('priorityQueueStatus', {
317+
status: 'processed',
318+
userId: instance.userId,
319+
});
320+
}
321+
}
322+
}
323+
324+
/**
325+
* Processes the clients in the shared queue.
326+
*
327+
* @uses generate
328+
* @returns {Promise<void>}
329+
*/
330+
public async processSharedQueue(): Promise<void> {
331+
if (!this.sharedQueue.isEmpty()) {
332+
// Get the first client in the shared queue
333+
const instance: QueueMember = this.sharedQueue.peek();
334+
335+
if (instance.client) {
336+
console.log(
337+
`Processing user ${instance.userId} from shared queue. ID #${instance.instanceId}. Queue size: ${this.sharedQueue.size()}`,
338+
);
339+
340+
// Call the generate method to simulate processing
341+
await this.generate(instance);
342+
343+
// Remove the client from the queue
344+
this.sharedQueue.dequeue();
345+
346+
// Notify all clients of the updated shared queue size
347+
this.server.emit('sharedQueue', {
348+
size: this.sharedQueue.size(),
349+
});
350+
351+
// Notify the shared clients that a user has been processed
352+
this.server.emit('sharedQueueStatus', {
214353
status: 'processed',
215354
userId: instance.userId,
216355
});
356+
}
357+
}
358+
}
217359

360+
/**
361+
* Simulates processing a client.
362+
*
363+
* @param {QueueMember} instance - The client instance
364+
* @returns {Promise<void>}
365+
*/
366+
public async generate(instance: QueueMember): Promise<void> {
367+
// Simulate processing with a delay, replace this with actual processing logic
368+
return new Promise((resolve) => {
369+
setTimeout(() => {
218370
console.log(
219371
`User ${instance.userId} processed. ID #${instance.instanceId}.`,
220372
);
@@ -225,21 +377,36 @@ export class SocketGateway
225377
}
226378

227379
/**
228-
* Removes a client from the queue with the specified client ID.
380+
* Removes a client from the queues if they are in any.
229381
*
230382
* @param {Socket} client - The client socket
231383
* @returns {void}
232384
*/
233385
private removeFromQueue(client: Socket): void {
234-
// Remove the client from the queue with the specified client ID
386+
// Remove the client from the shared queue with the specified client ID
235387
this.sharedQueue = new Queue<QueueMember>(
236388
this.sharedQueue
237389
.getItems()
238390
.filter((queueItem: QueueMember) => queueItem.client.id !== client.id),
239391
);
240392

393+
this.server.emit('sharedQueue', {
394+
size: this.sharedQueue.size(),
395+
});
396+
397+
// Remove the client from the priority queue with the specified client ID
398+
this.priorityQueue = new Queue<QueueMember>(
399+
this.priorityQueue
400+
.getItems()
401+
.filter((queueItem: QueueMember) => queueItem.client.id !== client.id),
402+
);
403+
404+
this.server.emit('priorityQueue', {
405+
size: this.priorityQueue.size(),
406+
});
407+
241408
console.log(
242-
`Client removed from queue. Queue size: ${this.sharedQueue.size()}`,
409+
`Client removed from queue. Shared queue size: ${this.sharedQueue.size()}, Priority queue size: ${this.priorityQueue.size()}.`,
243410
);
244411
}
245412

0 commit comments

Comments
 (0)